Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "base/task_scheduler/thread_pool.h" | |
| 6 | |
| 7 #include <utility> | |
| 8 | |
| 9 #include "base/bind.h" | |
| 10 #include "base/logging.h" | |
| 11 #include "base/task_scheduler/utils.h" | |
| 12 | |
| 13 namespace base { | |
| 14 namespace task_scheduler { | |
| 15 | |
| 16 namespace { | |
| 17 | |
| 18 // A task runner that runs tasks with the PARALLEL strategy. | |
| 19 class SchedulerParallelTaskRunner : public TaskRunner { | |
| 20 public: | |
| 21 // Tasks posted through this task runner have |traits| and are inserted in | |
| 22 // |shared_priority_queue|. |delayed_task_manager| is used to post delayed | |
| 23 // tasks. |shutdown_manager| is notified when a task is posted. | |
| 24 SchedulerParallelTaskRunner(const TaskTraits& traits, | |
| 25 PriorityQueue* priority_queue, | |
| 26 DelayedTaskManager* delayed_task_manager, | |
| 27 ShutdownManager* shutdown_manager); | |
| 28 | |
| 29 // TaskRunner: | |
| 30 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
| 31 const Closure& closure, | |
| 32 TimeDelta delay) override; | |
| 33 bool RunsTasksOnCurrentThread() const override; | |
| 34 | |
| 35 private: | |
| 36 ~SchedulerParallelTaskRunner() override; | |
| 37 | |
| 38 TaskTraits traits_; | |
| 39 PriorityQueue* priority_queue_; | |
| 40 DelayedTaskManager* delayed_task_manager_; | |
| 41 ShutdownManager* shutdown_manager_; | |
| 42 | |
| 43 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | |
| 44 }; | |
| 45 | |
| 46 SchedulerParallelTaskRunner::SchedulerParallelTaskRunner( | |
| 47 const TaskTraits& traits, | |
| 48 PriorityQueue* priority_queue, | |
| 49 DelayedTaskManager* delayed_task_manager, | |
| 50 ShutdownManager* shutdown_manager) | |
| 51 : traits_(traits), | |
| 52 priority_queue_(priority_queue), | |
| 53 delayed_task_manager_(delayed_task_manager), | |
| 54 shutdown_manager_(shutdown_manager) {} | |
| 55 | |
| 56 bool SchedulerParallelTaskRunner::PostDelayedTask( | |
| 57 const tracked_objects::Location& from_here, | |
| 58 const Closure& closure, | |
| 59 TimeDelta delay) { | |
| 60 Task task(from_here, closure, traits_, TimeTicks::Now()); | |
| 61 if (!delay.is_zero()) | |
| 62 task.delayed_run_time = task.post_time + delay; | |
| 63 PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_, | |
| 64 shutdown_manager_, delayed_task_manager_); | |
| 65 return true; | |
| 66 } | |
| 67 | |
| 68 bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const { | |
| 69 // TODO(fdoray): Return true only if tasks posted may actually run on the | |
| 70 // current thread. It is valid, but not ideal, to always return true. | |
| 71 return true; | |
| 72 } | |
| 73 | |
| 74 SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default; | |
| 75 | |
| 76 // A task runner that runs tasks in with the SEQUENCED strategy. | |
| 77 class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | |
| 78 public: | |
| 79 // Tasks posted through this task runner have |traits| and are inserted in | |
| 80 // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|. | |
| 81 // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is | |
| 82 // notified when a task is posted. | |
| 83 SchedulerSequencedTaskRunner(const TaskTraits& traits, | |
| 84 scoped_refptr<Sequence> sequence, | |
| 85 PriorityQueue* priority_queue, | |
| 86 DelayedTaskManager* delayed_task_manager, | |
| 87 ShutdownManager* shutdown_manager); | |
| 88 | |
| 89 // SequencedTaskRunner: | |
| 90 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
| 91 const Closure& task, | |
| 92 TimeDelta delay) override; | |
| 93 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
| 94 const Closure& closure, | |
| 95 TimeDelta delay) override; | |
| 96 bool RunsTasksOnCurrentThread() const override; | |
| 97 | |
| 98 private: | |
| 99 ~SchedulerSequencedTaskRunner() override; | |
| 100 | |
| 101 TaskTraits traits_; | |
| 102 scoped_refptr<Sequence> sequence_; | |
| 103 PriorityQueue* priority_queue_; | |
| 104 DelayedTaskManager* delayed_task_manager_; | |
| 105 ShutdownManager* shutdown_manager_; | |
| 106 | |
| 107 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | |
| 108 }; | |
| 109 | |
| 110 SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner( | |
| 111 const TaskTraits& traits, | |
| 112 scoped_refptr<Sequence> sequence, | |
| 113 PriorityQueue* priority_queue, | |
| 114 DelayedTaskManager* delayed_task_manager, | |
| 115 ShutdownManager* shutdown_manager) | |
| 116 : traits_(traits), | |
| 117 sequence_(sequence), | |
| 118 priority_queue_(priority_queue), | |
| 119 delayed_task_manager_(delayed_task_manager), | |
| 120 shutdown_manager_(shutdown_manager) {} | |
| 121 | |
| 122 bool SchedulerSequencedTaskRunner::PostDelayedTask( | |
| 123 const tracked_objects::Location& from_here, | |
| 124 const Closure& closure, | |
| 125 TimeDelta delay) { | |
| 126 Task task(from_here, closure, traits_, TimeTicks::Now()); | |
| 127 if (!delay.is_zero()) | |
| 128 task.delayed_run_time = task.post_time + delay; | |
| 129 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_, | |
| 130 delayed_task_manager_); | |
| 131 return true; | |
| 132 } | |
| 133 | |
| 134 bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const { | |
| 135 // TODO(fdoray): Return true only if tasks posted may actually run on the | |
| 136 // current thread. It is valid, but not ideal, to always return true. | |
| 137 return true; | |
| 138 } | |
| 139 | |
| 140 bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask( | |
| 141 const tracked_objects::Location& from_here, | |
| 142 const Closure& task, | |
| 143 TimeDelta delay) { | |
| 144 return PostDelayedTask(from_here, task, delay); | |
| 145 } | |
| 146 | |
| 147 SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default; | |
| 148 | |
| 149 } // namespace | |
| 150 | |
| 151 | |
| 152 ThreadPool::~ThreadPool() = default; | |
| 153 | |
| 154 scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool( | |
| 155 ThreadPriority thread_priority, | |
| 156 size_t num_threads, | |
| 157 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, | |
| 158 ShutdownManager* shutdown_manager) { | |
| 159 scoped_ptr<ThreadPool> thread_pool( | |
| 160 new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback, | |
| 161 shutdown_manager)); | |
| 162 return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool) | |
| 163 : scoped_ptr<ThreadPool>(); | |
| 164 } | |
| 165 | |
| 166 size_t ThreadPool::GetNumThreads() const { | |
| 167 return worker_threads_.size(); | |
| 168 } | |
| 169 | |
| 170 scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits( | |
| 171 const TaskTraits& traits, | |
| 172 ExecutionMode execution_mode) { | |
| 173 switch (execution_mode) { | |
| 174 case ExecutionMode::PARALLEL: { | |
| 175 return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner( | |
| 176 traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_)); | |
| 177 } | |
| 178 | |
| 179 case ExecutionMode::SEQUENCED: { | |
| 180 // TODO(fdoray): Support TaskTraits().WithSequenceToken(). | |
| 181 return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner( | |
| 182 traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_, | |
| 183 &delayed_task_manager_, shutdown_manager_)); | |
| 184 } | |
| 185 | |
| 186 #if defined(OS_WIN) | |
| 187 case ExecutionMode::SINGLE_THREADED_COM_STA: | |
| 188 #endif | |
| 189 case ExecutionMode::SINGLE_THREADED: { | |
| 190 DCHECK(!worker_threads_.empty()); | |
| 191 // TODO(fdoray): Better thread assignment. | |
| 192 return scoped_refptr<TaskRunner>( | |
| 193 worker_threads_.front() | |
| 194 ->CreateTaskRunnerWithTraits(traits, execution_mode) | |
| 195 .get()); | |
| 196 } | |
| 197 | |
| 198 default: { | |
| 199 NOTREACHED(); | |
| 200 return scoped_refptr<TaskRunner>(); | |
| 201 } | |
| 202 } | |
| 203 } | |
| 204 | |
| 205 void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence, | |
| 206 const SequenceSortKey& sequence_sort_key, | |
| 207 const WorkerThread* worker_thread) { | |
| 208 DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get()); | |
| 209 | |
| 210 // Set a flag to avoid waking up a thread when reinserting |sequence| in | |
| 211 // |priority_queue_| if the thread doing the reinsertion: | |
| 212 // - Can run tasks from |priority_queue_|, and, | |
| 213 // - Doesn't have pending single-threaded tasks. | |
| 214 // If these conditions are met, the thread doing the reinsertion will soon | |
| 215 // pop a sequence from |priority_queue_|. There is no need to wake up a new | |
| 216 // thread to do it. | |
| 217 if (worker_thread->shared_priority_queue() == &priority_queue_ && | |
| 218 !worker_thread->HasSingleThreadedTasks()) { | |
| 219 disable_wake_up_thread_on_sequence_insertion_.Set(true); | |
| 220 } | |
| 221 | |
| 222 // Insert the sequence in the priority queue. | |
| 223 priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key); | |
| 224 | |
| 225 disable_wake_up_thread_on_sequence_insertion_.Set(false); | |
| 226 } | |
| 227 | |
| 228 void ThreadPool::JoinAllThreadsForTesting() { | |
| 229 for (const auto& worker_thread : worker_threads_) { | |
| 230 worker_thread->WakeUp(); | |
| 231 worker_thread->JoinForTesting(); | |
| 232 } | |
| 233 } | |
| 234 | |
| 235 ThreadPool::ThreadPool( | |
| 236 ThreadPriority thread_priority, | |
| 237 size_t num_threads, | |
| 238 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, | |
| 239 ShutdownManager* shutdown_manager) | |
| 240 : priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue, | |
| 241 Unretained(this))), | |
| 242 shutdown_manager_(shutdown_manager), | |
| 243 delayed_task_manager_( | |
| 244 Bind(&ThreadPool::WakeUpOneThread, Unretained(this)), | |
| 245 shutdown_manager_) { | |
| 246 DCHECK_GT(num_threads, 0u); | |
| 247 DCHECK(shutdown_manager); | |
| 248 | |
| 249 const WorkerThread::BecomesIdleCallback becomes_idle_callback = | |
| 250 Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this)); | |
| 251 worker_threads_.reserve(num_threads); | |
| 252 | |
| 253 for (size_t i = 0; i < num_threads; ++i) { | |
| 254 scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread( | |
| 255 thread_priority, &priority_queue_, reinsert_sequence_callback, | |
| 256 becomes_idle_callback, &delayed_task_manager_, shutdown_manager_); | |
| 257 if (worker_thread.get() != nullptr) | |
| 258 worker_threads_.push_back(std::move(worker_thread)); | |
| 259 } | |
| 260 } | |
| 261 | |
| 262 void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) { | |
|
fdoray
2016/02/11 17:30:33
DCHECK that |worker_thread| belongs to this thread
fdoray
2016/02/12 04:16:20
Done.
| |
| 263 AutoSchedulerLock auto_lock_(idle_worker_threads_lock_); | |
| 264 | |
| 265 if (idle_worker_threads_set_.find(worker_thread) != | |
| 266 idle_worker_threads_set_.end()) { | |
| 267 // The worker thread is already on the stack of idle threads. | |
| 268 return; | |
| 269 } | |
| 270 | |
| 271 // Add the worker thread to the stack of idle threads. | |
| 272 idle_worker_threads_stack_.push(worker_thread); | |
| 273 idle_worker_threads_set_.insert(worker_thread); | |
| 274 } | |
| 275 | |
| 276 void ThreadPool::WakeUpOneThread() { | |
| 277 // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't | |
| 278 // have pending or running single-threaded tasks. | |
| 279 AutoSchedulerLock auto_lock(idle_worker_threads_lock_); | |
| 280 while (!idle_worker_threads_stack_.empty()) { | |
| 281 WorkerThread* worker_thread = idle_worker_threads_stack_.top(); | |
| 282 | |
| 283 idle_worker_threads_stack_.pop(); | |
| 284 idle_worker_threads_set_.erase(worker_thread); | |
| 285 | |
| 286 // HasSingleThreadedTasks() can return stale results. However, when it | |
| 287 // returns true below, it is guaranteed that |worker_thread| is either awake | |
| 288 // or about to be woken up and that it will not enter | |
| 289 // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This | |
| 290 // is important because if all threads in |idle_worker_threads_stack_| | |
| 291 // report that they have single-threaded tasks, no thread is woken up by | |
| 292 // this method. If these threads don't check |priority_queue_| before | |
| 293 // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could | |
| 294 // end up never being done. The guarantee works between the moment | |
| 295 // HasSingleThreadedTasks() goes from true to false and the moment | |
| 296 // |worker_thread| enters WaitUntilWorkIsAvailable(), | |
| 297 // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool. | |
| 298 // Both WorkerThreadBecomesIdleCallback() and the current method acquire | |
| 299 // |idle_worker_threads_lock_|, which synchronizes the value returned by | |
| 300 // HasSingleThreadedTasks(). | |
| 301 // | |
| 302 // TODO(fdoray): A single-threaded task can be posted to |worker_thread| | |
| 303 // immediately after HasSingleThreadedTasks() has returned false. Ideally, | |
| 304 // when this happens, another worker thread should be woken up. | |
| 305 if (!worker_thread->HasSingleThreadedTasks()) { | |
|
robliao
2016/02/11 22:30:07
Maybe a controller pattern might make reasoning ov
fdoray
2016/02/12 04:16:20
I agree. I'll try to write this (tomorrow in the p
| |
| 306 worker_thread->WakeUp(); | |
| 307 break; | |
| 308 } | |
| 309 } | |
| 310 } | |
| 311 | |
| 312 void ThreadPool::OnSequenceInsertedInPriorityQueue() { | |
| 313 if (disable_wake_up_thread_on_sequence_insertion_.Get()) | |
| 314 return; | |
| 315 | |
| 316 WakeUpOneThread(); | |
| 317 } | |
| 318 | |
| 319 } // namespace task_scheduler | |
| 320 } // namespace base | |
| OLD | NEW |