// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "cc/base/worker_pool.h"

#if defined(OS_ANDROID)
// TODO(epenner): Move thread priorities to base. (crbug.com/170549)
#include <sys/resource.h>
#endif

#include <map>

#include "base/bind.h"
#include "base/containers/hash_tables.h"
#include "base/debug/trace_event.h"
#include "base/strings/stringprintf.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_restrictions.h"
#include "cc/base/scoped_ptr_deque.h"

namespace cc {

namespace internal {

WorkerPoolTask::WorkerPoolTask()
    : did_schedule_(false),
      did_run_(false),
      did_complete_(false) {
}

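// Takes over the contents of |dependencies|; the task is not considered
// ready to run until every one of its dependencies has finished running.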
WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
    : did_schedule_(false),
      did_run_(false),
      did_complete_(false) {
  dependencies_.swap(*dependencies);
}

WorkerPoolTask::~WorkerPoolTask() {
  DCHECK_EQ(did_schedule_, did_complete_);
  DCHECK(!did_run_ || did_schedule_);
  DCHECK(!did_run_ || did_complete_);
}

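// The methods below record task state transitions. |did_run_| and
// |did_complete_| back HasFinishedRunning() and HasCompleted();
// |did_schedule_| exists only to support the DCHECKs above.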
void WorkerPoolTask::DidSchedule() {
  DCHECK(!did_complete_);
  did_schedule_ = true;
}

void WorkerPoolTask::WillRun() {
  DCHECK(did_schedule_);
  DCHECK(!did_complete_);
  DCHECK(!did_run_);
}

void WorkerPoolTask::DidRun() {
  did_run_ = true;
}

void WorkerPoolTask::DidComplete() {
  DCHECK(did_schedule_);
  DCHECK(!did_complete_);
  did_complete_ = true;
}

bool WorkerPoolTask::IsReadyToRun() const {
  // TODO(reveman): Use counter to improve performance.
  for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
       it != dependencies_.rend(); ++it) {
    WorkerPoolTask* dependency = it->get();
    if (!dependency->HasFinishedRunning())
      return false;
  }
  return true;
}

bool WorkerPoolTask::HasFinishedRunning() const {
  return did_run_;
}

bool WorkerPoolTask::HasCompleted() const {
  return did_complete_;
}

}  // namespace internal

// Internal to the worker pool. Any data or logic that needs to be
// shared between threads lives in this class. All members are guarded
// by |lock_|.
class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
 public:
  Inner(size_t num_threads, const std::string& thread_name_prefix);
  virtual ~Inner();

  void Shutdown();

  // Schedule running of tasks in |graph|. Tasks previously scheduled but
  // no longer needed will be canceled unless already running. Canceled
  // tasks are moved to |completed_tasks_| without being run. The result
  // is that once scheduled, a task is guaranteed to end up in the
  // |completed_tasks_| queue even if it is later canceled by another
  // call to SetTaskGraph().
  void SetTaskGraph(TaskGraph* graph);

  // Collect all completed tasks in |completed_tasks|.
  void CollectCompletedTasks(TaskDeque* completed_tasks);

 private:
  // Overridden from base::DelegateSimpleThread:
  virtual void Run() OVERRIDE;

  // This lock protects all members of this class except
  // |worker_pool_on_origin_thread_|. Do not read or modify anything
  // without holding this lock. Do not block while holding this lock.
  mutable base::Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are ready to run or shutdown starts.
  base::ConditionVariable has_ready_to_run_tasks_cv_;

  // Provides each running thread loop with a unique index. First thread
  // loop index is 0.
  unsigned next_thread_index_;

  // Set during shutdown. Tells workers to exit when no more tasks
  // are pending.
  bool shutdown_;

  // This set contains all pending tasks.
  GraphNodeMap pending_tasks_;

  // Ordered set of tasks that are ready to run.
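  // Tasks are keyed by their priority; workers always take the entry with
  // the lowest key first.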
  // TODO(reveman): priority_queue might be more efficient.
  typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
  TaskMap ready_to_run_tasks_;

  // This set contains all currently running tasks.
  GraphNodeMap running_tasks_;

  // Completed tasks not yet collected by origin thread.
  TaskDeque completed_tasks_;

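  // Worker threads owned by this class; started in the constructor and
  // joined during Shutdown().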
  ScopedPtrDeque<base::DelegateSimpleThread> workers_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};

WorkerPool::Inner::Inner(
    size_t num_threads, const std::string& thread_name_prefix)
    : lock_(),
      has_ready_to_run_tasks_cv_(&lock_),
      next_thread_index_(0),
      shutdown_(false) {
  base::AutoLock lock(lock_);

  while (workers_.size() < num_threads) {
    scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
        new base::DelegateSimpleThread(
            this,
            thread_name_prefix +
                base::StringPrintf(
                    "Worker%u",
                    static_cast<unsigned>(workers_.size() + 1)).c_str()));
    worker->Start();
    workers_.push_back(worker.Pass());
  }
}

WorkerPool::Inner::~Inner() {
  base::AutoLock lock(lock_);

  DCHECK(shutdown_);

  DCHECK_EQ(0u, pending_tasks_.size());
  DCHECK_EQ(0u, ready_to_run_tasks_.size());
  DCHECK_EQ(0u, running_tasks_.size());
  DCHECK_EQ(0u, completed_tasks_.size());
}

void WorkerPool::Inner::Shutdown() {
  {
    base::AutoLock lock(lock_);

    DCHECK(!shutdown_);
    shutdown_ = true;

    // Wake up a worker so it knows it should exit. This will cause all workers
    // to exit as each will wake up another worker before exiting.
    has_ready_to_run_tasks_cv_.Signal();
  }

  while (workers_.size()) {
    scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
    // http://crbug.com/240453 - Join() is considered IO and will block this
    // thread. See also http://crbug.com/239423 for further ideas.
    base::ThreadRestrictions::ScopedAllowIO allow_io;
    worker->Join();
  }
}

void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) {
  // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
  DCHECK(graph->empty() || !shutdown_);

  GraphNodeMap new_pending_tasks;
  GraphNodeMap new_running_tasks;
  TaskMap new_ready_to_run_tasks;

  new_pending_tasks.swap(*graph);

  {
    base::AutoLock lock(lock_);

    // First remove all completed tasks from |new_pending_tasks|.
    for (TaskDeque::iterator it = completed_tasks_.begin();
         it != completed_tasks_.end(); ++it) {
      internal::WorkerPoolTask* task = it->get();
      new_pending_tasks.take_and_erase(task);
    }

    // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
    for (GraphNodeMap::iterator it = pending_tasks_.begin();
         it != pending_tasks_.end(); ++it) {
      internal::WorkerPoolTask* task = it->first;

      // Task has completed if not present in |new_pending_tasks|.
      if (!new_pending_tasks.contains(task))
        completed_tasks_.push_back(task);
    }

    // Build new running task set.
    for (GraphNodeMap::iterator it = running_tasks_.begin();
         it != running_tasks_.end(); ++it) {
      internal::WorkerPoolTask* task = it->first;
      // Transfer scheduled task value from |new_pending_tasks| to
      // |new_running_tasks| if currently running. Value must be set to
      // NULL if |new_pending_tasks| doesn't contain task. This does
      // the right thing in both cases.
      new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
    }

    // Build new "ready to run" tasks queue.
    for (GraphNodeMap::iterator it = new_pending_tasks.begin();
         it != new_pending_tasks.end(); ++it) {
      internal::WorkerPoolTask* task = it->first;

      // Completed tasks should not exist in |new_pending_tasks|.
      DCHECK(!task->HasFinishedRunning());

      // Call DidSchedule() to indicate that this task has been scheduled.
      // Note: This is only for debugging purposes.
      task->DidSchedule();

      DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
      if (task->IsReadyToRun())
        new_ready_to_run_tasks[it->second->priority()] = task;
    }

    // Swap task sets.
    // Note: old tasks are intentionally destroyed after releasing |lock_|.
    pending_tasks_.swap(new_pending_tasks);
    running_tasks_.swap(new_running_tasks);
    ready_to_run_tasks_.swap(new_ready_to_run_tasks);

    // If there is more work available, wake up worker thread.
    if (!ready_to_run_tasks_.empty())
      has_ready_to_run_tasks_cv_.Signal();
  }
}

void WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
  base::AutoLock lock(lock_);

  DCHECK_EQ(0u, completed_tasks->size());
  completed_tasks->swap(completed_tasks_);
}

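// Worker thread entry point. Each worker loops taking the highest priority
// ready task, runs it with |lock_| released, and then promotes any of its
// dependents that have become ready to run.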
void WorkerPool::Inner::Run() {
#if defined(OS_ANDROID)
  base::PlatformThread::SetThreadPriority(
      base::PlatformThread::CurrentHandle(),
      base::kThreadPriority_Background);
#endif

  base::AutoLock lock(lock_);

  // Get a unique thread index.
  int thread_index = next_thread_index_++;

  while (true) {
    if (ready_to_run_tasks_.empty()) {
      // Exit when shutdown is set and no more tasks are pending.
      if (shutdown_ && pending_tasks_.empty())
        break;

      // Wait for more tasks.
      has_ready_to_run_tasks_cv_.Wait();
      continue;
    }

    // Take top priority task from |ready_to_run_tasks_|.
    scoped_refptr<internal::WorkerPoolTask> task(
        ready_to_run_tasks_.begin()->second);
    ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());

    // Move task from |pending_tasks_| to |running_tasks_|.
    DCHECK(pending_tasks_.contains(task.get()));
    DCHECK(!running_tasks_.contains(task.get()));
    running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get()));

    // There may be more work available, so wake up another worker thread.
    has_ready_to_run_tasks_cv_.Signal();

    // Call WillRun() before releasing |lock_| and running task.
    task->WillRun();

    {
      base::AutoUnlock unlock(lock_);

      task->RunOnThread(thread_index);
    }

    // This will mark task as finished running.
    task->DidRun();

    // Now iterate over all dependents to check if they are ready to run.
    scoped_ptr<GraphNode> node = running_tasks_.take_and_erase(task.get());
    if (node) {
      typedef internal::WorkerPoolTask::TaskVector TaskVector;
      for (TaskVector::const_iterator it = node->dependents().begin();
           it != node->dependents().end(); ++it) {
        GraphNodeMap::iterator dependent_it = pending_tasks_.find(it->get());
        DCHECK(dependent_it != pending_tasks_.end());

        internal::WorkerPoolTask* dependent = dependent_it->first;
        if (!dependent->IsReadyToRun())
          continue;

        // Task is ready. Add it to |ready_to_run_tasks_|.
        GraphNode* dependent_node = dependent_it->second;
        unsigned priority = dependent_node->priority();
        DCHECK(!ready_to_run_tasks_.count(priority) ||
               ready_to_run_tasks_[priority] == dependent);
        ready_to_run_tasks_[priority] = dependent;
      }
    }

    // Finally add task to |completed_tasks_|.
    completed_tasks_.push_back(task);
  }

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  has_ready_to_run_tasks_cv_.Signal();
}

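// A GraphNode records the scheduling priority assigned to a task and the
// set of tasks that depend on it (its dependents).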
WorkerPool::GraphNode::GraphNode(
    internal::WorkerPoolTask* dependent, unsigned priority)
    : priority_(priority) {
  if (dependent)
    dependents_.push_back(dependent);
}

WorkerPool::GraphNode::~GraphNode() {
}

void WorkerPool::GraphNode::AddDependent(internal::WorkerPoolTask* dependent) {
  DCHECK(dependent);
  dependents_.push_back(dependent);
}

WorkerPool::WorkerPool(size_t num_threads,
                       const std::string& thread_name_prefix)
    : in_dispatch_completion_callbacks_(false),
      inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
}

WorkerPool::~WorkerPool() {
}

void WorkerPool::Shutdown() {
  TRACE_EVENT0("cc", "WorkerPool::Shutdown");

  DCHECK(!in_dispatch_completion_callbacks_);

  inner_->Shutdown();
}

void WorkerPool::CheckForCompletedTasks() {
  TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");

  DCHECK(!in_dispatch_completion_callbacks_);

  TaskDeque completed_tasks;
  inner_->CollectCompletedTasks(&completed_tasks);
  DispatchCompletionCallbacks(&completed_tasks);
}

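// Called on the origin thread to run DidComplete() and the completion
// callback for each collected task.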
void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
  TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");

  // Worker pool instance is not reentrant while processing completed tasks.
  in_dispatch_completion_callbacks_ = true;

  while (!completed_tasks->empty()) {
    internal::WorkerPoolTask* task = completed_tasks->front().get();

    task->DidComplete();
    task->DispatchCompletionCallback();

    completed_tasks->pop_front();
  }

  in_dispatch_completion_callbacks_ = false;
}

void WorkerPool::SetTaskGraph(TaskGraph* graph) {
  TRACE_EVENT0("cc", "WorkerPool::SetTaskGraph");

  DCHECK(!in_dispatch_completion_callbacks_);

  inner_->SetTaskGraph(graph);
}

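// Performs a post-order walk of the dependency tree rooted at |task|,
// assigning priorities in traversal order. Dependencies therefore receive
// lower priority values than their dependents and are picked up first by
// the workers. Returns the next unused priority.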
// static
unsigned WorkerPool::BuildTaskGraphRecursive(
    internal::WorkerPoolTask* task,
    internal::WorkerPoolTask* dependent,
    unsigned priority,
    TaskGraph* tasks) {
  // Skip sub-tree if task has already completed.
  if (task->HasCompleted())
    return priority;

  GraphNodeMap::iterator it = tasks->find(task);
  if (it != tasks->end()) {
    it->second->AddDependent(dependent);
    return priority;
  }

  typedef internal::WorkerPoolTask::TaskVector TaskVector;
  for (TaskVector::iterator dependency_it = task->dependencies().begin();
       dependency_it != task->dependencies().end(); ++dependency_it) {
    internal::WorkerPoolTask* dependency = dependency_it->get();
    priority = BuildTaskGraphRecursive(dependency, task, priority, tasks);
  }

  tasks->set(task, make_scoped_ptr(new GraphNode(dependent, priority)));

  return priority + 1;
}

// static
void WorkerPool::BuildTaskGraph(
    internal::WorkerPoolTask* root, TaskGraph* tasks) {
  const unsigned kBasePriority = 0u;
  if (root)
    BuildTaskGraphRecursive(root, NULL, kBasePriority, tasks);
}

}  // namespace cc