Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(536)

Unified Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: Merge in https://codereview.chromium.org/2726073002/ Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..0d9d55dbfab7f40183ff5f95b154c0b395868c8c
--- /dev/null
+++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
@@ -0,0 +1,308 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/memory/ptr_util.h"
+#include "base/single_thread_task_runner.h"
+#include "base/strings/stringprintf.h"
+#include "base/synchronization/atomic_flag.h"
+#include "base/task_scheduler/delayed_task_manager.h"
+#include "base/task_scheduler/scheduler_worker.h"
+#include "base/task_scheduler/sequence.h"
+#include "base/task_scheduler/task.h"
+#include "base/task_scheduler/task_tracker.h"
+#include "base/task_scheduler/task_traits.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+
+namespace base {
+namespace internal {
+
+namespace {
+
+// Allows for checking the PlatformThread::CurrentRef() against a set
+// PlatformThreadRef atomically without using locks.
+class AtomicThreadRefChecker {
+ public:
+ AtomicThreadRefChecker() = default;
+ ~AtomicThreadRefChecker() = default;
+
+ void Set() {
+ thread_ref_ = PlatformThread::CurrentRef();
+ is_set_.Set();
+ }
+
+ bool IsCurrentThreadSameAsSetThread() {
+ return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
+ }
+
+ private:
+ AtomicFlag is_set_;
+ PlatformThreadRef thread_ref_;
+
+ DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker);
+};
+
+class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
+ public:
+ SchedulerWorkerDelegate(const std::string& thread_name)
+ : thread_name_(thread_name) {}
+
+ // SchedulerWorker::Delegate:
+ void OnMainEntry(SchedulerWorker* worker) override {
+ thread_ref_checker_.Set();
+ PlatformThread::SetName(thread_name_);
+ }
+
+ scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
+ AutoSchedulerLock auto_lock(sequence_lock_);
+ bool has_work = has_work_;
+ has_work_ = false;
+ return has_work ? sequence_ : nullptr;
+ }
+
+ void DidRunTask() override {}
+
+ void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
+ AutoSchedulerLock auto_lock(sequence_lock_);
+ // We've shut down, so no-op this work request. Any sequence cleanup will
+ // occur in the caller's context.
+ if (!sequence_)
+ return;
+
+ DCHECK_EQ(sequence, sequence_);
+ has_work_ = true;
+ }
+
+ TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
+
+ bool CanDetach(SchedulerWorker* worker) override { return false; }
+
+ void OnDetach() override { NOTREACHED(); }
+
+ bool RunsTasksOnCurrentThread() {
+ // We check the thread ref instead of the sequence for the benefit of COM
+ // callbacks which may execute without a sequence context.
+ return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
+ }
+
+ void OnMainExit() override {
+ // Move |sequence_| to |local_sequence| so that if we have the last
+ // reference to the sequence we don't destroy it (and its tasks) within
+ // |sequence_lock_|.
+ scoped_refptr<Sequence> local_sequence;
+ {
+ AutoSchedulerLock auto_lock(sequence_lock_);
+ // To reclaim skipped tasks on shutdown, we null out the sequence to allow
+ // the tasks to destroy themselves.
+ local_sequence = std::move(sequence_);
+ }
+ }
+
+ // SchedulerWorkerDelegate:
+
+ // Consumers should release their sequence reference as soon as possible to
+ // ensure timely cleanup for general shutdown.
+ scoped_refptr<Sequence> sequence() {
+ AutoSchedulerLock auto_lock(sequence_lock_);
+ return sequence_;
+ }
+
+ private:
+ const std::string thread_name_;
+
+ // Synchronizes access to |sequence_| and |has_work_|.
+ SchedulerLock sequence_lock_;
+ scoped_refptr<Sequence> sequence_ = new Sequence;
+ bool has_work_ = false;
+
+ AtomicThreadRefChecker thread_ref_checker_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
+};
+
+} // namespace
+
+class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
+ : public SingleThreadTaskRunner {
+ public:
+ // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
+ // lifetime of a dedicated |worker| for |traits|.
+ SchedulerSingleThreadTaskRunner(
+ SchedulerSingleThreadTaskRunnerManager* const outer,
+ const TaskTraits& traits,
+ SchedulerWorker* worker)
+ : outer_(outer), traits_(traits), worker_(worker) {
+ DCHECK(outer_);
+ DCHECK(worker_);
+ }
+
+ // SingleThreadTaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override {
+ auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
+ task->single_thread_task_runner_ref = this;
+
+ if (!outer_->task_tracker_->WillPostTask(task.get()))
+ return false;
+
+ if (task->delayed_run_time.is_null()) {
+ PostTaskNow(std::move(task));
+ } else {
+ outer_->delayed_task_manager_->AddDelayedTask(
+ std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow,
+ Unretained(this)));
+ }
+ return true;
+ }
+
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ base::TimeDelta delay) override {
+ // Tasks are never nested within the task scheduler.
+ return PostDelayedTask(from_here, closure, delay);
+ }
+
+ bool RunsTasksOnCurrentThread() const override {
+ return GetDelegate()->RunsTasksOnCurrentThread();
+ }
+
+ private:
+ ~SchedulerSingleThreadTaskRunner() override {
+ outer_->UnregisterSchedulerWorker(worker_);
+ }
+
+ void PostTaskNow(std::unique_ptr<Task> task) {
+ scoped_refptr<Sequence> sequence = GetDelegate()->sequence();
+ // If |sequence| is null, then the thread is effectively gone (either
+ // shutdown or joined).
+ if (!sequence)
+ return;
+
+ const bool sequence_was_empty = sequence->PushTask(std::move(task));
+ if (sequence_was_empty) {
+ GetDelegate()->ReEnqueueSequence(std::move(sequence));
+ worker_->WakeUp();
+ }
+ }
+
+ SchedulerWorkerDelegate* GetDelegate() const {
+ return static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
+ }
+
+ SchedulerSingleThreadTaskRunnerManager* const outer_;
+ const TaskTraits traits_;
+ SchedulerWorker* const worker_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
+SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
+ const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
+ const TaskScheduler::WorkerPoolIndexForTraitsCallback&
+ worker_pool_index_for_traits_callback,
+ TaskTracker* task_tracker,
+ DelayedTaskManager* delayed_task_manager)
+ : worker_pool_params_vector_(worker_pool_params_vector),
+ worker_pool_index_for_traits_callback_(
+ worker_pool_index_for_traits_callback),
+ task_tracker_(task_tracker),
+ delayed_task_manager_(delayed_task_manager) {
+ DCHECK_GT(worker_pool_params_vector_.size(), 0U);
+ DCHECK(worker_pool_index_for_traits_callback_);
+ DCHECK(task_tracker_);
+ DCHECK(delayed_task_manager_);
+}
+
+SchedulerSingleThreadTaskRunnerManager::
+ ~SchedulerSingleThreadTaskRunnerManager() {
+#if DCHECK_IS_ON()
+ size_t workers_unregistered_during_join =
+ subtle::NoBarrier_Load(&workers_unregistered_during_join_);
+ DCHECK_EQ(workers_unregistered_during_join, workers_.size())
+ << "There cannot be outstanding SingleThreadTaskRunners upon destruction"
+ "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler";
+#endif
+}
+
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
+ const TaskTraits& traits) {
+ size_t index = worker_pool_index_for_traits_callback_.Run(traits);
+ DCHECK_LT(index, worker_pool_params_vector_.size());
+ return new SchedulerSingleThreadTaskRunner(
+ this, traits,
+ CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
+}
+
+void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
+ decltype(workers_) local_workers;
+ {
+ AutoSchedulerLock auto_lock(workers_lock_);
+ local_workers = std::move(workers_);
+ }
+
+ for (const auto& worker : local_workers)
+ worker->JoinForTesting();
+
+ {
+ AutoSchedulerLock auto_lock(workers_lock_);
+ DCHECK(workers_.empty())
+ << "New worker(s) unexpectedly registered during join.";
+ workers_ = std::move(local_workers);
+ }
+}
+
+SchedulerWorker*
+SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
+ const SchedulerWorkerPoolParams& params) {
+ AutoSchedulerLock auto_lock(workers_lock_);
+ int id = next_worker_id_++;
+ auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
+ "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
+ workers_.emplace_back(SchedulerWorker::Create(
+ params.priority_hint(), std::move(delegate), task_tracker_,
+ SchedulerWorker::InitialState::DETACHED));
+ return workers_.back().get();
+}
+
+void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
+ SchedulerWorker* worker) {
+ // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
+ // |workers_lock_|.
+ scoped_refptr<SchedulerWorker> worker_to_destroy;
+ {
+ AutoSchedulerLock auto_lock(workers_lock_);
+
+ // We might be joining, so record that a worker was unregistered for
+ // verification at destruction.
+ if (workers_.empty()) {
+#if DCHECK_IS_ON()
+ subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1);
+#endif
+ return;
+ }
+
+ auto worker_iter =
+ std::find_if(workers_.begin(), workers_.end(),
+ [worker](const scoped_refptr<SchedulerWorker>& candidate) {
+ return candidate.get() == worker;
+ });
+ DCHECK(worker_iter != workers_.end());
+ worker_to_destroy = std::move(*worker_iter);
+ workers_.erase(worker_iter);
+ }
+ worker_to_destroy->Cleanup();
+}
+
+} // namespace internal
+} // namespace base

Powered by Google App Engine
This is Rietveld 408576698