Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(214)

Side by Side Diff: base/task_scheduler/thread_pool.cc

Issue 1685423002: Task Scheduler. (Closed) Base URL: https://luckyluke-private.googlesource.com/src@a_master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/thread_pool.h"
6
7 #include <utility>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/task_scheduler/utils.h"
12
13 namespace base {
14 namespace task_scheduler {
15
16 namespace {
17
18 // A task runner that runs tasks with the PARALLEL strategy.
19 class SchedulerParallelTaskRunner : public TaskRunner {
20 public:
21 // Tasks posted through this task runner have |traits| and are inserted in
22 // |shared_priority_queue|. |delayed_task_manager| is used to post delayed
23 // tasks. |shutdown_manager| is notified when a task is posted.
24 SchedulerParallelTaskRunner(const TaskTraits& traits,
25 PriorityQueue* priority_queue,
26 DelayedTaskManager* delayed_task_manager,
27 ShutdownManager* shutdown_manager);
28
29 // TaskRunner:
30 bool PostDelayedTask(const tracked_objects::Location& from_here,
31 const Closure& closure,
32 TimeDelta delay) override;
33 bool RunsTasksOnCurrentThread() const override;
34
35 private:
36 ~SchedulerParallelTaskRunner() override;
37
38 TaskTraits traits_;
39 PriorityQueue* priority_queue_;
40 DelayedTaskManager* delayed_task_manager_;
41 ShutdownManager* shutdown_manager_;
42
43 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
44 };
45
46 SchedulerParallelTaskRunner::SchedulerParallelTaskRunner(
47 const TaskTraits& traits,
48 PriorityQueue* priority_queue,
49 DelayedTaskManager* delayed_task_manager,
50 ShutdownManager* shutdown_manager)
51 : traits_(traits),
52 priority_queue_(priority_queue),
53 delayed_task_manager_(delayed_task_manager),
54 shutdown_manager_(shutdown_manager) {}
55
56 bool SchedulerParallelTaskRunner::PostDelayedTask(
57 const tracked_objects::Location& from_here,
58 const Closure& closure,
59 TimeDelta delay) {
60 Task task(from_here, closure, traits_, TimeTicks::Now());
61 if (!delay.is_zero())
62 task.delayed_run_time = task.post_time + delay;
63 PostTaskHelper(task, make_scoped_refptr(new Sequence), priority_queue_,
64 shutdown_manager_, delayed_task_manager_);
65 return true;
66 }
67
68 bool SchedulerParallelTaskRunner::RunsTasksOnCurrentThread() const {
69 // TODO(fdoray): Return true only if tasks posted may actually run on the
70 // current thread. It is valid, but not ideal, to always return true.
71 return true;
72 }
73
74 SchedulerParallelTaskRunner::~SchedulerParallelTaskRunner() = default;
75
76 // A task runner that runs tasks in with the SEQUENCED strategy.
77 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
78 public:
79 // Tasks posted through this task runner have |traits| and are inserted in
80 // |sequence|. When appropriate, |sequence| is inserted in |priority_queue|.
81 // |delayed_task_manager| is used to post delayed tasks. |shutdown_manager| is
82 // notified when a task is posted.
83 SchedulerSequencedTaskRunner(const TaskTraits& traits,
84 scoped_refptr<Sequence> sequence,
85 PriorityQueue* priority_queue,
86 DelayedTaskManager* delayed_task_manager,
87 ShutdownManager* shutdown_manager);
88
89 // SequencedTaskRunner:
90 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
91 const Closure& task,
92 TimeDelta delay) override;
93 bool PostDelayedTask(const tracked_objects::Location& from_here,
94 const Closure& closure,
95 TimeDelta delay) override;
96 bool RunsTasksOnCurrentThread() const override;
97
98 private:
99 ~SchedulerSequencedTaskRunner() override;
100
101 TaskTraits traits_;
102 scoped_refptr<Sequence> sequence_;
103 PriorityQueue* priority_queue_;
104 DelayedTaskManager* delayed_task_manager_;
105 ShutdownManager* shutdown_manager_;
106
107 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
108 };
109
110 SchedulerSequencedTaskRunner::SchedulerSequencedTaskRunner(
111 const TaskTraits& traits,
112 scoped_refptr<Sequence> sequence,
113 PriorityQueue* priority_queue,
114 DelayedTaskManager* delayed_task_manager,
115 ShutdownManager* shutdown_manager)
116 : traits_(traits),
117 sequence_(sequence),
118 priority_queue_(priority_queue),
119 delayed_task_manager_(delayed_task_manager),
120 shutdown_manager_(shutdown_manager) {}
121
122 bool SchedulerSequencedTaskRunner::PostDelayedTask(
123 const tracked_objects::Location& from_here,
124 const Closure& closure,
125 TimeDelta delay) {
126 Task task(from_here, closure, traits_, TimeTicks::Now());
127 if (!delay.is_zero())
128 task.delayed_run_time = task.post_time + delay;
129 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_,
130 delayed_task_manager_);
131 return true;
132 }
133
134 bool SchedulerSequencedTaskRunner::RunsTasksOnCurrentThread() const {
135 // TODO(fdoray): Return true only if tasks posted may actually run on the
136 // current thread. It is valid, but not ideal, to always return true.
137 return true;
138 }
139
140 bool SchedulerSequencedTaskRunner::PostNonNestableDelayedTask(
141 const tracked_objects::Location& from_here,
142 const Closure& task,
143 TimeDelta delay) {
144 return PostDelayedTask(from_here, task, delay);
145 }
146
147 SchedulerSequencedTaskRunner::~SchedulerSequencedTaskRunner() = default;
148
149 } // namespace
150
151
152 ThreadPool::~ThreadPool() = default;
153
154 scoped_ptr<ThreadPool> ThreadPool::CreateThreadPool(
155 ThreadPriority thread_priority,
156 size_t num_threads,
157 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
158 ShutdownManager* shutdown_manager) {
159 scoped_ptr<ThreadPool> thread_pool(
160 new ThreadPool(thread_priority, num_threads, reinsert_sequence_callback,
161 shutdown_manager));
162 return (thread_pool->GetNumThreads() > 0) ? std::move(thread_pool)
163 : scoped_ptr<ThreadPool>();
164 }
165
166 size_t ThreadPool::GetNumThreads() const {
167 return worker_threads_.size();
168 }
169
170 scoped_refptr<TaskRunner> ThreadPool::CreateTaskRunnerWithTraits(
171 const TaskTraits& traits,
172 ExecutionMode execution_mode) {
173 switch (execution_mode) {
174 case ExecutionMode::PARALLEL: {
175 return scoped_refptr<TaskRunner>(new SchedulerParallelTaskRunner(
176 traits, &priority_queue_, &delayed_task_manager_, shutdown_manager_));
177 }
178
179 case ExecutionMode::SEQUENCED: {
180 // TODO(fdoray): Support TaskTraits().WithSequenceToken().
181 return scoped_refptr<TaskRunner>(new SchedulerSequencedTaskRunner(
182 traits, scoped_refptr<Sequence>(new Sequence), &priority_queue_,
183 &delayed_task_manager_, shutdown_manager_));
184 }
185
186 #if defined(OS_WIN)
187 case ExecutionMode::SINGLE_THREADED_COM_STA:
188 #endif
189 case ExecutionMode::SINGLE_THREADED: {
190 DCHECK(!worker_threads_.empty());
191 // TODO(fdoray): Better thread assignment.
192 return scoped_refptr<TaskRunner>(
193 worker_threads_.front()
194 ->CreateTaskRunnerWithTraits(traits, execution_mode)
195 .get());
196 }
197
198 default: {
199 NOTREACHED();
200 return scoped_refptr<TaskRunner>();
201 }
202 }
203 }
204
205 void ThreadPool::ReinsertSequence(scoped_refptr<Sequence> sequence,
206 const SequenceSortKey& sequence_sort_key,
207 const WorkerThread* worker_thread) {
208 DCHECK(!disable_wake_up_thread_on_sequence_insertion_.Get());
209
210 // Set a flag to avoid waking up a thread when reinserting |sequence| in
211 // |priority_queue_| if the thread doing the reinsertion:
212 // - Can run tasks from |priority_queue_|, and,
213 // - Doesn't have pending single-threaded tasks.
214 // If these conditions are met, the thread doing the reinsertion will soon
215 // pop a sequence from |priority_queue_|. There is no need to wake up a new
216 // thread to do it.
217 if (worker_thread->shared_priority_queue() == &priority_queue_ &&
218 !worker_thread->HasSingleThreadedTasks()) {
219 disable_wake_up_thread_on_sequence_insertion_.Set(true);
220 }
221
222 // Insert the sequence in the priority queue.
223 priority_queue_.BeginTransaction()->PushSequence(sequence, sequence_sort_key);
224
225 disable_wake_up_thread_on_sequence_insertion_.Set(false);
226 }
227
228 void ThreadPool::JoinAllThreadsForTesting() {
229 for (const auto& worker_thread : worker_threads_) {
230 worker_thread->WakeUp();
231 worker_thread->JoinForTesting();
232 }
233 }
234
235 ThreadPool::ThreadPool(
236 ThreadPriority thread_priority,
237 size_t num_threads,
238 const WorkerThread::ReinsertSequenceCallback& reinsert_sequence_callback,
239 ShutdownManager* shutdown_manager)
240 : priority_queue_(Bind(&ThreadPool::OnSequenceInsertedInPriorityQueue,
241 Unretained(this))),
242 shutdown_manager_(shutdown_manager),
243 delayed_task_manager_(
244 Bind(&ThreadPool::WakeUpOneThread, Unretained(this)),
245 shutdown_manager_) {
246 DCHECK_GT(num_threads, 0u);
247 DCHECK(shutdown_manager);
248
249 const WorkerThread::BecomesIdleCallback becomes_idle_callback =
250 Bind(&ThreadPool::WorkerThreadBecomesIdleCallback, Unretained(this));
251 worker_threads_.reserve(num_threads);
252
253 for (size_t i = 0; i < num_threads; ++i) {
254 scoped_ptr<WorkerThread> worker_thread = WorkerThread::CreateWorkerThread(
255 thread_priority, &priority_queue_, reinsert_sequence_callback,
256 becomes_idle_callback, &delayed_task_manager_, shutdown_manager_);
257 if (worker_thread.get() != nullptr)
258 worker_threads_.push_back(std::move(worker_thread));
259 }
260 }
261
262 void ThreadPool::WorkerThreadBecomesIdleCallback(WorkerThread* worker_thread) {
fdoray 2016/02/11 17:30:33 DCHECK that |worker_thread| belongs to this thread
fdoray 2016/02/12 04:16:20 Done.
263 AutoSchedulerLock auto_lock_(idle_worker_threads_lock_);
264
265 if (idle_worker_threads_set_.find(worker_thread) !=
266 idle_worker_threads_set_.end()) {
267 // The worker thread is already on the stack of idle threads.
268 return;
269 }
270
271 // Add the worker thread to the stack of idle threads.
272 idle_worker_threads_stack_.push(worker_thread);
273 idle_worker_threads_set_.insert(worker_thread);
274 }
275
276 void ThreadPool::WakeUpOneThread() {
277 // Wake up the first thread found on |idle_worker_threads_stack_| that doesn't
278 // have pending or running single-threaded tasks.
279 AutoSchedulerLock auto_lock(idle_worker_threads_lock_);
280 while (!idle_worker_threads_stack_.empty()) {
281 WorkerThread* worker_thread = idle_worker_threads_stack_.top();
282
283 idle_worker_threads_stack_.pop();
284 idle_worker_threads_set_.erase(worker_thread);
285
286 // HasSingleThreadedTasks() can return stale results. However, when it
287 // returns true below, it is guaranteed that |worker_thread| is either awake
288 // or about to be woken up and that it will not enter
289 // WaitUntilWorkIsAvailable() before |priority_queue_| becomes empty. This
290 // is important because if all threads in |idle_worker_threads_stack_|
291 // report that they have single-threaded tasks, no thread is woken up by
292 // this method. If these threads don't check |priority_queue_| before
293 // entering WaitUntilWorkIsAvailable(), the work in |priority_queue_| could
294 // end up never being done. The guarantee works between the moment
295 // HasSingleThreadedTasks() goes from true to false and the moment
296 // |worker_thread| enters WaitUntilWorkIsAvailable(),
297 // WorkerThreadBecomesIdleCallback() has to be called on this ThreadPool.
298 // Both WorkerThreadBecomesIdleCallback() and the current method acquire
299 // |idle_worker_threads_lock_|, which synchronizes the value returned by
300 // HasSingleThreadedTasks().
301 //
302 // TODO(fdoray): A single-threaded task can be posted to |worker_thread|
303 // immediately after HasSingleThreadedTasks() has returned false. Ideally,
304 // when this happens, another worker thread should be woken up.
305 if (!worker_thread->HasSingleThreadedTasks()) {
robliao 2016/02/11 22:30:07 Maybe a controller pattern might make reasoning ov
fdoray 2016/02/12 04:16:20 I agree. I'll try to write this (tomorrow in the p
306 worker_thread->WakeUp();
307 break;
308 }
309 }
310 }
311
312 void ThreadPool::OnSequenceInsertedInPriorityQueue() {
313 if (disable_wake_up_thread_on_sequence_insertion_.Get())
314 return;
315
316 WakeUpOneThread();
317 }
318
319 } // namespace task_scheduler
320 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698