Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(112)

Side by Side Diff: base/task_scheduler/scheduler_single_thread_task_runner_manager.cc

Issue 2698843006: Introduce SchedulerSingleThreadTaskRunnerManager (Closed)
Patch Set: Merge in https://codereview.chromium.org/2726073002/ Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/task_scheduler/delayed_task_manager.h"
18 #include "base/task_scheduler/scheduler_worker.h"
19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/task.h"
21 #include "base/task_scheduler/task_tracker.h"
22 #include "base/task_scheduler/task_traits.h"
23 #include "base/threading/platform_thread.h"
24 #include "base/time/time.h"
25
26 namespace base {
27 namespace internal {
28
29 namespace {
30
// Allows for checking the PlatformThread::CurrentRef() against a set
// PlatformThreadRef atomically without using locks.
class AtomicThreadRefChecker {
 public:
  AtomicThreadRefChecker() = default;
  ~AtomicThreadRefChecker() = default;

  // Records the calling thread's ref. Intended to be called once, on the
  // thread that future IsCurrentThreadSameAsSetThread() calls should match.
  // Order matters: |thread_ref_| is written before |is_set_| is raised, so a
  // reader that observes the flag also observes the ref (presumably
  // AtomicFlag's Set/IsSet provide the release/acquire pairing -- confirm
  // against base/synchronization/atomic_flag.h).
  void Set() {
    thread_ref_ = PlatformThread::CurrentRef();
    is_set_.Set();
  }

  // Returns true iff Set() was previously called on the current thread.
  // Safe to call from any thread; returns false before Set() has run.
  bool IsCurrentThreadSameAsSetThread() {
    return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
  }

 private:
  AtomicFlag is_set_;
  PlatformThreadRef thread_ref_;

  DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker);
};
53
// Delegate for a SchedulerWorker dedicated to a single Sequence. Hands that
// one Sequence to the worker when work is available and drops it on shutdown
// so queued tasks can be reclaimed.
class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
 public:
  SchedulerWorkerDelegate(const std::string& thread_name)
      : thread_name_(thread_name) {}

  // SchedulerWorker::Delegate:
  void OnMainEntry(SchedulerWorker* worker) override {
    // Record the worker thread's ref before anything can run on it so that
    // RunsTasksOnCurrentThread() answers correctly from the very first task.
    thread_ref_checker_.Set();
    PlatformThread::SetName(thread_name_);
  }

  scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
    AutoSchedulerLock auto_lock(sequence_lock_);
    // Hand out |sequence_| at most once per "has work" signal;
    // ReEnqueueSequence() re-arms |has_work_| when more tasks remain.
    bool has_work = has_work_;
    has_work_ = false;
    return has_work ? sequence_ : nullptr;
  }

  void DidRunTask() override {}

  void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
    AutoSchedulerLock auto_lock(sequence_lock_);
    // We've shut down, so no-op this work request. Any sequence cleanup will
    // occur in the caller's context.
    if (!sequence_)
      return;

    // This delegate only ever services its own single sequence.
    DCHECK_EQ(sequence, sequence_);
    has_work_ = true;
  }

  // Dedicated workers never wake on a timeout; they are woken explicitly when
  // a task is posted.
  TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }

  bool CanDetach(SchedulerWorker* worker) override { return false; }

  // CanDetach() always returns false, so detaching is a logic error.
  void OnDetach() override { NOTREACHED(); }

  bool RunsTasksOnCurrentThread() {
    // We check the thread ref instead of the sequence for the benefit of COM
    // callbacks which may execute without a sequence context.
    return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
  }

  void OnMainExit() override {
    // Move |sequence_| to |local_sequence| so that if we have the last
    // reference to the sequence we don't destroy it (and its tasks) within
    // |sequence_lock_|.
    scoped_refptr<Sequence> local_sequence;
    {
      AutoSchedulerLock auto_lock(sequence_lock_);
      // To reclaim skipped tasks on shutdown, we null out the sequence to allow
      // the tasks to destroy themselves.
      local_sequence = std::move(sequence_);
    }
  }

  // SchedulerWorkerDelegate:

  // Consumers should release their sequence reference as soon as possible to
  // ensure timely cleanup for general shutdown.
  scoped_refptr<Sequence> sequence() {
    AutoSchedulerLock auto_lock(sequence_lock_);
    return sequence_;
  }

 private:
  const std::string thread_name_;

  // Synchronizes access to |sequence_| and |has_work_|.
  SchedulerLock sequence_lock_;
  scoped_refptr<Sequence> sequence_ = new Sequence;
  bool has_work_ = false;

  AtomicThreadRefChecker thread_ref_checker_;

  DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
};
131
132 } // namespace
133
// SingleThreadTaskRunner whose tasks all run on one dedicated SchedulerWorker.
// The runner's own ref-count controls the worker's registration: when the last
// reference to the runner dies, the worker is unregistered (see destructor).
class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
    : public SingleThreadTaskRunner {
 public:
  // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
  // lifetime of a dedicated |worker| for |traits|.
  SchedulerSingleThreadTaskRunner(
      SchedulerSingleThreadTaskRunnerManager* const outer,
      const TaskTraits& traits,
      SchedulerWorker* worker)
      : outer_(outer), traits_(traits), worker_(worker) {
    DCHECK(outer_);
    DCHECK(worker_);
  }

  // SingleThreadTaskRunner:
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& closure,
                       TimeDelta delay) override {
    auto task = MakeUnique<Task>(from_here, closure, traits_, delay);
    // The queued task keeps this runner (and therefore its worker's
    // registration) alive until the task is destroyed.
    task->single_thread_task_runner_ref = this;

    if (!outer_->task_tracker_->WillPostTask(task.get()))
      return false;

    if (task->delayed_run_time.is_null()) {
      PostTaskNow(std::move(task));
    } else {
      // NOTE(review): Unretained(this) appears safe because the task's
      // |single_thread_task_runner_ref| set above keeps |this| alive while
      // the delayed task is pending -- confirm.
      outer_->delayed_task_manager_->AddDelayedTask(
          std::move(task), Bind(&SchedulerSingleThreadTaskRunner::PostTaskNow,
                                Unretained(this)));
    }
    return true;
  }

  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const Closure& closure,
                                  base::TimeDelta delay) override {
    // Tasks are never nested within the task scheduler.
    return PostDelayedTask(from_here, closure, delay);
  }

  bool RunsTasksOnCurrentThread() const override {
    return GetDelegate()->RunsTasksOnCurrentThread();
  }

 private:
  // Last reference released: tear down the dedicated worker's registration.
  ~SchedulerSingleThreadTaskRunner() override {
    outer_->UnregisterSchedulerWorker(worker_);
  }

  void PostTaskNow(std::unique_ptr<Task> task) {
    scoped_refptr<Sequence> sequence = GetDelegate()->sequence();
    // If |sequence| is null, then the thread is effectively gone (either
    // shutdown or joined).
    if (!sequence)
      return;

    const bool sequence_was_empty = sequence->PushTask(std::move(task));
    if (sequence_was_empty) {
      // First task in an empty sequence: signal the delegate that work is
      // available and wake the dedicated worker to pick it up.
      GetDelegate()->ReEnqueueSequence(std::move(sequence));
      worker_->WakeUp();
    }
  }

  SchedulerWorkerDelegate* GetDelegate() const {
    return static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
  }

  SchedulerSingleThreadTaskRunnerManager* const outer_;
  const TaskTraits traits_;
  SchedulerWorker* const worker_;

  DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
};
208
// All collaborators are borrowed and must outlive this manager; the DCHECKs
// below validate them once at construction.
SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
    const std::vector<SchedulerWorkerPoolParams>& worker_pool_params_vector,
    const TaskScheduler::WorkerPoolIndexForTraitsCallback&
        worker_pool_index_for_traits_callback,
    TaskTracker* task_tracker,
    DelayedTaskManager* delayed_task_manager)
    : worker_pool_params_vector_(worker_pool_params_vector),
      worker_pool_index_for_traits_callback_(
          worker_pool_index_for_traits_callback),
      task_tracker_(task_tracker),
      delayed_task_manager_(delayed_task_manager) {
  // At least one pool's params are needed to name/configure workers, and the
  // callback must be able to map any traits to one of them.
  DCHECK_GT(worker_pool_params_vector_.size(), 0U);
  DCHECK(worker_pool_index_for_traits_callback_);
  DCHECK(task_tracker_);
  DCHECK(delayed_task_manager_);
}
225
226 SchedulerSingleThreadTaskRunnerManager::
227 ~SchedulerSingleThreadTaskRunnerManager() {
228 #if DCHECK_IS_ON()
229 size_t workers_unregistered_during_join =
230 subtle::NoBarrier_Load(&workers_unregistered_during_join_);
231 DCHECK_EQ(workers_unregistered_during_join, workers_.size())
232 << "There cannot be outstanding SingleThreadTaskRunners upon destruction"
233 "of SchedulerSingleThreadTaskRunnerManager or the Task Scheduler";
234 #endif
235 }
236
237 scoped_refptr<SingleThreadTaskRunner>
238 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
239 const TaskTraits& traits) {
240 size_t index = worker_pool_index_for_traits_callback_.Run(traits);
241 DCHECK_LT(index, worker_pool_params_vector_.size());
242 return new SchedulerSingleThreadTaskRunner(
243 this, traits,
244 CreateAndRegisterSchedulerWorker(worker_pool_params_vector_[index]));
245 }
246
void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
  // Move the workers out so the (blocking) joins below happen without holding
  // |workers_lock_|; UnregisterSchedulerWorker() takes that same lock and may
  // run concurrently while workers wind down.
  decltype(workers_) local_workers;
  {
    AutoSchedulerLock auto_lock(workers_lock_);
    local_workers = std::move(workers_);
  }

  for (const auto& worker : local_workers)
    worker->JoinForTesting();

  {
    AutoSchedulerLock auto_lock(workers_lock_);
    DCHECK(workers_.empty())
        << "New worker(s) unexpectedly registered during join.";
    // Put the joined workers back so the destructor's accounting against
    // |workers_unregistered_during_join_| still sees them.
    workers_ = std::move(local_workers);
  }
}
264
265 SchedulerWorker*
266 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
267 const SchedulerWorkerPoolParams& params) {
268 AutoSchedulerLock auto_lock(workers_lock_);
269 int id = next_worker_id_++;
270 auto delegate = MakeUnique<SchedulerWorkerDelegate>(base::StringPrintf(
271 "TaskSchedulerSingleThreadWorker%d%s", id, params.name().c_str()));
272 workers_.emplace_back(SchedulerWorker::Create(
273 params.priority_hint(), std::move(delegate), task_tracker_,
274 SchedulerWorker::InitialState::DETACHED));
275 return workers_.back().get();
276 }
277
// Removes |worker| from |workers_| and tears it down. Called from
// SchedulerSingleThreadTaskRunner's destructor when the last runner reference
// is released.
void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
    SchedulerWorker* worker) {
  // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
  // |workers_lock_|.
  scoped_refptr<SchedulerWorker> worker_to_destroy;
  {
    AutoSchedulerLock auto_lock(workers_lock_);

    // We might be joining, so record that a worker was unregistered for
    // verification at destruction.
    if (workers_.empty()) {
#if DCHECK_IS_ON()
      subtle::NoBarrier_AtomicIncrement(&workers_unregistered_during_join_, 1);
#endif
      return;
    }

    // Take ownership of |worker| out of |workers_| so it is destroyed (via
    // Cleanup()) outside the lock scope below.
    auto worker_iter =
        std::find_if(workers_.begin(), workers_.end(),
                     [worker](const scoped_refptr<SchedulerWorker>& candidate) {
                       return candidate.get() == worker;
                     });
    DCHECK(worker_iter != workers_.end());
    worker_to_destroy = std::move(*worker_iter);
    workers_.erase(worker_iter);
  }
  worker_to_destroy->Cleanup();
}
306
307 } // namespace internal
308 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698