Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Side by Side Diff: cc/base/worker_pool.cc

Issue 17004002: cc: Move WorkerPool from cc/base to cc/resources. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « cc/base/worker_pool.h ('k') | cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cc/base/worker_pool.h"
6
7 #if defined(OS_ANDROID)
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
9 #include <sys/resource.h>
10 #endif
11
12 #include <map>
13
14 #include "base/bind.h"
15 #include "base/containers/hash_tables.h"
16 #include "base/debug/trace_event.h"
17 #include "base/strings/stringprintf.h"
18 #include "base/threading/simple_thread.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "cc/base/scoped_ptr_deque.h"
21
22 namespace cc {
23
24 namespace internal {
25
26 WorkerPoolTask::WorkerPoolTask()
27 : did_schedule_(false),
28 did_run_(false),
29 did_complete_(false) {
30 }
31
32 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
33 : did_schedule_(false),
34 did_run_(false),
35 did_complete_(false) {
36 dependencies_.swap(*dependencies);
37 }
38
39 WorkerPoolTask::~WorkerPoolTask() {
40 DCHECK_EQ(did_schedule_, did_complete_);
41 DCHECK(!did_run_ || did_schedule_);
42 DCHECK(!did_run_ || did_complete_);
43 }
44
45 void WorkerPoolTask::DidSchedule() {
46 DCHECK(!did_complete_);
47 did_schedule_ = true;
48 }
49
50 void WorkerPoolTask::WillRun() {
51 DCHECK(did_schedule_);
52 DCHECK(!did_complete_);
53 DCHECK(!did_run_);
54 }
55
56 void WorkerPoolTask::DidRun() {
57 did_run_ = true;
58 }
59
60 void WorkerPoolTask::DidComplete() {
61 DCHECK(did_schedule_);
62 DCHECK(!did_complete_);
63 did_complete_ = true;
64 }
65
66 bool WorkerPoolTask::IsReadyToRun() const {
67 // TODO(reveman): Use counter to improve performance.
68 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
69 it != dependencies_.rend(); ++it) {
70 WorkerPoolTask* dependency = it->get();
71 if (!dependency->HasFinishedRunning())
72 return false;
73 }
74 return true;
75 }
76
77 bool WorkerPoolTask::HasFinishedRunning() const {
78 return did_run_;
79 }
80
81 bool WorkerPoolTask::HasCompleted() const {
82 return did_complete_;
83 }
84
85 } // namespace internal
86
87 // Internal to the worker pool. Any data or logic that needs to be
88 // shared between threads lives in this class. All members are guarded
89 // by |lock_|.
90 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
91 public:
92 Inner(size_t num_threads, const std::string& thread_name_prefix);
93 virtual ~Inner();
94
95 void Shutdown();
96
97 // Schedule running of tasks in |graph|. Tasks previously scheduled but
98 // no longer needed will be canceled unless already running. Canceled
99 // tasks are moved to |completed_tasks_| without being run. The result
100 // is that once scheduled, a task is guaranteed to end up in the
101 // |completed_tasks_| queue even if they later get canceled by another
102 // call to SetTaskGraph().
103 void SetTaskGraph(TaskGraph* graph);
104
105 // Collect all completed tasks in |completed_tasks|.
106 void CollectCompletedTasks(TaskDeque* completed_tasks);
107
108 private:
109 // Overridden from base::DelegateSimpleThread:
110 virtual void Run() OVERRIDE;
111
112 // This lock protects all members of this class except
113 // |worker_pool_on_origin_thread_|. Do not read or modify anything
114 // without holding this lock. Do not block while holding this lock.
115 mutable base::Lock lock_;
116
117 // Condition variable that is waited on by worker threads until new
118 // tasks are ready to run or shutdown starts.
119 base::ConditionVariable has_ready_to_run_tasks_cv_;
120
121 // Provides each running thread loop with a unique index. First thread
122 // loop index is 0.
123 unsigned next_thread_index_;
124
125 // Set during shutdown. Tells workers to exit when no more tasks
126 // are pending.
127 bool shutdown_;
128
129 // This set contains all pending tasks.
130 GraphNodeMap pending_tasks_;
131
132 // Ordered set of tasks that are ready to run.
133 // TODO(reveman): priority_queue might be more efficient.
134 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
135 TaskMap ready_to_run_tasks_;
136
137 // This set contains all currently running tasks.
138 GraphNodeMap running_tasks_;
139
140 // Completed tasks not yet collected by origin thread.
141 TaskDeque completed_tasks_;
142
143 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
144
145 DISALLOW_COPY_AND_ASSIGN(Inner);
146 };
147
148 WorkerPool::Inner::Inner(
149 size_t num_threads, const std::string& thread_name_prefix)
150 : lock_(),
151 has_ready_to_run_tasks_cv_(&lock_),
152 next_thread_index_(0),
153 shutdown_(false) {
154 base::AutoLock lock(lock_);
155
156 while (workers_.size() < num_threads) {
157 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
158 new base::DelegateSimpleThread(
159 this,
160 thread_name_prefix +
161 base::StringPrintf(
162 "Worker%u",
163 static_cast<unsigned>(workers_.size() + 1)).c_str()));
164 worker->Start();
165 workers_.push_back(worker.Pass());
166 }
167 }
168
169 WorkerPool::Inner::~Inner() {
170 base::AutoLock lock(lock_);
171
172 DCHECK(shutdown_);
173
174 DCHECK_EQ(0u, pending_tasks_.size());
175 DCHECK_EQ(0u, ready_to_run_tasks_.size());
176 DCHECK_EQ(0u, running_tasks_.size());
177 DCHECK_EQ(0u, completed_tasks_.size());
178 }
179
180 void WorkerPool::Inner::Shutdown() {
181 {
182 base::AutoLock lock(lock_);
183
184 DCHECK(!shutdown_);
185 shutdown_ = true;
186
187 // Wake up a worker so it knows it should exit. This will cause all workers
188 // to exit as each will wake up another worker before exiting.
189 has_ready_to_run_tasks_cv_.Signal();
190 }
191
192 while (workers_.size()) {
193 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
194 // http://crbug.com/240453 - Join() is considered IO and will block this
195 // thread. See also http://crbug.com/239423 for further ideas.
196 base::ThreadRestrictions::ScopedAllowIO allow_io;
197 worker->Join();
198 }
199 }
200
201 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) {
202 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
203 DCHECK(graph->empty() || !shutdown_);
204
205 GraphNodeMap new_pending_tasks;
206 GraphNodeMap new_running_tasks;
207 TaskMap new_ready_to_run_tasks;
208
209 new_pending_tasks.swap(*graph);
210
211 {
212 base::AutoLock lock(lock_);
213
214 // First remove all completed tasks from |new_pending_tasks|.
215 for (TaskDeque::iterator it = completed_tasks_.begin();
216 it != completed_tasks_.end(); ++it) {
217 internal::WorkerPoolTask* task = it->get();
218 new_pending_tasks.take_and_erase(task);
219 }
220
221 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
222 for (GraphNodeMap::iterator it = pending_tasks_.begin();
223 it != pending_tasks_.end(); ++it) {
224 internal::WorkerPoolTask* task = it->first;
225
226 // Task has completed if not present in |new_pending_tasks|.
227 if (!new_pending_tasks.contains(task))
228 completed_tasks_.push_back(task);
229 }
230
231 // Build new running task set.
232 for (GraphNodeMap::iterator it = running_tasks_.begin();
233 it != running_tasks_.end(); ++it) {
234 internal::WorkerPoolTask* task = it->first;
235 // Transfer scheduled task value from |new_pending_tasks| to
236 // |new_running_tasks| if currently running. Value must be set to
237 // NULL if |new_pending_tasks| doesn't contain task. This does
238 // the right in both cases.
239 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
240 }
241
242 // Build new "ready to run" tasks queue.
243 for (GraphNodeMap::iterator it = new_pending_tasks.begin();
244 it != new_pending_tasks.end(); ++it) {
245 internal::WorkerPoolTask* task = it->first;
246
247 // Completed tasks should not exist in |new_pending_tasks|.
248 DCHECK(!task->HasFinishedRunning());
249
250 // Call DidSchedule() to indicate that this task has been scheduled.
251 // Note: This is only for debugging purposes.
252 task->DidSchedule();
253
254 DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
255 if (task->IsReadyToRun())
256 new_ready_to_run_tasks[it->second->priority()] = task;
257 }
258
259 // Swap task sets.
260 // Note: old tasks are intentionally destroyed after releasing |lock_|.
261 pending_tasks_.swap(new_pending_tasks);
262 running_tasks_.swap(new_running_tasks);
263 ready_to_run_tasks_.swap(new_ready_to_run_tasks);
264
265 // If there is more work available, wake up worker thread.
266 if (!ready_to_run_tasks_.empty())
267 has_ready_to_run_tasks_cv_.Signal();
268 }
269 }
270
271 void WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
272 base::AutoLock lock(lock_);
273
274 DCHECK_EQ(0u, completed_tasks->size());
275 completed_tasks->swap(completed_tasks_);
276 }
277
278 void WorkerPool::Inner::Run() {
279 #if defined(OS_ANDROID)
280 base::PlatformThread::SetThreadPriority(
281 base::PlatformThread::CurrentHandle(),
282 base::kThreadPriority_Background);
283 #endif
284
285 base::AutoLock lock(lock_);
286
287 // Get a unique thread index.
288 int thread_index = next_thread_index_++;
289
290 while (true) {
291 if (ready_to_run_tasks_.empty()) {
292 // Exit when shutdown is set and no more tasks are pending.
293 if (shutdown_ && pending_tasks_.empty())
294 break;
295
296 // Wait for more tasks.
297 has_ready_to_run_tasks_cv_.Wait();
298 continue;
299 }
300
301 // Take top priority task from |ready_to_run_tasks_|.
302 scoped_refptr<internal::WorkerPoolTask> task(
303 ready_to_run_tasks_.begin()->second);
304 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
305
306 // Move task from |pending_tasks_| to |running_tasks_|.
307 DCHECK(pending_tasks_.contains(task.get()));
308 DCHECK(!running_tasks_.contains(task.get()));
309 running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get()));
310
311 // There may be more work available, so wake up another worker thread.
312 has_ready_to_run_tasks_cv_.Signal();
313
314 // Call WillRun() before releasing |lock_| and running task.
315 task->WillRun();
316
317 {
318 base::AutoUnlock unlock(lock_);
319
320 task->RunOnThread(thread_index);
321 }
322
323 // This will mark task as finished running.
324 task->DidRun();
325
326 // Now iterate over all dependents to check if they are ready to run.
327 scoped_ptr<GraphNode> node = running_tasks_.take_and_erase(task.get());
328 if (node) {
329 typedef internal::WorkerPoolTask::TaskVector TaskVector;
330 for (TaskVector::const_iterator it = node->dependents().begin();
331 it != node->dependents().end(); ++it) {
332 GraphNodeMap::iterator dependent_it = pending_tasks_.find(it->get());
333 DCHECK(dependent_it != pending_tasks_.end());
334
335 internal::WorkerPoolTask* dependent = dependent_it->first;
336 if (!dependent->IsReadyToRun())
337 continue;
338
339 // Task is ready. Add it to |ready_to_run_tasks_|.
340 GraphNode* dependent_node = dependent_it->second;
341 unsigned priority = dependent_node->priority();
342 DCHECK(!ready_to_run_tasks_.count(priority) ||
343 ready_to_run_tasks_[priority] == dependent);
344 ready_to_run_tasks_[priority] = dependent;
345 }
346 }
347
348 // Finally add task to |completed_tasks_|.
349 completed_tasks_.push_back(task);
350 }
351
352 // We noticed we should exit. Wake up the next worker so it knows it should
353 // exit as well (because the Shutdown() code only signals once).
354 has_ready_to_run_tasks_cv_.Signal();
355 }
356
357 WorkerPool::GraphNode::GraphNode(
358 internal::WorkerPoolTask* dependent, unsigned priority)
359 : priority_(priority) {
360 if (dependent)
361 dependents_.push_back(dependent);
362 }
363
364 WorkerPool::GraphNode::~GraphNode() {
365 }
366
367 void WorkerPool::GraphNode::AddDependent(internal::WorkerPoolTask* dependent) {
368 DCHECK(dependent);
369 dependents_.push_back(dependent);
370 }
371
372 WorkerPool::WorkerPool(size_t num_threads,
373 const std::string& thread_name_prefix)
374 : in_dispatch_completion_callbacks_(false),
375 inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
376 }
377
378 WorkerPool::~WorkerPool() {
379 }
380
381 void WorkerPool::Shutdown() {
382 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
383
384 DCHECK(!in_dispatch_completion_callbacks_);
385
386 inner_->Shutdown();
387 }
388
389 void WorkerPool::CheckForCompletedTasks() {
390 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
391
392 DCHECK(!in_dispatch_completion_callbacks_);
393
394 TaskDeque completed_tasks;
395 inner_->CollectCompletedTasks(&completed_tasks);
396 DispatchCompletionCallbacks(&completed_tasks);
397 }
398
399 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
400 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
401
402 // Worker pool instance is not reentrant while processing completed tasks.
403 in_dispatch_completion_callbacks_ = true;
404
405 while (!completed_tasks->empty()) {
406 internal::WorkerPoolTask* task = completed_tasks->front().get();
407
408 task->DidComplete();
409 task->DispatchCompletionCallback();
410
411 completed_tasks->pop_front();
412 }
413
414 in_dispatch_completion_callbacks_ = false;
415 }
416
417 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
418 TRACE_EVENT0("cc", "WorkerPool::SetTaskGraph");
419
420 DCHECK(!in_dispatch_completion_callbacks_);
421
422 inner_->SetTaskGraph(graph);
423 }
424
425 // static
426 unsigned WorkerPool::BuildTaskGraphRecursive(
427 internal::WorkerPoolTask* task,
428 internal::WorkerPoolTask* dependent,
429 unsigned priority,
430 TaskGraph* tasks) {
431 // Skip sub-tree if task has already completed.
432 if (task->HasCompleted())
433 return priority;
434
435 GraphNodeMap::iterator it = tasks->find(task);
436 if (it != tasks->end()) {
437 it->second->AddDependent(dependent);
438 return priority;
439 }
440
441 typedef internal::WorkerPoolTask::TaskVector TaskVector;
442 for (TaskVector::iterator dependency_it = task->dependencies().begin();
443 dependency_it != task->dependencies().end(); ++dependency_it) {
444 internal::WorkerPoolTask* dependency = dependency_it->get();
445 priority = BuildTaskGraphRecursive(dependency, task, priority, tasks);
446 }
447
448 tasks->set(task, make_scoped_ptr(new GraphNode(dependent, priority)));
449
450 return priority + 1;
451 }
452
453 // static
454 void WorkerPool::BuildTaskGraph(
455 internal::WorkerPoolTask* root, TaskGraph* tasks) {
456 const unsigned kBasePriority = 0u;
457 if (root)
458 BuildTaskGraphRecursive(root, NULL, kBasePriority, tasks);
459 }
460
461 } // namespace cc
OLDNEW
« no previous file with comments | « cc/base/worker_pool.h ('k') | cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698