Index: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
diff --git a/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..0d9d55dbfab7f40183ff5f95b154c0b395868c8c
--- /dev/null
+++ b/base/task_scheduler/scheduler_single_thread_task_runner_manager.cc
@@ -0,0 +1,332 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/memory/ptr_util.h"
+#include "base/single_thread_task_runner.h"
+#include "base/strings/stringprintf.h"
+#include "base/synchronization/atomic_flag.h"
+#include "base/task_scheduler/delayed_task_manager.h"
+#include "base/task_scheduler/scheduler_worker.h"
+#include "base/task_scheduler/sequence.h"
+#include "base/task_scheduler/task.h"
+#include "base/task_scheduler/task_tracker.h"
+#include "base/task_scheduler/task_traits.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+
+namespace base {
+namespace internal {
+
+namespace {
+
+// Allows for checking the PlatformThread::CurrentRef() against a set
+// PlatformThreadRef atomically without using locks.
+class AtomicThreadRefChecker {
+ public:
+  AtomicThreadRefChecker() = default;
+  ~AtomicThreadRefChecker() = default;
+
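+  // Set() is the release side and IsCurrentThreadSameAsSetThread() the
+  // acquire side: AtomicFlag guarantees that a reader which observes
+  // |is_set_| also observes the write to |thread_ref_| that preceded Set().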
+  void Set() {
+    thread_ref_ = PlatformThread::CurrentRef();
+    is_set_.Set();
+  }
+
+  bool IsCurrentThreadSameAsSetThread() {
+    return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
+  }
+
+ private:
+  AtomicFlag is_set_;
+  PlatformThreadRef thread_ref_;
+
+  DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker);
+};
+
+class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
+ public:
+  explicit SchedulerWorkerDelegate(const std::string& thread_name)
+      : thread_name_(thread_name) {}
+
+  // SchedulerWorker::Delegate:
+  void OnMainEntry(SchedulerWorker* worker) override {
+    thread_ref_checker_.Set();
+    PlatformThread::SetName(thread_name_);
+  }
+
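+  // Hands |sequence_| out only if work was enqueued since the last call;
+  // clearing |has_work_| prevents handing the sequence out again until
+  // ReEnqueueSequence() is called.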
+  scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
+    AutoSchedulerLock auto_lock(sequence_lock_);
+    bool has_work = has_work_;
+    has_work_ = false;
+    return has_work ? sequence_ : nullptr;
+  }
+
+  void DidRunTask() override {}
+
+  void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
+    AutoSchedulerLock auto_lock(sequence_lock_);
+    // We've shut down, so no-op this work request. Any sequence cleanup will
+    // occur in the caller's context.
+    if (!sequence_)
+      return;
+
+    DCHECK_EQ(sequence, sequence_);
+    has_work_ = true;
+  }
+
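+  // The dedicated worker sleeps until it is explicitly woken and is never
+  // detached once its thread has been created.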
+  TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
+
+  bool CanDetach(SchedulerWorker* worker) override { return false; }
+
+  void OnDetach() override { NOTREACHED(); }
+
+  void OnMainExit() override {
+    // Move |sequence_| to |local_sequence| so that if we have the last
+    // reference to the sequence we don't destroy it (and its tasks) within
+    // |sequence_lock_|.
+    scoped_refptr<Sequence> local_sequence;
+    {
+      AutoSchedulerLock auto_lock(sequence_lock_);
+      // To reclaim skipped tasks on shutdown, we null out the sequence to
+      // allow the tasks to destroy themselves.
+      local_sequence = std::move(sequence_);
+    }
+  }
+
+  // SchedulerWorkerDelegate:
+
+  bool RunsTasksOnCurrentThread() {
+    // We check the thread ref instead of the sequence for the benefit of COM
+    // callbacks which may execute without a sequence context.
+    return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
+  }
+
+  // Consumers should release their sequence reference as soon as possible to
+  // ensure timely cleanup for general shutdown.
+  scoped_refptr<Sequence> sequence() {
+    AutoSchedulerLock auto_lock(sequence_lock_);
+    return sequence_;
+  }
+
+ private:
+  const std::string thread_name_;
+
+  // Synchronizes access to |sequence_| and |has_work_|.
+  SchedulerLock sequence_lock_;
+  scoped_refptr<Sequence> sequence_ = new Sequence;
+  bool has_work_ = false;
+
+  AtomicThreadRefChecker thread_ref_checker_;
+
+  DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
+};
+
+}  // namespace
+
+class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
+    : public SingleThreadTaskRunner {
+ public:
+  // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
+  // lifetime of a dedicated |worker| for |traits|.
+  SchedulerSingleThreadTaskRunner(
+      SchedulerSingleThreadTaskRunnerManager* const outer,
+      const TaskTraits& traits,
+      SchedulerWorker* worker)
+      : outer_(outer), traits_(traits), worker_(worker) {
+    DCHECK(outer_);
+    DCHECK(worker_);
+  }
+
+  // SingleThreadTaskRunner:
+  bool PostDelayedTask(const tracked_objects::Location& from_here,
+                       const Closure& closure,
+                       TimeDelta delay) override {
+    auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
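+    // |single_thread_task_runner_ref| keeps this task runner (and thus its
+    // dedicated worker) alive until the task is run or destroyed.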
+    task->single_thread_task_runner_ref = this;
+
+    if (!outer_->task_tracker_->WillPostTask(task.get()))
+      return false;
+
+    if (task->delayed_run_time.is_null()) {
+      PostTaskNow(std::move(task));
+    } else {
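+      // Unretained(this) is safe: |task| holds a reference to this task
+      // runner via |single_thread_task_runner_ref|, so the runner outlives
+      // the pending callback.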
+      outer_->delayed_task_manager_->AddDelayedTask(
+          std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow,
+                                Unretained(this)));
+    }
+    return true;
+  }
+
+  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+                                  const Closure& closure,
+                                  base::TimeDelta delay) override {
+    // Tasks are never nested within the task scheduler.
+    return PostDelayedTask(from_here, closure, delay);
+  }
+
+  bool RunsTasksOnCurrentThread() const override {
+    return GetDelegate()->RunsTasksOnCurrentThread();
+  }
+
+ private:
+  ~SchedulerSingleThreadTaskRunner() override {
+    outer_->UnregisterSchedulerWorker(worker_);
+  }
+
+  void PostTaskNow(std::unique_ptr<Task> task) {
+    scoped_refptr<Sequence> sequence = GetDelegate()->sequence();
+    // If |sequence| is null, then the thread is effectively gone (either
+    // shut down or joined).
+    if (!sequence)
+      return;
+
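+    // PushTask() returns true iff the sequence was empty before the push. In
+    // that case the sequence is not currently queued with the worker, so hand
+    // it back to the delegate and wake the worker to run it.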
+    const bool sequence_was_empty = sequence->PushTask(std::move(task));
+    if (sequence_was_empty) {
+      GetDelegate()->ReEnqueueSequence(std::move(sequence));
+      worker_->WakeUp();
+    }
+  }
+
+  SchedulerWorkerDelegate* GetDelegate() const {
+    return static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
+  }
+
+  SchedulerSingleThreadTaskRunnerManager* const outer_;
+  const TaskTraits traits_;
+  SchedulerWorker* const worker_;
+
+  DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
+SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
+    const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
+    const TaskScheduler::WorkerPoolIndexForTraitsCallback&
+        worker_pool_index_for_traits_callback,
+    TaskTracker* task_tracker,
+    DelayedTaskManager* delayed_task_manager)
+    : worker_pool_params_vector_(worker_pool_params_vector),
+      worker_pool_index_for_traits_callback_(
+          worker_pool_index_for_traits_callback),
+      task_tracker_(task_tracker),
+      delayed_task_manager_(delayed_task_manager) {
+  DCHECK_GT(worker_pool_params_vector_.size(), 0U);
+  DCHECK(worker_pool_index_for_traits_callback_);
+  DCHECK(task_tracker_);
+  DCHECK(delayed_task_manager_);
+}
+
+SchedulerSingleThreadTaskRunnerManager::
+    ~SchedulerSingleThreadTaskRunnerManager() {
+#if DCHECK_IS_ON()
+  size_t workers_unregistered_during_join =
+      subtle::NoBarrier_Load(&workers_unregistered_during_join_);
+  DCHECK_EQ(workers_unregistered_during_join, workers_.size())
+      << "There cannot be outstanding SingleThreadTaskRunners upon "
+         "destruction of SchedulerSingleThreadTaskRunnerManager or the "
+         "Task Scheduler.";
+#endif
+}
+
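+// Each returned SingleThreadTaskRunner is backed by its own dedicated
+// SchedulerWorker; workers are not shared between task runners here.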
+scoped_refptr<SingleThreadTaskRunner>
+SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
+    const TaskTraits& traits) {
+  size_t index = worker_pool_index_for_traits_callback_.Run(traits);
+  DCHECK_LT(index, worker_pool_params_vector_.size());
+  return new SchedulerSingleThreadTaskRunner(
+      this, traits,
+      CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
+}
+
+void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
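+  // Move the workers out of |workers_| so they can be joined without holding
+  // |workers_lock_|; an empty |workers_| also tells
+  // UnregisterSchedulerWorker() that a join is in progress.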
+  decltype(workers_) local_workers;
+  {
+    AutoSchedulerLock auto_lock(workers_lock_);
+    local_workers = std::move(workers_);
+  }
+
+  for (const auto& worker : local_workers)
+    worker->JoinForTesting();
+
+  {
+    AutoSchedulerLock auto_lock(workers_lock_);
+    DCHECK(workers_.empty())
+        << "New worker(s) unexpectedly registered during join.";
+    workers_ = std::move(local_workers);
+  }
+}
+
+SchedulerWorker*
+SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
+    const SchedulerWorkerPoolParams& params) {
+  AutoSchedulerLock auto_lock(workers_lock_);
+  int id = next_worker_id_++;
+  auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
+      "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
+  workers_.emplace_back(SchedulerWorker::Create(
+      params.priority_hint(), std::move(delegate), task_tracker_,
+      SchedulerWorker::InitialState::DETACHED));
+  return workers_.back().get();
+}
+
+void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
+    SchedulerWorker* worker) {
+  // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
+  // |workers_lock_|.
+  scoped_refptr<SchedulerWorker> worker_to_destroy;
+  {
+    AutoSchedulerLock auto_lock(workers_lock_);
+
+    // We might be joining, so record that a worker was unregistered for
+    // verification at destruction.
+    if (workers_.empty()) {
+#if DCHECK_IS_ON()
+      subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1);
+#endif
+      return;
+    }
+
+    auto worker_iter =
+        std::find_if(workers_.begin(), workers_.end(),
+                     [worker](const scoped_refptr<SchedulerWorker>& candidate) {
+                       return candidate.get() == worker;
+                     });
+    DCHECK(worker_iter != workers_.end());
+    worker_to_destroy = std::move(*worker_iter);
+    workers_.erase(worker_iter);
+  }
+  worker_to_destroy->Cleanup();
+}
+
+}  // namespace internal
+}  // namespace base