Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Check and prevent worker pool reentrancy during dispatch of completion callbacks Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « cc/base/worker_pool.h ('k') | cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/base/worker_pool.h" 5 #include "cc/base/worker_pool.h"
6 6
7 #include <algorithm> 7 #if defined(OS_ANDROID)
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
9 #include <sys/resource.h>
10 #endif
11
12 #include <map>
8 13
9 #include "base/bind.h" 14 #include "base/bind.h"
10 #include "base/debug/trace_event.h" 15 #include "base/debug/trace_event.h"
16 #include "base/hash_tables.h"
11 #include "base/stringprintf.h" 17 #include "base/stringprintf.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/threading/simple_thread.h" 18 #include "base/threading/simple_thread.h"
14 #include "base/threading/thread_restrictions.h" 19 #include "base/threading/thread_restrictions.h"
20 #include "cc/base/scoped_ptr_deque.h"
21 #include "cc/base/scoped_ptr_hash_map.h"
22
23 #if defined(COMPILER_GCC)
24 namespace BASE_HASH_NAMESPACE {
25 template <> struct hash<cc::internal::WorkerPoolTask*> {
26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
27 return hash<size_t>()(reinterpret_cast<size_t>(ptr));
28 }
29 };
30 } // namespace BASE_HASH_NAMESPACE
31 #endif // COMPILER_GCC
15 32
16 namespace cc { 33 namespace cc {
17 34
18 namespace {
19
20 class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
21 public:
22 WorkerPoolTaskImpl(const WorkerPool::Callback& task,
23 const base::Closure& reply)
24 : internal::WorkerPoolTask(reply),
25 task_(task) {}
26
27 virtual void RunOnThread(unsigned thread_index) OVERRIDE {
28 task_.Run();
29 }
30
31 private:
32 WorkerPool::Callback task_;
33 };
34
35 } // namespace
36
37 namespace internal { 35 namespace internal {
38 36
39 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { 37 WorkerPoolTask::WorkerPoolTask()
38 : did_schedule_(false),
39 did_run_(false),
40 did_complete_(false) {
41 }
42
43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
44 : did_schedule_(false),
45 did_run_(false),
46 did_complete_(false) {
47 dependencies_.swap(*dependencies);
40 } 48 }
41 49
42 WorkerPoolTask::~WorkerPoolTask() { 50 WorkerPoolTask::~WorkerPoolTask() {
51 DCHECK_EQ(did_schedule_, did_complete_);
52 DCHECK(!did_run_ || did_schedule_);
53 DCHECK(!did_run_ || did_complete_);
54 }
55
56 void WorkerPoolTask::DidSchedule() {
57 DCHECK(!did_complete_);
58 did_schedule_ = true;
59 }
60
61 void WorkerPoolTask::WillRun() {
62 DCHECK(did_schedule_);
63 DCHECK(!did_complete_);
64 DCHECK(!did_run_);
65 }
66
67 void WorkerPoolTask::DidRun() {
68 did_run_ = true;
43 } 69 }
44 70
45 void WorkerPoolTask::DidComplete() { 71 void WorkerPoolTask::DidComplete() {
46 reply_.Run(); 72 DCHECK(did_schedule_);
73 DCHECK(!did_complete_);
74 did_complete_ = true;
75 }
76
77 bool WorkerPoolTask::IsReadyToRun() const {
78 // TODO(reveman): Use counter to improve performance.
79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
80 it != dependencies_.rend(); ++it) {
81 WorkerPoolTask* dependency = *it;
82 if (!dependency->HasFinishedRunning())
83 return false;
84 }
85 return true;
86 }
87
88 bool WorkerPoolTask::HasFinishedRunning() const {
89 return did_run_;
90 }
91
92 bool WorkerPoolTask::HasCompleted() const {
93 return did_complete_;
47 } 94 }
48 95
49 } // namespace internal 96 } // namespace internal
50 97
51 // Internal to the worker pool. Any data or logic that needs to be 98 // Internal to the worker pool. Any data or logic that needs to be
52 // shared between threads lives in this class. All members are guarded 99 // shared between threads lives in this class. All members are guarded
53 // by |lock_|. 100 // by |lock_|.
54 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
55 public: 102 public:
56 Inner(WorkerPool* worker_pool, 103 Inner(WorkerPool* worker_pool,
57 size_t num_threads, 104 size_t num_threads,
58 const std::string& thread_name_prefix); 105 const std::string& thread_name_prefix);
59 virtual ~Inner(); 106 virtual ~Inner();
60 107
61 void Shutdown(); 108 void Shutdown();
62 109
63 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); 110 // Schedule running of |root| task and all its dependencies. Tasks
111 // previously scheduled but no longer needed to run |root| will be
112 // canceled unless already running. Canceled tasks are moved to
113 // |completed_tasks_| without being run. The result is that once
114 // scheduled, a task is guaranteed to end up in the |completed_tasks_|
115 // queue even if it later gets canceled by another call to
116 // ScheduleTasks().
117 void ScheduleTasks(internal::WorkerPoolTask* root);
64 118
65 // Appends all completed tasks to worker pool's completed tasks queue 119 // Collect all completed tasks in |completed_tasks|. Returns true if idle.
66 // and returns true if idle. 120 bool CollectCompletedTasks(TaskDeque* completed_tasks);
67 bool CollectCompletedTasks();
68 121
69 private: 122 private:
70 // Appends all completed tasks to |completed_tasks|. Lock must 123 class ScheduledTask {
71 // already be acquired before calling this function. 124 public:
72 bool AppendCompletedTasksWithLockAcquired( 125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority)
73 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); 126 : priority_(priority) {
127 if (dependent)
128 dependents_.push_back(dependent);
129 }
130
131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
132 unsigned priority() const { return priority_; }
133
134 private:
135 internal::WorkerPoolTask::TaskVector dependents_;
136 unsigned priority_;
137 };
138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
140 ScheduledTaskMap;
141
142 // This builds a ScheduledTaskMap from a root task.
143 static unsigned BuildScheduledTaskMapRecursive(
144 internal::WorkerPoolTask* task,
145 internal::WorkerPoolTask* dependent,
146 unsigned priority,
147 ScheduledTaskMap* scheduled_tasks);
148 static void BuildScheduledTaskMap(
149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
150
151 // Collect all completed tasks by swapping the contents of
152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired
153 // before calling this function. Returns true if idle.
154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
74 155
75 // Schedule an OnIdleOnOriginThread callback if not already pending. 156 // Schedule an OnIdleOnOriginThread callback if not already pending.
76 // Lock must already be acquired before calling this function. 157 // Lock must already be acquired before calling this function.
77 void ScheduleOnIdleWithLockAcquired(); 158 void ScheduleOnIdleWithLockAcquired();
78 void OnIdleOnOriginThread(); 159 void OnIdleOnOriginThread();
79 160
80 // Overridden from base::DelegateSimpleThread: 161 // Overridden from base::DelegateSimpleThread:
81 virtual void Run() OVERRIDE; 162 virtual void Run() OVERRIDE;
82 163
83 // Pointer to worker pool. Can only be used on origin thread. 164 // Pointer to worker pool. Can only be used on origin thread.
84 // Not guarded by |lock_|. 165 // Not guarded by |lock_|.
85 WorkerPool* worker_pool_on_origin_thread_; 166 WorkerPool* worker_pool_on_origin_thread_;
86 167
87 // This lock protects all members of this class except 168 // This lock protects all members of this class except
88 // |worker_pool_on_origin_thread_|. Do not read or modify anything 169 // |worker_pool_on_origin_thread_|. Do not read or modify anything
89 // without holding this lock. Do not block while holding this lock. 170 // without holding this lock. Do not block while holding this lock.
90 mutable base::Lock lock_; 171 mutable base::Lock lock_;
91 172
92 // Condition variable that is waited on by worker threads until new 173 // Condition variable that is waited on by worker threads until new
93 // tasks are posted or shutdown starts. 174 // tasks are ready to run or shutdown starts.
94 base::ConditionVariable has_pending_tasks_cv_; 175 base::ConditionVariable has_ready_to_run_tasks_cv_;
95 176
96 // Target message loop used for posting callbacks. 177 // Target message loop used for posting callbacks.
97 scoped_refptr<base::MessageLoopProxy> origin_loop_; 178 scoped_refptr<base::MessageLoopProxy> origin_loop_;
98 179
99 base::WeakPtrFactory<Inner> weak_ptr_factory_; 180 base::WeakPtrFactory<Inner> weak_ptr_factory_;
100 181
101 const base::Closure on_idle_callback_; 182 const base::Closure on_idle_callback_;
102 // Set when a OnIdleOnOriginThread() callback is pending. 183 // Set when a OnIdleOnOriginThread() callback is pending.
103 bool on_idle_pending_; 184 bool on_idle_pending_;
104 185
105 // Provides each running thread loop with a unique index. First thread 186 // Provides each running thread loop with a unique index. First thread
106 // loop index is 0. 187 // loop index is 0.
107 unsigned next_thread_index_; 188 unsigned next_thread_index_;
108 189
109 // Number of tasks currently running.
110 unsigned running_task_count_;
111
112 // Set during shutdown. Tells workers to exit when no more tasks 190 // Set during shutdown. Tells workers to exit when no more tasks
113 // are pending. 191 // are pending.
114 bool shutdown_; 192 bool shutdown_;
115 193
116 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; 194 // The root task that is a dependent of all other tasks.
117 TaskDeque pending_tasks_; 195 scoped_refptr<internal::WorkerPoolTask> root_;
196
197 // This set contains all pending tasks.
198 ScheduledTaskMap pending_tasks_;
199
200 // Ordered set of tasks that are ready to run.
201 // TODO(reveman): priority_queue might be more efficient.
202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
203 TaskMap ready_to_run_tasks_;
204
205 // This set contains all currently running tasks.
206 ScheduledTaskMap running_tasks_;
207
208 // Completed tasks not yet collected by origin thread.
118 TaskDeque completed_tasks_; 209 TaskDeque completed_tasks_;
119 210
120 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 211 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
121 212
122 DISALLOW_COPY_AND_ASSIGN(Inner); 213 DISALLOW_COPY_AND_ASSIGN(Inner);
123 }; 214 };
124 215
125 WorkerPool::Inner::Inner(WorkerPool* worker_pool, 216 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
126 size_t num_threads, 217 size_t num_threads,
127 const std::string& thread_name_prefix) 218 const std::string& thread_name_prefix)
128 : worker_pool_on_origin_thread_(worker_pool), 219 : worker_pool_on_origin_thread_(worker_pool),
129 lock_(), 220 lock_(),
130 has_pending_tasks_cv_(&lock_), 221 has_ready_to_run_tasks_cv_(&lock_),
131 origin_loop_(base::MessageLoopProxy::current()), 222 origin_loop_(base::MessageLoopProxy::current()),
132 weak_ptr_factory_(this), 223 weak_ptr_factory_(this),
133 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, 224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
134 weak_ptr_factory_.GetWeakPtr())), 225 weak_ptr_factory_.GetWeakPtr())),
135 on_idle_pending_(false), 226 on_idle_pending_(false),
136 next_thread_index_(0), 227 next_thread_index_(0),
137 running_task_count_(0),
138 shutdown_(false) { 228 shutdown_(false) {
139 base::AutoLock lock(lock_); 229 base::AutoLock lock(lock_);
140 230
141 while (workers_.size() < num_threads) { 231 while (workers_.size() < num_threads) {
142 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
143 new base::DelegateSimpleThread( 233 new base::DelegateSimpleThread(
144 this, 234 this,
145 thread_name_prefix + 235 thread_name_prefix +
146 base::StringPrintf( 236 base::StringPrintf(
147 "Worker%u", 237 "Worker%u",
148 static_cast<unsigned>(workers_.size() + 1)).c_str())); 238 static_cast<unsigned>(workers_.size() + 1)).c_str()));
149 worker->Start(); 239 worker->Start();
150 workers_.push_back(worker.Pass()); 240 workers_.push_back(worker.Pass());
151 } 241 }
152 } 242 }
153 243
154 WorkerPool::Inner::~Inner() { 244 WorkerPool::Inner::~Inner() {
155 base::AutoLock lock(lock_); 245 base::AutoLock lock(lock_);
156 246
157 DCHECK(shutdown_); 247 DCHECK(shutdown_);
158 248
159 // Cancel all pending callbacks.
160 weak_ptr_factory_.InvalidateWeakPtrs();
161
162 DCHECK_EQ(0u, pending_tasks_.size()); 249 DCHECK_EQ(0u, pending_tasks_.size());
250 DCHECK_EQ(0u, ready_to_run_tasks_.size());
251 DCHECK_EQ(0u, running_tasks_.size());
163 DCHECK_EQ(0u, completed_tasks_.size()); 252 DCHECK_EQ(0u, completed_tasks_.size());
164 DCHECK_EQ(0u, running_task_count_);
165 } 253 }
166 254
167 void WorkerPool::Inner::Shutdown() { 255 void WorkerPool::Inner::Shutdown() {
168 { 256 {
169 base::AutoLock lock(lock_); 257 base::AutoLock lock(lock_);
170 258
171 DCHECK(!shutdown_); 259 DCHECK(!shutdown_);
172 shutdown_ = true; 260 shutdown_ = true;
173 261
174 // Wake up a worker so it knows it should exit. This will cause all workers 262 // Wake up a worker so it knows it should exit. This will cause all workers
175 // to exit as each will wake up another worker before exiting. 263 // to exit as each will wake up another worker before exiting.
176 has_pending_tasks_cv_.Signal(); 264 has_ready_to_run_tasks_cv_.Signal();
177 } 265 }
178 266
179 while (workers_.size()) { 267 while (workers_.size()) {
180 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
181 // http://crbug.com/240453 - Join() is considered IO and will block this 269 // http://crbug.com/240453 - Join() is considered IO and will block this
182 // thread. See also http://crbug.com/239423 for further ideas. 270 // thread. See also http://crbug.com/239423 for further ideas.
183 base::ThreadRestrictions::ScopedAllowIO allow_io; 271 base::ThreadRestrictions::ScopedAllowIO allow_io;
184 worker->Join(); 272 worker->Join();
185 } 273 }
274
275 // Cancel any pending OnIdle callback.
276 weak_ptr_factory_.InvalidateWeakPtrs();
186 } 277 }
187 278
188 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) {
280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL.
281 DCHECK(!root || !shutdown_);
282
283 scoped_refptr<internal::WorkerPoolTask> new_root(root);
284
285 ScheduledTaskMap new_pending_tasks;
286 ScheduledTaskMap new_running_tasks;
287 TaskMap new_ready_to_run_tasks;
288
289 // Build scheduled task map before acquiring |lock_|.
290 if (root)
291 BuildScheduledTaskMap(root, &new_pending_tasks);
292
293 {
294 base::AutoLock lock(lock_);
295
296 // First remove all completed tasks from |new_pending_tasks|.
297 for (TaskDeque::iterator it = completed_tasks_.begin();
298 it != completed_tasks_.end(); ++it) {
299 internal::WorkerPoolTask* task = *it;
300 new_pending_tasks.take_and_erase(task);
301 }
302
303 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
304 for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
305 it != pending_tasks_.end(); ++it) {
306 internal::WorkerPoolTask* task = it->first;
307
308 // Task has completed if not present in |new_pending_tasks|.
309 if (!new_pending_tasks.contains(task))
310 completed_tasks_.push_back(task);
311 }
312
313 // Build new running task set.
314 for (ScheduledTaskMap::iterator it = running_tasks_.begin();
315 it != running_tasks_.end(); ++it) {
316 internal::WorkerPoolTask* task = it->first;
317 // Transfer scheduled task value from |new_pending_tasks| to
318 // |new_running_tasks| if currently running. Value must be set to
319 // NULL if |new_pending_tasks| doesn't contain task. This does
320 // the right thing in both cases.
321 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
322 }
323
324 // Build new "ready to run" tasks queue.
325 for (ScheduledTaskMap::iterator it = new_pending_tasks.begin();
326 it != new_pending_tasks.end(); ++it) {
327 internal::WorkerPoolTask* task = it->first;
328
329 // Completed tasks should not exist in |new_pending_tasks|.
330 DCHECK(!task->HasFinishedRunning());
331
332 // Call DidSchedule() to indicate that this task has been scheduled.
333 // Note: This is only for debugging purposes.
334 task->DidSchedule();
335
336 DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
337 if (task->IsReadyToRun())
338 new_ready_to_run_tasks[it->second->priority()] = task;
339 }
340
341 // Swap root task and task sets.
342 // Note: old tasks are intentionally destroyed after releasing |lock_|.
343 root_.swap(new_root);
344 pending_tasks_.swap(new_pending_tasks);
345 running_tasks_.swap(new_running_tasks);
346 ready_to_run_tasks_.swap(new_ready_to_run_tasks);
347
348 // If there is more work available, wake up worker thread.
349 if (!ready_to_run_tasks_.empty())
350 has_ready_to_run_tasks_cv_.Signal();
351 }
352 }
353
354 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
189 base::AutoLock lock(lock_); 355 base::AutoLock lock(lock_);
190 356
191 pending_tasks_.push_back(task.Pass()); 357 return CollectCompletedTasksWithLockAcquired(completed_tasks);
192
193 // There is more work available, so wake up worker thread.
194 has_pending_tasks_cv_.Signal();
195 } 358 }
196 359
197 bool WorkerPool::Inner::CollectCompletedTasks() { 360 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
198 base::AutoLock lock(lock_); 361 TaskDeque* completed_tasks) {
199
200 return AppendCompletedTasksWithLockAcquired(
201 &worker_pool_on_origin_thread_->completed_tasks_);
202 }
203
204 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
205 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
206 lock_.AssertAcquired(); 362 lock_.AssertAcquired();
207 363
208 while (completed_tasks_.size()) 364 DCHECK_EQ(0u, completed_tasks->size());
209 completed_tasks->push_back(completed_tasks_.take_front().Pass()); 365 completed_tasks->swap(completed_tasks_);
210 366
211 return !running_task_count_ && pending_tasks_.empty(); 367 return running_tasks_.empty() && pending_tasks_.empty();
212 } 368 }
213 369
214 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { 370 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
215 lock_.AssertAcquired(); 371 lock_.AssertAcquired();
216 372
217 if (on_idle_pending_) 373 if (on_idle_pending_)
218 return; 374 return;
219 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); 375 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
220 on_idle_pending_ = true; 376 on_idle_pending_ = true;
221 } 377 }
222 378
223 void WorkerPool::Inner::OnIdleOnOriginThread() { 379 void WorkerPool::Inner::OnIdleOnOriginThread() {
380 TaskDeque completed_tasks;
381
224 { 382 {
225 base::AutoLock lock(lock_); 383 base::AutoLock lock(lock_);
226 384
227 DCHECK(on_idle_pending_); 385 DCHECK(on_idle_pending_);
228 on_idle_pending_ = false; 386 on_idle_pending_ = false;
229 387
230 // Early out if no longer idle. 388 // Early out if no longer idle.
231 if (running_task_count_ || !pending_tasks_.empty()) 389 if (!running_tasks_.empty() || !pending_tasks_.empty())
232 return; 390 return;
233 391
234 AppendCompletedTasksWithLockAcquired( 392 CollectCompletedTasksWithLockAcquired(&completed_tasks);
235 &worker_pool_on_origin_thread_->completed_tasks_);
236 } 393 }
237 394
238 worker_pool_on_origin_thread_->OnIdle(); 395 worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
239 } 396 }
240 397
241 void WorkerPool::Inner::Run() { 398 void WorkerPool::Inner::Run() {
242 #if defined(OS_ANDROID) 399 #if defined(OS_ANDROID)
243 base::PlatformThread::SetThreadPriority( 400 base::PlatformThread::SetThreadPriority(
244 base::PlatformThread::CurrentHandle(), 401 base::PlatformThread::CurrentHandle(),
245 base::kThreadPriority_Background); 402 base::kThreadPriority_Background);
246 #endif 403 #endif
247 404
248 base::AutoLock lock(lock_); 405 base::AutoLock lock(lock_);
249 406
250 // Get a unique thread index. 407 // Get a unique thread index.
251 int thread_index = next_thread_index_++; 408 int thread_index = next_thread_index_++;
252 409
253 while (true) { 410 while (true) {
254 if (pending_tasks_.empty()) { 411 if (ready_to_run_tasks_.empty()) {
255 // Exit when shutdown is set and no more tasks are pending. 412 if (pending_tasks_.empty()) {
256 if (shutdown_) 413 // Exit when shutdown is set and no more tasks are pending.
257 break; 414 if (shutdown_)
415 break;
258 416
259 // Schedule an idle callback if requested and not pending. 417 // Schedule an idle callback if no tasks are running.
260 if (!running_task_count_) 418 if (running_tasks_.empty())
261 ScheduleOnIdleWithLockAcquired(); 419 ScheduleOnIdleWithLockAcquired();
420 }
262 421
263 // Wait for new pending tasks. 422 // Wait for more tasks.
264 has_pending_tasks_cv_.Wait(); 423 has_ready_to_run_tasks_cv_.Wait();
265 continue; 424 continue;
266 } 425 }
267 426
268 // Get next task. 427 // Take top priority task from |ready_to_run_tasks_|.
269 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); 428 scoped_refptr<internal::WorkerPoolTask> task(
429 ready_to_run_tasks_.begin()->second);
430 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
270 431
271 // Increment |running_task_count_| before starting to run task. 432 // Move task from |pending_tasks_| to |running_tasks_|.
272 running_task_count_++; 433 DCHECK(pending_tasks_.contains(task));
434 DCHECK(!running_tasks_.contains(task));
435 running_tasks_.set(task, pending_tasks_.take_and_erase(task));
273 436
274 // There may be more work available, so wake up another 437 // There may be more work available, so wake up another worker thread.
275 // worker thread. 438 has_ready_to_run_tasks_cv_.Signal();
276 has_pending_tasks_cv_.Signal(); 439
440 // Call WillRun() before releasing |lock_| and running task.
441 task->WillRun();
277 442
278 { 443 {
279 base::AutoUnlock unlock(lock_); 444 base::AutoUnlock unlock(lock_);
280 445
281 task->RunOnThread(thread_index); 446 task->RunOnThread(thread_index);
282 } 447 }
283 448
284 completed_tasks_.push_back(task.Pass()); 449 // This will mark task as finished running.
450 task->DidRun();
285 451
286 // Decrement |running_task_count_| now that we are done running task. 452 // Now iterate over all dependents to check if they are ready to run.
287 running_task_count_--; 453 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase(
454 task);
455 if (scheduled_task) {
456 typedef internal::WorkerPoolTask::TaskVector TaskVector;
457 for (TaskVector::iterator it = scheduled_task->dependents().begin();
458 it != scheduled_task->dependents().end(); ++it) {
459 internal::WorkerPoolTask* dependent = *it;
460 if (!dependent->IsReadyToRun())
461 continue;
462
463 // Task is ready. Add it to |ready_to_run_tasks_|.
464 DCHECK(pending_tasks_.contains(dependent));
465 unsigned priority = pending_tasks_.get(dependent)->priority();
466 DCHECK(!ready_to_run_tasks_.count(priority) ||
467 ready_to_run_tasks_[priority] == dependent);
468 ready_to_run_tasks_[priority] = dependent;
469 }
470 }
471
472 // Finally add task to |completed_tasks_|.
473 completed_tasks_.push_back(task);
288 } 474 }
289 475
290 // We noticed we should exit. Wake up the next worker so it knows it should 476 // We noticed we should exit. Wake up the next worker so it knows it should
291 // exit as well (because the Shutdown() code only signals once). 477 // exit as well (because the Shutdown() code only signals once).
292 has_pending_tasks_cv_.Signal(); 478 has_ready_to_run_tasks_cv_.Signal();
479 }
480
481 // BuildScheduledTaskMap() takes a task tree as input and constructs
482 // a unique set of tasks with edges between dependencies pointing in
483 // the direction of the dependents. Each task is given a unique priority
484 // which is currently the same as the DFS traversal order.
485 //
486 // Input: Output:
487 //
488 // root task4 Task | Priority (lower is better)
489 // / \ / \ -------+---------------------------
490 // task1 task2 task3 task2 root | 4
491 // | | | | task1 | 2
492 // task3 | task1 | task2 | 3
493 // | | \ / task3 | 1
494 // task4 task4 root task4 | 0
495 //
496 // The output can be used to efficiently maintain a queue of
497 // "ready to run" tasks.
498
499 // static
500 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
501 internal::WorkerPoolTask* task,
502 internal::WorkerPoolTask* dependent,
503 unsigned priority,
504 ScheduledTaskMap* scheduled_tasks) {
505 // Skip sub-tree if task has already completed.
506 if (task->HasCompleted())
507 return priority;
508
509 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
510 if (scheduled_it != scheduled_tasks->end()) {
511 DCHECK(dependent);
512 scheduled_it->second->dependents().push_back(dependent);
513 return priority;
514 }
515
516 typedef internal::WorkerPoolTask::TaskVector TaskVector;
517 for (TaskVector::iterator it = task->dependencies().begin();
518 it != task->dependencies().end(); ++it) {
519 internal::WorkerPoolTask* dependency = *it;
520 priority = BuildScheduledTaskMapRecursive(
521 dependency, task, priority, scheduled_tasks);
522 }
523
524 scheduled_tasks->set(task,
525 make_scoped_ptr(new ScheduledTask(dependent,
526 priority)));
527
528 return priority + 1;
529 }
530
531 // static
532 void WorkerPool::Inner::BuildScheduledTaskMap(
533 internal::WorkerPoolTask* root,
534 ScheduledTaskMap* scheduled_tasks) {
535 const unsigned kBasePriority = 0u;
536 DCHECK(root);
537 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
293 } 538 }
294 539
295 WorkerPool::WorkerPool(size_t num_threads, 540 WorkerPool::WorkerPool(size_t num_threads,
296 base::TimeDelta check_for_completed_tasks_delay, 541 base::TimeDelta check_for_completed_tasks_delay,
297 const std::string& thread_name_prefix) 542 const std::string& thread_name_prefix)
298 : client_(NULL), 543 : client_(NULL),
299 origin_loop_(base::MessageLoopProxy::current()), 544 origin_loop_(base::MessageLoopProxy::current()),
300 weak_ptr_factory_(this),
301 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), 545 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
302 check_for_completed_tasks_pending_(false), 546 check_for_completed_tasks_pending_(false),
547 in_dispatch_completion_callbacks_(false),
303 inner_(make_scoped_ptr(new Inner(this, 548 inner_(make_scoped_ptr(new Inner(this,
304 num_threads, 549 num_threads,
305 thread_name_prefix))) { 550 thread_name_prefix))) {
306 } 551 }
307 552
308 WorkerPool::~WorkerPool() { 553 WorkerPool::~WorkerPool() {
309 // Cancel all pending callbacks.
310 weak_ptr_factory_.InvalidateWeakPtrs();
311
312 DCHECK_EQ(0u, completed_tasks_.size());
313 } 554 }
314 555
315 void WorkerPool::Shutdown() { 556 void WorkerPool::Shutdown() {
557 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
558
559 DCHECK(!in_dispatch_completion_callbacks_);
560
316 inner_->Shutdown(); 561 inner_->Shutdown();
317 inner_->CollectCompletedTasks(); 562
318 DispatchCompletionCallbacks(); 563 TaskDeque completed_tasks;
564 inner_->CollectCompletedTasks(&completed_tasks);
565 DispatchCompletionCallbacks(&completed_tasks);
319 } 566 }
320 567
321 void WorkerPool::PostTaskAndReply( 568 void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
322 const Callback& task, const base::Closure& reply) {
323 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
324 task,
325 reply)).PassAs<internal::WorkerPoolTask>());
326 }
327
328 void WorkerPool::OnIdle() {
329 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); 569 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
330 570
331 DispatchCompletionCallbacks(); 571 DCHECK(!in_dispatch_completion_callbacks_);
572
573 DispatchCompletionCallbacks(completed_tasks);
574
575 // Cancel any pending check for completed tasks.
576 check_for_completed_tasks_callback_.Cancel();
577 check_for_completed_tasks_pending_ = false;
332 } 578 }
333 579
334 void WorkerPool::ScheduleCheckForCompletedTasks() { 580 void WorkerPool::ScheduleCheckForCompletedTasks() {
335 if (check_for_completed_tasks_pending_) 581 if (check_for_completed_tasks_pending_)
336 return; 582 return;
583 check_for_completed_tasks_callback_.Reset(
584 base::Bind(&WorkerPool::CheckForCompletedTasks,
585 base::Unretained(this)));
337 origin_loop_->PostDelayedTask( 586 origin_loop_->PostDelayedTask(
338 FROM_HERE, 587 FROM_HERE,
339 base::Bind(&WorkerPool::CheckForCompletedTasks, 588 check_for_completed_tasks_callback_.callback(),
340 weak_ptr_factory_.GetWeakPtr()),
341 check_for_completed_tasks_delay_); 589 check_for_completed_tasks_delay_);
342 check_for_completed_tasks_pending_ = true; 590 check_for_completed_tasks_pending_ = true;
343 } 591 }
344 592
345 void WorkerPool::CheckForCompletedTasks() { 593 void WorkerPool::CheckForCompletedTasks() {
346 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 594 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
347 DCHECK(check_for_completed_tasks_pending_); 595
596 DCHECK(!in_dispatch_completion_callbacks_);
597
598 check_for_completed_tasks_callback_.Cancel();
348 check_for_completed_tasks_pending_ = false; 599 check_for_completed_tasks_pending_ = false;
349 600
601 TaskDeque completed_tasks;
602
350 // Schedule another check for completed tasks if not idle. 603 // Schedule another check for completed tasks if not idle.
351 if (!inner_->CollectCompletedTasks()) 604 if (!inner_->CollectCompletedTasks(&completed_tasks))
352 ScheduleCheckForCompletedTasks(); 605 ScheduleCheckForCompletedTasks();
353 606
354 DispatchCompletionCallbacks(); 607 DispatchCompletionCallbacks(&completed_tasks);
355 } 608 }
356 609
357 void WorkerPool::DispatchCompletionCallbacks() { 610 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
358 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); 611 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
359 612
360 if (completed_tasks_.empty()) 613 // Early out when |completed_tasks| is empty to prevent unnecessary
614 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
615 if (completed_tasks->empty())
361 return; 616 return;
362 617
363 while (completed_tasks_.size()) { 618 // Worker pool instance is not reentrant while processing completed tasks.
364 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); 619 in_dispatch_completion_callbacks_ = true;
620
621 while (!completed_tasks->empty()) {
622 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
623 completed_tasks->pop_front();
365 task->DidComplete(); 624 task->DidComplete();
625 task->DispatchCompletionCallback();
366 } 626 }
367 627
628 in_dispatch_completion_callbacks_ = false;
629
368 DCHECK(client_); 630 DCHECK(client_);
369 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); 631 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
370 } 632 }
371 633
372 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 634 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) {
373 // Schedule check for completed tasks if not pending. 635 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
374 ScheduleCheckForCompletedTasks();
375 636
376 inner_->PostTask(task.Pass()); 637 DCHECK(!in_dispatch_completion_callbacks_);
638
639 // Schedule check for completed tasks.
640 if (root)
641 ScheduleCheckForCompletedTasks();
642
643 inner_->ScheduleTasks(root);
377 } 644 }
378 645
379 } // namespace cc 646 } // namespace cc
OLDNEW
« no previous file with comments | « cc/base/worker_pool.h ('k') | cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698