OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
6 | 6 |
7 #include <algorithm> | 7 #if defined(OS_ANDROID) |
| 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| 9 #include <sys/resource.h> |
| 10 #endif |
| 11 |
| 12 #include <map> |
8 | 13 |
9 #include "base/bind.h" | 14 #include "base/bind.h" |
10 #include "base/debug/trace_event.h" | 15 #include "base/debug/trace_event.h" |
| 16 #include "base/hash_tables.h" |
11 #include "base/stringprintf.h" | 17 #include "base/stringprintf.h" |
12 #include "base/synchronization/condition_variable.h" | |
13 #include "base/threading/simple_thread.h" | 18 #include "base/threading/simple_thread.h" |
14 #include "base/threading/thread_restrictions.h" | 19 #include "base/threading/thread_restrictions.h" |
| 20 #include "cc/base/scoped_ptr_deque.h" |
| 21 #include "cc/base/scoped_ptr_hash_map.h" |
| 22 |
| 23 #if defined(COMPILER_GCC) |
| 24 namespace BASE_HASH_NAMESPACE { |
| 25 template <> struct hash<cc::internal::WorkerPoolTask*> { |
| 26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const { |
| 27 return hash<size_t>()(reinterpret_cast<size_t>(ptr)); |
| 28 } |
| 29 }; |
| 30 } // namespace BASE_HASH_NAMESPACE |
| 31 #endif // COMPILER |
15 | 32 |
16 namespace cc { | 33 namespace cc { |
17 | 34 |
18 namespace { | |
19 | |
20 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | |
21 public: | |
22 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | |
23 const base::Closure& reply) | |
24 : internal::WorkerPoolTask(reply), | |
25 task_(task) {} | |
26 | |
27 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | |
28 task_.Run(); | |
29 } | |
30 | |
31 private: | |
32 WorkerPool::Callback task_; | |
33 }; | |
34 | |
35 } // namespace | |
36 | |
37 namespace internal { | 35 namespace internal { |
38 | 36 |
39 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { | 37 WorkerPoolTask::WorkerPoolTask() |
| 38 : did_schedule_(false), |
| 39 did_run_(false), |
| 40 did_complete_(false) { |
| 41 } |
| 42 |
| 43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) |
| 44 : did_schedule_(false), |
| 45 did_run_(false), |
| 46 did_complete_(false) { |
| 47 dependencies_.swap(*dependencies); |
40 } | 48 } |
41 | 49 |
42 WorkerPoolTask::~WorkerPoolTask() { | 50 WorkerPoolTask::~WorkerPoolTask() { |
| 51 DCHECK_EQ(did_schedule_, did_complete_); |
| 52 DCHECK(!did_run_ || did_schedule_); |
| 53 DCHECK(!did_run_ || did_complete_); |
| 54 } |
| 55 |
| 56 void WorkerPoolTask::DidSchedule() { |
| 57 DCHECK(!did_complete_); |
| 58 did_schedule_ = true; |
| 59 } |
| 60 |
| 61 void WorkerPoolTask::WillRun() { |
| 62 DCHECK(did_schedule_); |
| 63 DCHECK(!did_complete_); |
| 64 DCHECK(!did_run_); |
| 65 } |
| 66 |
| 67 void WorkerPoolTask::DidRun() { |
| 68 did_run_ = true; |
43 } | 69 } |
44 | 70 |
45 void WorkerPoolTask::DidComplete() { | 71 void WorkerPoolTask::DidComplete() { |
46 reply_.Run(); | 72 DCHECK(did_schedule_); |
| 73 DCHECK(!did_complete_); |
| 74 did_complete_ = true; |
| 75 } |
| 76 |
| 77 bool WorkerPoolTask::IsReadyToRun() const { |
| 78 // TODO(reveman): Use counter to improve performance. |
| 79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); |
| 80 it != dependencies_.rend(); ++it) { |
| 81 WorkerPoolTask* dependency = *it; |
| 82 if (!dependency->HasFinishedRunning()) |
| 83 return false; |
| 84 } |
| 85 return true; |
| 86 } |
| 87 |
| 88 bool WorkerPoolTask::HasFinishedRunning() const { |
| 89 return did_run_; |
| 90 } |
| 91 |
| 92 bool WorkerPoolTask::HasCompleted() const { |
| 93 return did_complete_; |
47 } | 94 } |
48 | 95 |
49 } // namespace internal | 96 } // namespace internal |
50 | 97 |
51 // Internal to the worker pool. Any data or logic that needs to be | 98 // Internal to the worker pool. Any data or logic that needs to be |
52 // shared between threads lives in this class. All members are guarded | 99 // shared between threads lives in this class. All members are guarded |
53 // by |lock_|. | 100 // by |lock_|. |
54 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
55 public: | 102 public: |
56 Inner(WorkerPool* worker_pool, | 103 Inner(WorkerPool* worker_pool, |
57 size_t num_threads, | 104 size_t num_threads, |
58 const std::string& thread_name_prefix); | 105 const std::string& thread_name_prefix); |
59 virtual ~Inner(); | 106 virtual ~Inner(); |
60 | 107 |
61 void Shutdown(); | 108 void Shutdown(); |
62 | 109 |
63 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); | 110 // Schedule running of |root| task and all its dependencies. Tasks |
| 111 // previously scheduled but no longer needed to run |root| will be |
| 112 // canceled unless already running. Canceled tasks are moved to |
| 113 // |completed_tasks_| without being run. The result is that once |
| 114 // scheduled, a task is guaranteed to end up in the |completed_tasks_| |
| 115 // queue even if they later get canceled by another call to |
| 116 // ScheduleTasks(). |
| 117 void ScheduleTasks(internal::WorkerPoolTask* root); |
64 | 118 |
65 // Appends all completed tasks to worker pool's completed tasks queue | 119 // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
66 // and returns true if idle. | 120 bool CollectCompletedTasks(TaskDeque* completed_tasks); |
67 bool CollectCompletedTasks(); | |
68 | 121 |
69 private: | 122 private: |
70 // Appends all completed tasks to |completed_tasks|. Lock must | 123 class ScheduledTask { |
71 // already be acquired before calling this function. | 124 public: |
72 bool AppendCompletedTasksWithLockAcquired( | 125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) |
73 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 126 : priority_(priority) { |
| 127 if (dependent) |
| 128 dependents_.push_back(dependent); |
| 129 } |
| 130 |
| 131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } |
| 132 unsigned priority() const { return priority_; } |
| 133 |
| 134 private: |
| 135 internal::WorkerPoolTask::TaskVector dependents_; |
| 136 unsigned priority_; |
| 137 }; |
| 138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey; |
| 139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> |
| 140 ScheduledTaskMap; |
| 141 |
| 142 // This builds a ScheduledTaskMap from a root task. |
| 143 static unsigned BuildScheduledTaskMapRecursive( |
| 144 internal::WorkerPoolTask* task, |
| 145 internal::WorkerPoolTask* dependent, |
| 146 unsigned priority, |
| 147 ScheduledTaskMap* scheduled_tasks); |
| 148 static void BuildScheduledTaskMap( |
| 149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); |
| 150 |
| 151 // Collect all completed tasks by swapping the contents of |
| 152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
| 153 // before calling this function. Returns true if idle. |
| 154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
74 | 155 |
75 // Schedule an OnIdleOnOriginThread callback if not already pending. | 156 // Schedule an OnIdleOnOriginThread callback if not already pending. |
76 // Lock must already be acquired before calling this function. | 157 // Lock must already be acquired before calling this function. |
77 void ScheduleOnIdleWithLockAcquired(); | 158 void ScheduleOnIdleWithLockAcquired(); |
78 void OnIdleOnOriginThread(); | 159 void OnIdleOnOriginThread(); |
79 | 160 |
80 // Overridden from base::DelegateSimpleThread: | 161 // Overridden from base::DelegateSimpleThread: |
81 virtual void Run() OVERRIDE; | 162 virtual void Run() OVERRIDE; |
82 | 163 |
83 // Pointer to worker pool. Can only be used on origin thread. | 164 // Pointer to worker pool. Can only be used on origin thread. |
84 // Not guarded by |lock_|. | 165 // Not guarded by |lock_|. |
85 WorkerPool* worker_pool_on_origin_thread_; | 166 WorkerPool* worker_pool_on_origin_thread_; |
86 | 167 |
87 // This lock protects all members of this class except | 168 // This lock protects all members of this class except |
88 // |worker_pool_on_origin_thread_|. Do not read or modify anything | 169 // |worker_pool_on_origin_thread_|. Do not read or modify anything |
89 // without holding this lock. Do not block while holding this lock. | 170 // without holding this lock. Do not block while holding this lock. |
90 mutable base::Lock lock_; | 171 mutable base::Lock lock_; |
91 | 172 |
92 // Condition variable that is waited on by worker threads until new | 173 // Condition variable that is waited on by worker threads until new |
93 // tasks are posted or shutdown starts. | 174 // tasks are ready to run or shutdown starts. |
94 base::ConditionVariable has_pending_tasks_cv_; | 175 base::ConditionVariable has_ready_to_run_tasks_cv_; |
95 | 176 |
96 // Target message loop used for posting callbacks. | 177 // Target message loop used for posting callbacks. |
97 scoped_refptr<base::MessageLoopProxy> origin_loop_; | 178 scoped_refptr<base::MessageLoopProxy> origin_loop_; |
98 | 179 |
99 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 180 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
100 | 181 |
101 const base::Closure on_idle_callback_; | 182 const base::Closure on_idle_callback_; |
102 // Set when a OnIdleOnOriginThread() callback is pending. | 183 // Set when a OnIdleOnOriginThread() callback is pending. |
103 bool on_idle_pending_; | 184 bool on_idle_pending_; |
104 | 185 |
105 // Provides each running thread loop with a unique index. First thread | 186 // Provides each running thread loop with a unique index. First thread |
106 // loop index is 0. | 187 // loop index is 0. |
107 unsigned next_thread_index_; | 188 unsigned next_thread_index_; |
108 | 189 |
109 // Number of tasks currently running. | |
110 unsigned running_task_count_; | |
111 | |
112 // Set during shutdown. Tells workers to exit when no more tasks | 190 // Set during shutdown. Tells workers to exit when no more tasks |
113 // are pending. | 191 // are pending. |
114 bool shutdown_; | 192 bool shutdown_; |
115 | 193 |
116 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 194 // The root task that is a dependent of all other tasks. |
117 TaskDeque pending_tasks_; | 195 scoped_refptr<internal::WorkerPoolTask> root_; |
| 196 |
| 197 // This set contains all pending tasks. |
| 198 ScheduledTaskMap pending_tasks_; |
| 199 |
| 200 // Ordered set of tasks that are ready to run. |
| 201 // TODO(reveman): priority_queue might be more efficient. |
| 202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; |
| 203 TaskMap ready_to_run_tasks_; |
| 204 |
| 205 // This set contains all currently running tasks. |
| 206 ScheduledTaskMap running_tasks_; |
| 207 |
| 208 // Completed tasks not yet collected by origin thread. |
118 TaskDeque completed_tasks_; | 209 TaskDeque completed_tasks_; |
119 | 210 |
120 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 211 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
121 | 212 |
122 DISALLOW_COPY_AND_ASSIGN(Inner); | 213 DISALLOW_COPY_AND_ASSIGN(Inner); |
123 }; | 214 }; |
124 | 215 |
125 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 216 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
126 size_t num_threads, | 217 size_t num_threads, |
127 const std::string& thread_name_prefix) | 218 const std::string& thread_name_prefix) |
128 : worker_pool_on_origin_thread_(worker_pool), | 219 : worker_pool_on_origin_thread_(worker_pool), |
129 lock_(), | 220 lock_(), |
130 has_pending_tasks_cv_(&lock_), | 221 has_ready_to_run_tasks_cv_(&lock_), |
131 origin_loop_(base::MessageLoopProxy::current()), | 222 origin_loop_(base::MessageLoopProxy::current()), |
132 weak_ptr_factory_(this), | 223 weak_ptr_factory_(this), |
133 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
134 weak_ptr_factory_.GetWeakPtr())), | 225 weak_ptr_factory_.GetWeakPtr())), |
135 on_idle_pending_(false), | 226 on_idle_pending_(false), |
136 next_thread_index_(0), | 227 next_thread_index_(0), |
137 running_task_count_(0), | |
138 shutdown_(false) { | 228 shutdown_(false) { |
139 base::AutoLock lock(lock_); | 229 base::AutoLock lock(lock_); |
140 | 230 |
141 while (workers_.size() < num_threads) { | 231 while (workers_.size() < num_threads) { |
142 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
143 new base::DelegateSimpleThread( | 233 new base::DelegateSimpleThread( |
144 this, | 234 this, |
145 thread_name_prefix + | 235 thread_name_prefix + |
146 base::StringPrintf( | 236 base::StringPrintf( |
147 "Worker%u", | 237 "Worker%u", |
148 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 238 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
149 worker->Start(); | 239 worker->Start(); |
150 workers_.push_back(worker.Pass()); | 240 workers_.push_back(worker.Pass()); |
151 } | 241 } |
152 } | 242 } |
153 | 243 |
154 WorkerPool::Inner::~Inner() { | 244 WorkerPool::Inner::~Inner() { |
155 base::AutoLock lock(lock_); | 245 base::AutoLock lock(lock_); |
156 | 246 |
157 DCHECK(shutdown_); | 247 DCHECK(shutdown_); |
158 | 248 |
159 // Cancel all pending callbacks. | |
160 weak_ptr_factory_.InvalidateWeakPtrs(); | |
161 | |
162 DCHECK_EQ(0u, pending_tasks_.size()); | 249 DCHECK_EQ(0u, pending_tasks_.size()); |
| 250 DCHECK_EQ(0u, ready_to_run_tasks_.size()); |
| 251 DCHECK_EQ(0u, running_tasks_.size()); |
163 DCHECK_EQ(0u, completed_tasks_.size()); | 252 DCHECK_EQ(0u, completed_tasks_.size()); |
164 DCHECK_EQ(0u, running_task_count_); | |
165 } | 253 } |
166 | 254 |
167 void WorkerPool::Inner::Shutdown() { | 255 void WorkerPool::Inner::Shutdown() { |
168 { | 256 { |
169 base::AutoLock lock(lock_); | 257 base::AutoLock lock(lock_); |
170 | 258 |
171 DCHECK(!shutdown_); | 259 DCHECK(!shutdown_); |
172 shutdown_ = true; | 260 shutdown_ = true; |
173 | 261 |
174 // Wake up a worker so it knows it should exit. This will cause all workers | 262 // Wake up a worker so it knows it should exit. This will cause all workers |
175 // to exit as each will wake up another worker before exiting. | 263 // to exit as each will wake up another worker before exiting. |
176 has_pending_tasks_cv_.Signal(); | 264 has_ready_to_run_tasks_cv_.Signal(); |
177 } | 265 } |
178 | 266 |
179 while (workers_.size()) { | 267 while (workers_.size()) { |
180 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
181 // http://crbug.com/240453 - Join() is considered IO and will block this | 269 // http://crbug.com/240453 - Join() is considered IO and will block this |
182 // thread. See also http://crbug.com/239423 for further ideas. | 270 // thread. See also http://crbug.com/239423 for further ideas. |
183 base::ThreadRestrictions::ScopedAllowIO allow_io; | 271 base::ThreadRestrictions::ScopedAllowIO allow_io; |
184 worker->Join(); | 272 worker->Join(); |
185 } | 273 } |
| 274 |
| 275 // Cancel any pending OnIdle callback. |
| 276 weak_ptr_factory_.InvalidateWeakPtrs(); |
186 } | 277 } |
187 | 278 |
188 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { |
| 280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. |
| 281 DCHECK(!root || !shutdown_); |
| 282 |
| 283 scoped_refptr<internal::WorkerPoolTask> new_root(root); |
| 284 |
| 285 ScheduledTaskMap new_pending_tasks; |
| 286 ScheduledTaskMap new_running_tasks; |
| 287 TaskMap new_ready_to_run_tasks; |
| 288 |
| 289 // Build scheduled task map before acquiring |lock_|. |
| 290 if (root) |
| 291 BuildScheduledTaskMap(root, &new_pending_tasks); |
| 292 |
| 293 { |
| 294 base::AutoLock lock(lock_); |
| 295 |
| 296 // First remove all completed tasks from |new_pending_tasks|. |
| 297 for (TaskDeque::iterator it = completed_tasks_.begin(); |
| 298 it != completed_tasks_.end(); ++it) { |
| 299 internal::WorkerPoolTask* task = *it; |
| 300 new_pending_tasks.take_and_erase(task); |
| 301 } |
| 302 |
| 303 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. |
| 304 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); |
| 305 it != pending_tasks_.end(); ++it) { |
| 306 internal::WorkerPoolTask* task = it->first; |
| 307 |
| 308 // Task has completed if not present in |new_pending_tasks|. |
| 309 if (!new_pending_tasks.contains(task)) |
| 310 completed_tasks_.push_back(task); |
| 311 } |
| 312 |
| 313 // Build new running task set. |
| 314 for (ScheduledTaskMap::iterator it = running_tasks_.begin(); |
| 315 it != running_tasks_.end(); ++it) { |
| 316 internal::WorkerPoolTask* task = it->first; |
| 317 // Transfer scheduled task value from |new_pending_tasks| to |
| 318 // |new_running_tasks| if currently running. Value must be set to |
| 319 // NULL if |new_pending_tasks| doesn't contain task. This does |
| 320 // the right in both cases. |
| 321 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); |
| 322 } |
| 323 |
| 324 // Build new "ready to run" tasks queue. |
| 325 for (ScheduledTaskMap::iterator it = new_pending_tasks.begin(); |
| 326 it != new_pending_tasks.end(); ++it) { |
| 327 internal::WorkerPoolTask* task = it->first; |
| 328 |
| 329 // Completed tasks should not exist in |new_pending_tasks|. |
| 330 DCHECK(!task->HasFinishedRunning()); |
| 331 |
| 332 // Call DidSchedule() to indicate that this task has been scheduled. |
| 333 // Note: This is only for debugging purposes. |
| 334 task->DidSchedule(); |
| 335 |
| 336 DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority())); |
| 337 if (task->IsReadyToRun()) |
| 338 new_ready_to_run_tasks[it->second->priority()] = task; |
| 339 } |
| 340 |
| 341 // Swap root taskand task sets. |
| 342 // Note: old tasks are intentionally destroyed after releasing |lock_|. |
| 343 root_.swap(new_root); |
| 344 pending_tasks_.swap(new_pending_tasks); |
| 345 running_tasks_.swap(new_running_tasks); |
| 346 ready_to_run_tasks_.swap(new_ready_to_run_tasks); |
| 347 |
| 348 // If there is more work available, wake up worker thread. |
| 349 if (!ready_to_run_tasks_.empty()) |
| 350 has_ready_to_run_tasks_cv_.Signal(); |
| 351 } |
| 352 } |
| 353 |
| 354 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
189 base::AutoLock lock(lock_); | 355 base::AutoLock lock(lock_); |
190 | 356 |
191 pending_tasks_.push_back(task.Pass()); | 357 return CollectCompletedTasksWithLockAcquired(completed_tasks); |
192 | |
193 // There is more work available, so wake up worker thread. | |
194 has_pending_tasks_cv_.Signal(); | |
195 } | 358 } |
196 | 359 |
197 bool WorkerPool::Inner::CollectCompletedTasks() { | 360 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
198 base::AutoLock lock(lock_); | 361 TaskDeque* completed_tasks) { |
199 | |
200 return AppendCompletedTasksWithLockAcquired( | |
201 &worker_pool_on_origin_thread_->completed_tasks_); | |
202 } | |
203 | |
204 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | |
205 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | |
206 lock_.AssertAcquired(); | 362 lock_.AssertAcquired(); |
207 | 363 |
208 while (completed_tasks_.size()) | 364 DCHECK_EQ(0u, completed_tasks->size()); |
209 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 365 completed_tasks->swap(completed_tasks_); |
210 | 366 |
211 return !running_task_count_ && pending_tasks_.empty(); | 367 return running_tasks_.empty() && pending_tasks_.empty(); |
212 } | 368 } |
213 | 369 |
214 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 370 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
215 lock_.AssertAcquired(); | 371 lock_.AssertAcquired(); |
216 | 372 |
217 if (on_idle_pending_) | 373 if (on_idle_pending_) |
218 return; | 374 return; |
219 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 375 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
220 on_idle_pending_ = true; | 376 on_idle_pending_ = true; |
221 } | 377 } |
222 | 378 |
223 void WorkerPool::Inner::OnIdleOnOriginThread() { | 379 void WorkerPool::Inner::OnIdleOnOriginThread() { |
| 380 TaskDeque completed_tasks; |
| 381 |
224 { | 382 { |
225 base::AutoLock lock(lock_); | 383 base::AutoLock lock(lock_); |
226 | 384 |
227 DCHECK(on_idle_pending_); | 385 DCHECK(on_idle_pending_); |
228 on_idle_pending_ = false; | 386 on_idle_pending_ = false; |
229 | 387 |
230 // Early out if no longer idle. | 388 // Early out if no longer idle. |
231 if (running_task_count_ || !pending_tasks_.empty()) | 389 if (!running_tasks_.empty() || !pending_tasks_.empty()) |
232 return; | 390 return; |
233 | 391 |
234 AppendCompletedTasksWithLockAcquired( | 392 CollectCompletedTasksWithLockAcquired(&completed_tasks); |
235 &worker_pool_on_origin_thread_->completed_tasks_); | |
236 } | 393 } |
237 | 394 |
238 worker_pool_on_origin_thread_->OnIdle(); | 395 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
239 } | 396 } |
240 | 397 |
241 void WorkerPool::Inner::Run() { | 398 void WorkerPool::Inner::Run() { |
242 #if defined(OS_ANDROID) | 399 #if defined(OS_ANDROID) |
243 base::PlatformThread::SetThreadPriority( | 400 base::PlatformThread::SetThreadPriority( |
244 base::PlatformThread::CurrentHandle(), | 401 base::PlatformThread::CurrentHandle(), |
245 base::kThreadPriority_Background); | 402 base::kThreadPriority_Background); |
246 #endif | 403 #endif |
247 | 404 |
248 base::AutoLock lock(lock_); | 405 base::AutoLock lock(lock_); |
249 | 406 |
250 // Get a unique thread index. | 407 // Get a unique thread index. |
251 int thread_index = next_thread_index_++; | 408 int thread_index = next_thread_index_++; |
252 | 409 |
253 while (true) { | 410 while (true) { |
254 if (pending_tasks_.empty()) { | 411 if (ready_to_run_tasks_.empty()) { |
255 // Exit when shutdown is set and no more tasks are pending. | 412 if (pending_tasks_.empty()) { |
256 if (shutdown_) | 413 // Exit when shutdown is set and no more tasks are pending. |
257 break; | 414 if (shutdown_) |
| 415 break; |
258 | 416 |
259 // Schedule an idle callback if requested and not pending. | 417 // Schedule an idle callback if no tasks are running. |
260 if (!running_task_count_) | 418 if (running_tasks_.empty()) |
261 ScheduleOnIdleWithLockAcquired(); | 419 ScheduleOnIdleWithLockAcquired(); |
| 420 } |
262 | 421 |
263 // Wait for new pending tasks. | 422 // Wait for more tasks. |
264 has_pending_tasks_cv_.Wait(); | 423 has_ready_to_run_tasks_cv_.Wait(); |
265 continue; | 424 continue; |
266 } | 425 } |
267 | 426 |
268 // Get next task. | 427 // Take top priority task from |ready_to_run_tasks_|. |
269 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 428 scoped_refptr<internal::WorkerPoolTask> task( |
| 429 ready_to_run_tasks_.begin()->second); |
| 430 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); |
270 | 431 |
271 // Increment |running_task_count_| before starting to run task. | 432 // Move task from |pending_tasks_| to |running_tasks_|. |
272 running_task_count_++; | 433 DCHECK(pending_tasks_.contains(task)); |
| 434 DCHECK(!running_tasks_.contains(task)); |
| 435 running_tasks_.set(task, pending_tasks_.take_and_erase(task)); |
273 | 436 |
274 // There may be more work available, so wake up another | 437 // There may be more work available, so wake up another worker thread. |
275 // worker thread. | 438 has_ready_to_run_tasks_cv_.Signal(); |
276 has_pending_tasks_cv_.Signal(); | 439 |
| 440 // Call WillRun() before releasing |lock_| and running task. |
| 441 task->WillRun(); |
277 | 442 |
278 { | 443 { |
279 base::AutoUnlock unlock(lock_); | 444 base::AutoUnlock unlock(lock_); |
280 | 445 |
281 task->RunOnThread(thread_index); | 446 task->RunOnThread(thread_index); |
282 } | 447 } |
283 | 448 |
284 completed_tasks_.push_back(task.Pass()); | 449 // This will mark task as finished running. |
| 450 task->DidRun(); |
285 | 451 |
286 // Decrement |running_task_count_| now that we are done running task. | 452 // Now iterate over all dependents to check if they are ready to run. |
287 running_task_count_--; | 453 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( |
| 454 task); |
| 455 if (scheduled_task) { |
| 456 typedef internal::WorkerPoolTask::TaskVector TaskVector; |
| 457 for (TaskVector::iterator it = scheduled_task->dependents().begin(); |
| 458 it != scheduled_task->dependents().end(); ++it) { |
| 459 internal::WorkerPoolTask* dependent = *it; |
| 460 if (!dependent->IsReadyToRun()) |
| 461 continue; |
| 462 |
| 463 // Task is ready. Add it to |ready_to_run_tasks_|. |
| 464 DCHECK(pending_tasks_.contains(dependent)); |
| 465 unsigned priority = pending_tasks_.get(dependent)->priority(); |
| 466 DCHECK(!ready_to_run_tasks_.count(priority) || |
| 467 ready_to_run_tasks_[priority] == dependent); |
| 468 ready_to_run_tasks_[priority] = dependent; |
| 469 } |
| 470 } |
| 471 |
| 472 // Finally add task to |completed_tasks_|. |
| 473 completed_tasks_.push_back(task); |
288 } | 474 } |
289 | 475 |
290 // We noticed we should exit. Wake up the next worker so it knows it should | 476 // We noticed we should exit. Wake up the next worker so it knows it should |
291 // exit as well (because the Shutdown() code only signals once). | 477 // exit as well (because the Shutdown() code only signals once). |
292 has_pending_tasks_cv_.Signal(); | 478 has_ready_to_run_tasks_cv_.Signal(); |
| 479 } |
| 480 |
| 481 // BuildScheduledTaskMap() takes a task tree as input and constructs |
| 482 // a unique set of tasks with edges between dependencies pointing in |
| 483 // the direction of the dependents. Each task is given a unique priority |
| 484 // which is currently the same as the DFS traversal order. |
| 485 // |
| 486 // Input: Output: |
| 487 // |
| 488 // root task4 Task | Priority (lower is better) |
| 489 // / \ / \ -------+--------------------------- |
| 490 // task1 task2 task3 task2 root | 4 |
| 491 // | | | | task1 | 2 |
| 492 // task3 | task1 | task2 | 3 |
| 493 // | | \ / task3 | 1 |
| 494 // task4 task4 root task4 | 0 |
| 495 // |
| 496 // The output can be used to efficiently maintain a queue of |
| 497 // "ready to run" tasks. |
| 498 |
| 499 // static |
| 500 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( |
| 501 internal::WorkerPoolTask* task, |
| 502 internal::WorkerPoolTask* dependent, |
| 503 unsigned priority, |
| 504 ScheduledTaskMap* scheduled_tasks) { |
| 505 // Skip sub-tree if task has already completed. |
| 506 if (task->HasCompleted()) |
| 507 return priority; |
| 508 |
| 509 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); |
| 510 if (scheduled_it != scheduled_tasks->end()) { |
| 511 DCHECK(dependent); |
| 512 scheduled_it->second->dependents().push_back(dependent); |
| 513 return priority; |
| 514 } |
| 515 |
| 516 typedef internal::WorkerPoolTask::TaskVector TaskVector; |
| 517 for (TaskVector::iterator it = task->dependencies().begin(); |
| 518 it != task->dependencies().end(); ++it) { |
| 519 internal::WorkerPoolTask* dependency = *it; |
| 520 priority = BuildScheduledTaskMapRecursive( |
| 521 dependency, task, priority, scheduled_tasks); |
| 522 } |
| 523 |
| 524 scheduled_tasks->set(task, |
| 525 make_scoped_ptr(new ScheduledTask(dependent, |
| 526 priority))); |
| 527 |
| 528 return priority + 1; |
| 529 } |
| 530 |
| 531 // static |
| 532 void WorkerPool::Inner::BuildScheduledTaskMap( |
| 533 internal::WorkerPoolTask* root, |
| 534 ScheduledTaskMap* scheduled_tasks) { |
| 535 const unsigned kBasePriority = 0u; |
| 536 DCHECK(root); |
| 537 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); |
293 } | 538 } |
294 | 539 |
295 WorkerPool::WorkerPool(size_t num_threads, | 540 WorkerPool::WorkerPool(size_t num_threads, |
296 base::TimeDelta check_for_completed_tasks_delay, | 541 base::TimeDelta check_for_completed_tasks_delay, |
297 const std::string& thread_name_prefix) | 542 const std::string& thread_name_prefix) |
298 : client_(NULL), | 543 : client_(NULL), |
299 origin_loop_(base::MessageLoopProxy::current()), | 544 origin_loop_(base::MessageLoopProxy::current()), |
300 weak_ptr_factory_(this), | |
301 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 545 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
302 check_for_completed_tasks_pending_(false), | 546 check_for_completed_tasks_pending_(false), |
| 547 in_dispatch_completion_callbacks_(false), |
303 inner_(make_scoped_ptr(new Inner(this, | 548 inner_(make_scoped_ptr(new Inner(this, |
304 num_threads, | 549 num_threads, |
305 thread_name_prefix))) { | 550 thread_name_prefix))) { |
306 } | 551 } |
307 | 552 |
308 WorkerPool::~WorkerPool() { | 553 WorkerPool::~WorkerPool() { |
309 // Cancel all pending callbacks. | |
310 weak_ptr_factory_.InvalidateWeakPtrs(); | |
311 | |
312 DCHECK_EQ(0u, completed_tasks_.size()); | |
313 } | 554 } |
314 | 555 |
315 void WorkerPool::Shutdown() { | 556 void WorkerPool::Shutdown() { |
| 557 TRACE_EVENT0("cc", "WorkerPool::Shutdown"); |
| 558 |
| 559 DCHECK(!in_dispatch_completion_callbacks_); |
| 560 |
316 inner_->Shutdown(); | 561 inner_->Shutdown(); |
317 inner_->CollectCompletedTasks(); | 562 |
318 DispatchCompletionCallbacks(); | 563 TaskDeque completed_tasks; |
| 564 inner_->CollectCompletedTasks(&completed_tasks); |
| 565 DispatchCompletionCallbacks(&completed_tasks); |
319 } | 566 } |
320 | 567 |
321 void WorkerPool::PostTaskAndReply( | 568 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
322 const Callback& task, const base::Closure& reply) { | |
323 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | |
324 task, | |
325 reply)).PassAs<internal::WorkerPoolTask>()); | |
326 } | |
327 | |
328 void WorkerPool::OnIdle() { | |
329 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 569 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
330 | 570 |
331 DispatchCompletionCallbacks(); | 571 DCHECK(!in_dispatch_completion_callbacks_); |
| 572 |
| 573 DispatchCompletionCallbacks(completed_tasks); |
| 574 |
| 575 // Cancel any pending check for completed tasks. |
| 576 check_for_completed_tasks_callback_.Cancel(); |
| 577 check_for_completed_tasks_pending_ = false; |
332 } | 578 } |
333 | 579 |
334 void WorkerPool::ScheduleCheckForCompletedTasks() { | 580 void WorkerPool::ScheduleCheckForCompletedTasks() { |
335 if (check_for_completed_tasks_pending_) | 581 if (check_for_completed_tasks_pending_) |
336 return; | 582 return; |
| 583 check_for_completed_tasks_callback_.Reset( |
| 584 base::Bind(&WorkerPool::CheckForCompletedTasks, |
| 585 base::Unretained(this))); |
337 origin_loop_->PostDelayedTask( | 586 origin_loop_->PostDelayedTask( |
338 FROM_HERE, | 587 FROM_HERE, |
339 base::Bind(&WorkerPool::CheckForCompletedTasks, | 588 check_for_completed_tasks_callback_.callback(), |
340 weak_ptr_factory_.GetWeakPtr()), | |
341 check_for_completed_tasks_delay_); | 589 check_for_completed_tasks_delay_); |
342 check_for_completed_tasks_pending_ = true; | 590 check_for_completed_tasks_pending_ = true; |
343 } | 591 } |
344 | 592 |
345 void WorkerPool::CheckForCompletedTasks() { | 593 void WorkerPool::CheckForCompletedTasks() { |
346 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 594 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
347 DCHECK(check_for_completed_tasks_pending_); | 595 |
| 596 DCHECK(!in_dispatch_completion_callbacks_); |
| 597 |
| 598 check_for_completed_tasks_callback_.Cancel(); |
348 check_for_completed_tasks_pending_ = false; | 599 check_for_completed_tasks_pending_ = false; |
349 | 600 |
| 601 TaskDeque completed_tasks; |
| 602 |
350 // Schedule another check for completed tasks if not idle. | 603 // Schedule another check for completed tasks if not idle. |
351 if (!inner_->CollectCompletedTasks()) | 604 if (!inner_->CollectCompletedTasks(&completed_tasks)) |
352 ScheduleCheckForCompletedTasks(); | 605 ScheduleCheckForCompletedTasks(); |
353 | 606 |
354 DispatchCompletionCallbacks(); | 607 DispatchCompletionCallbacks(&completed_tasks); |
355 } | 608 } |
356 | 609 |
357 void WorkerPool::DispatchCompletionCallbacks() { | 610 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
358 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 611 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
359 | 612 |
360 if (completed_tasks_.empty()) | 613 // Early out when |completed_tasks| is empty to prevent unnecessary |
| 614 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
| 615 if (completed_tasks->empty()) |
361 return; | 616 return; |
362 | 617 |
363 while (completed_tasks_.size()) { | 618 // Worker pool instance is not reentrant while processing completed tasks. |
364 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 619 in_dispatch_completion_callbacks_ = true; |
| 620 |
| 621 while (!completed_tasks->empty()) { |
| 622 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
| 623 completed_tasks->pop_front(); |
365 task->DidComplete(); | 624 task->DidComplete(); |
| 625 task->DispatchCompletionCallback(); |
366 } | 626 } |
367 | 627 |
| 628 in_dispatch_completion_callbacks_ = false; |
| 629 |
368 DCHECK(client_); | 630 DCHECK(client_); |
369 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 631 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
370 } | 632 } |
371 | 633 |
372 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 634 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { |
373 // Schedule check for completed tasks if not pending. | 635 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
374 ScheduleCheckForCompletedTasks(); | |
375 | 636 |
376 inner_->PostTask(task.Pass()); | 637 DCHECK(!in_dispatch_completion_callbacks_); |
| 638 |
| 639 // Schedule check for completed tasks. |
| 640 if (root) |
| 641 ScheduleCheckForCompletedTasks(); |
| 642 |
| 643 inner_->ScheduleTasks(root); |
377 } | 644 } |
378 | 645 |
379 } // namespace cc | 646 } // namespace cc |
OLD | NEW |