Index: base/task_scheduler/thread_pool.cc |
diff --git a/base/task_scheduler/thread_pool.cc b/base/task_scheduler/thread_pool.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..953613e122c2e5cef764a3ee1a05e63f0ee44eba |
--- /dev/null |
+++ b/base/task_scheduler/thread_pool.cc |
@@ -0,0 +1,320 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "base/task_scheduler/thread_pool.h" |
+ |
+#include <utility> |
+ |
+#include "base/bind.h" |
+#include "base/logging.h" |
+#include "base/task_scheduler/utils.h" |
+ |
+namespace base { |
+namespace task_scheduler { |
+ |
+namespace { |
+ |
+// A task runner that runs tasks with the PARALLEL strategy. |
+class SchedulerParallelTaskRunner : public TaskRunner { |
+ public: |
+ // Tasks posted through this task runner have |traits| and are inserted in |
+ // |shared_priority_queue|. |delayed_task_manager| is used to post delayed |
+ // tasks. |shutdown_manager| is notified when a task is posted. |
+ SchedulerParallelTaskRunner(const TaskTraits& traits, |
+ PriorityQueue* priority_queue, |
+ DelayedTaskManager* delayed_task_manager, |
+ ShutdownManager* shutdown_manager); |
+ |
+ // TaskRunner: |
+ bool PostDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) override; |
+ bool RunsTasksOnCurrentThread() const override; |
+ |
+ private: |
+ ~SchedulerParallelTaskRunner() override; |
+ |
+ TaskTraits traits_; |
+ PriorityQueue* priority_queue_; |
+ DelayedTaskManager* delayed_task_manager_; |
+ ShutdownManager* shutdown_manager_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); |
+}; |
+ |
+SchedulerParallelTaskRunner::SchedulerParallelTaskRunner( |
+ const TaskTraits& traits, |
+ PriorityQueue* priority_queue, |
+ DelayedTaskManager* delayed_task_manager, |
+ ShutdownManager* shutdown_manager) |
+ : traits_(traits), |
+ priority_queue_(priority_queue), |
+ delayed_task_manager_(delayed_task_manager), |
+ shutdown_manager_(shutdown_manager) {} |
+ |
+bool SchedulerParallelTaskRunner::PostDelayedTask( |
+ const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) { |
+ Task task(from_here, closure, traits_, TimeTicks::Now()); |
+ if (!delay.is_zero()) |
+ task.delayed_run_time = task.post_time + delay; |
+ PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_, |
+ shutdown_manager_, delayed_task_manager_); |
+ return true; |
+} |
+ |
+bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const { |
+ // TODO(fdoray): Return true only if tasks posted may actually run on the |
+ // current thread. It is valid, but not ideal, to always return true. |
+ return true; |
+} |
+ |
+SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default; |
+ |
+// A task runner that runs tasks in with the SEQUENCED strategy. |
+class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
+ public: |
+ // Tasks posted through this task runner have |traits| and are inserted in |
+ // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|. |
+ // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is |
+ // notified when a task is posted. |
+ SchedulerSequencedTaskRunner(const TaskTraits& traits, |
+ scoped_refptr<Sequence> sequence, |
+ PriorityQueue* priority_queue, |
+ DelayedTaskManager* delayed_task_manager, |
+ ShutdownManager* shutdown_manager); |
+ |
+ // SequencedTaskRunner: |
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& task, |
+ TimeDelta delay) override; |
+ bool PostDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) override; |
+ bool RunsTasksOnCurrentThread() const override; |
+ |
+ private: |
+ ~SchedulerSequencedTaskRunner() override; |
+ |
+ TaskTraits traits_; |
+ scoped_refptr<Sequence> sequence_; |
+ PriorityQueue* priority_queue_; |
+ DelayedTaskManager* delayed_task_manager_; |
+ ShutdownManager* shutdown_manager_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
+}; |
+ |
+SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner( |
+ const TaskTraits& traits, |
+ scoped_refptr<Sequence> sequence, |
+ PriorityQueue* priority_queue, |
+ DelayedTaskManager* delayed_task_manager, |
+ ShutdownManager* shutdown_manager) |
+ : traits_(traits), |
+ sequence_(sequence), |
+ priority_queue_(priority_queue), |
+ delayed_task_manager_(delayed_task_manager), |
+ shutdown_manager_(shutdown_manager) {} |
+ |
+bool SchedulerSequencedTaskRunner::PostDelayedTask( |
+ const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) { |
+ Task task(from_here, closure, traits_, TimeTicks::Now()); |
+ if (!delay.is_zero()) |
+ task.delayed_run_time = task.post_time + delay; |
+ PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_, |
+ delayed_task_manager_); |
+ return true; |
+} |
+ |
+bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const { |
+ // TODO(fdoray): Return true only if tasks posted may actually run on the |
+ // current thread. It is valid, but not ideal, to always return true. |
+ return true; |
+} |
+ |
+bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask( |
+ const tracked_objects::Location& from_here, |
+ const Closure& task, |
+ TimeDelta delay) { |
+ return PostDelayedTask(from_here, task, delay); |
+} |
+ |
+SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default; |
+ |
+} // namespace |
+ |
+ |
+ThreadPool::~ThreadPool() = default; |
+ |
+scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool( |
+ ThreadPriority thread_priority, |
+ size_t num_threads, |
+ const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, |
+ ShutdownManager* shutdown_manager) { |
+ scoped_ptr<ThreadPool> thread_pool( |
+ new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback, |
+ shutdown_manager)); |
+ return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool) |
+ : scoped_ptr<ThreadPool>(); |
+} |
+ |
+size_t ThreadPool::GetNumThreads() const { |
+ return worker_threads_.size(); |
+} |
+ |
+scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits( |
+ const TaskTraits& traits, |
+ ExecutionMode execution_mode) { |
+ switch (execution_mode) { |
+ case ExecutionMode::PARALLEL: { |
+ return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner( |
+ traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_)); |
+ } |
+ |
+ case ExecutionMode::SEQUENCED: { |
+ // TODO(fdoray): Support TaskTraits().WithSequenceToken(). |
+ return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner( |
+ traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_, |
+ &delayed_task_manager_, shutdown_manager_)); |
+ } |
+ |
+#if defined(OS_WIN) |
+ case ExecutionMode::SINGLE_THREADED_COM_STA: |
+#endif |
+ case ExecutionMode::SINGLE_THREADED: { |
+ DCHECK(!worker_threads_.empty()); |
+ // TODO(fdoray): Better thread assignment. |
+ return scoped_refptr<TaskRunner>( |
+ worker_threads_.front() |
+ ->CreateTaskRunnerWithTraits(traits, execution_mode) |
+ .get()); |
+ } |
+ |
+ default: { |
+ NOTREACHED(); |
+ return scoped_refptr<TaskRunner>(); |
+ } |
+ } |
+} |
+ |
+void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence, |
+ const SequenceSortKey& sequence_sort_key, |
+ const WorkerThread* worker_thread) { |
+ DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get()); |
+ |
+ // Set a flag to avoid waking up a thread when reinserting |sequence| in |
+ // |priority_queue_| if the thread doing the reinsertion: |
+ // - Can run tasks from |priority_queue_|, and, |
+ // - Doesn't have pending single-threaded tasks. |
+ // If these conditions are met, the thread doing the reinsertion will soon |
+ // pop a sequence from |priority_queue_|. There is no need to wake up a new |
+ // thread to do it. |
+ if (worker_thread->shared_priority_queue() == &priority_queue_ && |
+ !worker_thread->HasSingleThreadedTasks()) { |
+ disable_wake_up_thread_on_sequence_insertion_.Set(true); |
+ } |
+ |
+ // Insert the sequence in the priority queue. |
+ priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key); |
+ |
+ disable_wake_up_thread_on_sequence_insertion_.Set(false); |
+} |
+ |
+void ThreadPool::JoinAllThreadsForTesting() { |
+ for (const auto& worker_thread : worker_threads_) { |
+ worker_thread->WakeUp(); |
+ worker_thread->JoinForTesting(); |
+ } |
+} |
+ |
+ThreadPool::ThreadPool( |
+ ThreadPriority thread_priority, |
+ size_t num_threads, |
+ const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback, |
+ ShutdownManager* shutdown_manager) |
+ : priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue, |
+ Unretained(this))), |
+ shutdown_manager_(shutdown_manager), |
+ delayed_task_manager_( |
+ Bind(&ThreadPool::WakeUpOneThread, Unretained(this)), |
+ shutdown_manager_) { |
+ DCHECK_GT(num_threads, 0u); |
+ DCHECK(shutdown_manager); |
+ |
+ const WorkerThread::BecomesIdleCallback becomes_idle_callback = |
+ Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this)); |
+ worker_threads_.reserve(num_threads); |
+ |
+ for (size_t i = 0; i < num_threads; ++i) { |
+ scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread( |
+ thread_priority, &priority_queue_, reinsert_sequence_callback, |
+ becomes_idle_callback, &delayed_task_manager_, shutdown_manager_); |
+ if (worker_thread.get() != nullptr) |
+ worker_threads_.push_back(std::move(worker_thread)); |
+ } |
+} |
+ |
+void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) { |
fdoray
2016/02/11 17:30:33
DCHECK that |worker_thread| belongs to this thread
fdoray
2016/02/12 04:16:20
Done.
|
+ AutoSchedulerLock auto_lock_(idle_worker_threads_lock_); |
+ |
+ if (idle_worker_threads_set_.find(worker_thread) != |
+ idle_worker_threads_set_.end()) { |
+ // The worker thread is already on the stack of idle threads. |
+ return; |
+ } |
+ |
+ // Add the worker thread to the stack of idle threads. |
+ idle_worker_threads_stack_.push(worker_thread); |
+ idle_worker_threads_set_.insert(worker_thread); |
+} |
+ |
+void ThreadPool::WakeUpOneThread() { |
+ // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't |
+ // have pending or running single-threaded tasks. |
+ AutoSchedulerLock auto_lock(idle_worker_threads_lock_); |
+ while (!idle_worker_threads_stack_.empty()) { |
+ WorkerThread* worker_thread = idle_worker_threads_stack_.top(); |
+ |
+ idle_worker_threads_stack_.pop(); |
+ idle_worker_threads_set_.erase(worker_thread); |
+ |
+ // HasSingleThreadedTasks() can return stale results. However, when it |
+ // returns true below, it is guaranteed that |worker_thread| is either awake |
+ // or about to be woken up and that it will not enter |
+ // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This |
+ // is important because if all threads in |idle_worker_threads_stack_| |
+ // report that they have single-threaded tasks, no thread is woken up by |
+ // this method. If these threads don't check |priority_queue_| before |
+ // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could |
+ // end up never being done. The guarantee works between the moment |
+ // HasSingleThreadedTasks() goes from true to false and the moment |
+ // |worker_thread| enters WaitUntilWorkIsAvailable(), |
+ // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool. |
+ // Both WorkerThreadBecomesIdleCallback() and the current method acquire |
+ // |idle_worker_threads_lock_|, which synchronizes the value returned by |
+ // HasSingleThreadedTasks(). |
+ // |
+ // TODO(fdoray): A single-threaded task can be posted to |worker_thread| |
+ // immediately after HasSingleThreadedTasks() has returned false. Ideally, |
+ // when this happens, another worker thread should be woken up. |
+ if (!worker_thread->HasSingleThreadedTasks()) { |
robliao
2016/02/11 22:30:07
Maybe a controller pattern might make reasoning ov
fdoray
2016/02/12 04:16:20
I agree. I'll try to write this (tomorrow in the p
|
+ worker_thread->WakeUp(); |
+ break; |
+ } |
+ } |
+} |
+ |
+void ThreadPool::OnSequenceInsertedInPriorityQueue() { |
+ if (disable_wake_up_thread_on_sequence_insertion_.Get()) |
+ return; |
+ |
+ WakeUpOneThread(); |
+} |
+ |
+} // namespace task_scheduler |
+} // namespace base |