Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(591)

Unified Diff: base/task_scheduler/thread_pool.cc

Issue 1685423002: Task Scheduler. (Closed) Base URL: https://luckyluke-private.googlesource.com/src@a_master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/thread_pool.cc
diff --git a/base/task_scheduler/thread_pool.cc b/base/task_scheduler/thread_pool.cc
new file mode 100644
index 0000000000000000000000000000000000000000..953613e122c2e5cef764a3ee1a05e63f0ee44eba
--- /dev/null
+++ b/base/task_scheduler/thread_pool.cc
@@ -0,0 +1,320 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/thread_pool.h"
+
+#include <utility>
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/task_scheduler/utils.h"
+
+namespace base {
+namespace task_scheduler {
+
+namespace {
+
+// A task runner that runs tasks with the PARALLEL strategy.
+class SchedulerParallelTaskRunner : public TaskRunner {
+ public:
+ // Tasks posted through this task runner have |traits| and are inserted in
+ // |shared_priority_queue|. |delayed_task_manager| is used to post delayed
+ // tasks. |shutdown_manager| is notified when a task is posted.
+ SchedulerParallelTaskRunner(const TaskTraits& traits,
+ PriorityQueue* priority_queue,
+ DelayedTaskManager* delayed_task_manager,
+ ShutdownManager* shutdown_manager);
+
+ // TaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override;
+ bool RunsTasksOnCurrentThread() const override;
+
+ private:
+ ~SchedulerParallelTaskRunner() override;
+
+ TaskTraits traits_;
+ PriorityQueue* priority_queue_;
+ DelayedTaskManager* delayed_task_manager_;
+ ShutdownManager* shutdown_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
+};
+
+SchedulerParallelTaskRunner::SchedulerParallelTaskRunner(
+ const TaskTraits& traits,
+ PriorityQueue* priority_queue,
+ DelayedTaskManager* delayed_task_manager,
+ ShutdownManager* shutdown_manager)
+ : traits_(traits),
+ priority_queue_(priority_queue),
+ delayed_task_manager_(delayed_task_manager),
+ shutdown_manager_(shutdown_manager) {}
+
+bool SchedulerParallelTaskRunner::PostDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) {
+ Task task(from_here, closure, traits_, TimeTicks::Now());
+ if (!delay.is_zero())
+ task.delayed_run_time = task.post_time + delay;
+ PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_,
+ shutdown_manager_, delayed_task_manager_);
+ return true;
+}
+
+bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const {
+ // TODO(fdoray): Return true only if tasks posted may actually run on the
+ // current thread. It is valid, but not ideal, to always return true.
+ return true;
+}
+
+SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default;
+
+// A task runner that runs tasks in with the SEQUENCED strategy.
+class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
+ public:
+ // Tasks posted through this task runner have |traits| and are inserted in
+ // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|.
+ // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is
+ // notified when a task is posted.
+ SchedulerSequencedTaskRunner(const TaskTraits& traits,
+ scoped_refptr<Sequence> sequence,
+ PriorityQueue* priority_queue,
+ DelayedTaskManager* delayed_task_manager,
+ ShutdownManager* shutdown_manager);
+
+ // SequencedTaskRunner:
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) override;
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override;
+ bool RunsTasksOnCurrentThread() const override;
+
+ private:
+ ~SchedulerSequencedTaskRunner() override;
+
+ TaskTraits traits_;
+ scoped_refptr<Sequence> sequence_;
+ PriorityQueue* priority_queue_;
+ DelayedTaskManager* delayed_task_manager_;
+ ShutdownManager* shutdown_manager_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
+};
+
+SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner(
+ const TaskTraits& traits,
+ scoped_refptr<Sequence> sequence,
+ PriorityQueue* priority_queue,
+ DelayedTaskManager* delayed_task_manager,
+ ShutdownManager* shutdown_manager)
+ : traits_(traits),
+ sequence_(sequence),
+ priority_queue_(priority_queue),
+ delayed_task_manager_(delayed_task_manager),
+ shutdown_manager_(shutdown_manager) {}
+
+bool SchedulerSequencedTaskRunner::PostDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) {
+ Task task(from_here, closure, traits_, TimeTicks::Now());
+ if (!delay.is_zero())
+ task.delayed_run_time = task.post_time + delay;
+ PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_,
+ delayed_task_manager_);
+ return true;
+}
+
+bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const {
+ // TODO(fdoray): Return true only if tasks posted may actually run on the
+ // current thread. It is valid, but not ideal, to always return true.
+ return true;
+}
+
+bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask(
+ const tracked_objects::Location& from_here,
+ const Closure& task,
+ TimeDelta delay) {
+ return PostDelayedTask(from_here, task, delay);
+}
+
+SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default;
+
+} // namespace
+
+
+ThreadPool::~ThreadPool() = default;
+
+scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool(
+ ThreadPriority thread_priority,
+ size_t num_threads,
+ const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
+ ShutdownManager* shutdown_manager) {
+ scoped_ptr<ThreadPool> thread_pool(
+ new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback,
+ shutdown_manager));
+ return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool)
+ : scoped_ptr<ThreadPool>();
+}
+
+size_t ThreadPool::GetNumThreads() const {
+ return worker_threads_.size();
+}
+
+scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits(
+ const TaskTraits& traits,
+ ExecutionMode execution_mode) {
+ switch (execution_mode) {
+ case ExecutionMode::PARALLEL: {
+ return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner(
+ traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_));
+ }
+
+ case ExecutionMode::SEQUENCED: {
+ // TODO(fdoray): Support TaskTraits().WithSequenceToken().
+ return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner(
+ traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_,
+ &delayed_task_manager_, shutdown_manager_));
+ }
+
+#if defined(OS_WIN)
+ case ExecutionMode::SINGLE_THREADED_COM_STA:
+#endif
+ case ExecutionMode::SINGLE_THREADED: {
+ DCHECK(!worker_threads_.empty());
+ // TODO(fdoray): Better thread assignment.
+ return scoped_refptr<TaskRunner>(
+ worker_threads_.front()
+ ->CreateTaskRunnerWithTraits(traits, execution_mode)
+ .get());
+ }
+
+ default: {
+ NOTREACHED();
+ return scoped_refptr<TaskRunner>();
+ }
+ }
+}
+
+void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence,
+ const SequenceSortKey& sequence_sort_key,
+ const WorkerThread* worker_thread) {
+ DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get());
+
+ // Set a flag to avoid waking up a thread when reinserting |sequence| in
+ // |priority_queue_| if the thread doing the reinsertion:
+ // - Can run tasks from |priority_queue_|, and,
+ // - Doesn't have pending single-threaded tasks.
+ // If these conditions are met, the thread doing the reinsertion will soon
+ // pop a sequence from |priority_queue_|. There is no need to wake up a new
+ // thread to do it.
+ if (worker_thread->shared_priority_queue() == &priority_queue_ &&
+ !worker_thread->HasSingleThreadedTasks()) {
+ disable_wake_up_thread_on_sequence_insertion_.Set(true);
+ }
+
+ // Insert the sequence in the priority queue.
+ priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key);
+
+ disable_wake_up_thread_on_sequence_insertion_.Set(false);
+}
+
+void ThreadPool::JoinAllThreadsForTesting() {
+ for (const auto& worker_thread : worker_threads_) {
+ worker_thread->WakeUp();
+ worker_thread->JoinForTesting();
+ }
+}
+
+ThreadPool::ThreadPool(
+ ThreadPriority thread_priority,
+ size_t num_threads,
+ const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
+ ShutdownManager* shutdown_manager)
+ : priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue,
+ Unretained(this))),
+ shutdown_manager_(shutdown_manager),
+ delayed_task_manager_(
+ Bind(&ThreadPool::WakeUpOneThread, Unretained(this)),
+ shutdown_manager_) {
+ DCHECK_GT(num_threads, 0u);
+ DCHECK(shutdown_manager);
+
+ const WorkerThread::BecomesIdleCallback becomes_idle_callback =
+ Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this));
+ worker_threads_.reserve(num_threads);
+
+ for (size_t i = 0; i < num_threads; ++i) {
+ scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread(
+ thread_priority, &priority_queue_, reinsert_sequence_callback,
+ becomes_idle_callback, &delayed_task_manager_, shutdown_manager_);
+ if (worker_thread.get() != nullptr)
+ worker_threads_.push_back(std::move(worker_thread));
+ }
+}
+
+void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) {
fdoray 2016/02/11 17:30:33 DCHECK that |worker_thread| belongs to this thread
fdoray 2016/02/12 04:16:20 Done.
+ AutoSchedulerLock auto_lock_(idle_worker_threads_lock_);
+
+ if (idle_worker_threads_set_.find(worker_thread) !=
+ idle_worker_threads_set_.end()) {
+ // The worker thread is already on the stack of idle threads.
+ return;
+ }
+
+ // Add the worker thread to the stack of idle threads.
+ idle_worker_threads_stack_.push(worker_thread);
+ idle_worker_threads_set_.insert(worker_thread);
+}
+
+void ThreadPool::WakeUpOneThread() {
+ // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't
+ // have pending or running single-threaded tasks.
+ AutoSchedulerLock auto_lock(idle_worker_threads_lock_);
+ while (!idle_worker_threads_stack_.empty()) {
+ WorkerThread* worker_thread = idle_worker_threads_stack_.top();
+
+ idle_worker_threads_stack_.pop();
+ idle_worker_threads_set_.erase(worker_thread);
+
+ // HasSingleThreadedTasks() can return stale results. However, when it
+ // returns true below, it is guaranteed that |worker_thread| is either awake
+ // or about to be woken up and that it will not enter
+ // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This
+ // is important because if all threads in |idle_worker_threads_stack_|
+ // report that they have single-threaded tasks, no thread is woken up by
+ // this method. If these threads don't check |priority_queue_| before
+ // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could
+ // end up never being done. The guarantee works between the moment
+ // HasSingleThreadedTasks() goes from true to false and the moment
+ // |worker_thread| enters WaitUntilWorkIsAvailable(),
+ // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool.
+ // Both WorkerThreadBecomesIdleCallback() and the current method acquire
+ // |idle_worker_threads_lock_|, which synchronizes the value returned by
+ // HasSingleThreadedTasks().
+ //
+ // TODO(fdoray): A single-threaded task can be posted to |worker_thread|
+ // immediately after HasSingleThreadedTasks() has returned false. Ideally,
+ // when this happens, another worker thread should be woken up.
+ if (!worker_thread->HasSingleThreadedTasks()) {
robliao 2016/02/11 22:30:07 Maybe a controller pattern might make reasoning ov
fdoray 2016/02/12 04:16:20 I agree. I'll try to write this (tomorrow in the p
+ worker_thread->WakeUp();
+ break;
+ }
+ }
+}
+
+void ThreadPool::OnSequenceInsertedInPriorityQueue() {
+ if (disable_wake_up_thread_on_sequence_insertion_.Get())
+ return;
+
+ WakeUpOneThread();
+}
+
+} // namespace task_scheduler
+} // namespace base

Powered by Google App Engine
This is Rietveld 408576698