Index: cc/base/worker_pool.cc |
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc |
index 59dc828980e9b211b415ae95c1bd55d3a65458a3..bbf51c227b89b6b90263678f4c970e49f0b6914a 100644 |
--- a/cc/base/worker_pool.cc |
+++ b/cc/base/worker_pool.cc |
@@ -4,46 +4,93 @@ |
#include "cc/base/worker_pool.h" |
-#include <algorithm> |
+#if defined(OS_ANDROID) |
+// TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
+#include <sys/resource.h> |
+#endif |
+ |
+#include <map> |
#include "base/bind.h" |
#include "base/debug/trace_event.h" |
+#include "base/hash_tables.h" |
#include "base/stringprintf.h" |
-#include "base/synchronization/condition_variable.h" |
#include "base/threading/simple_thread.h" |
#include "base/threading/thread_restrictions.h" |
+#include "cc/base/scoped_ptr_deque.h" |
+#include "cc/base/scoped_ptr_hash_map.h" |
+ |
+#if defined(COMPILER_GCC) |
+namespace BASE_HASH_NAMESPACE { |
+template <> struct hash<cc::internal::WorkerPoolTask*> { |
+ size_t operator()(cc::internal::WorkerPoolTask* ptr) const { |
+ return hash<size_t>()(reinterpret_cast<size_t>(ptr)); |
+ } |
+}; |
+} // namespace BASE_HASH_NAMESPACE |
+#endif // COMPILER |
namespace cc { |
-namespace { |
- |
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
- public: |
- WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
- const base::Closure& reply) |
- : internal::WorkerPoolTask(reply), |
- task_(task) {} |
+namespace internal { |
- virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
- task_.Run(); |
- } |
+WorkerPoolTask::WorkerPoolTask() |
+ : did_schedule_(false), |
+ did_run_(false), |
+ did_complete_(false) { |
+} |
- private: |
- WorkerPool::Callback task_; |
-}; |
+WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) |
+ : did_schedule_(false), |
+ did_run_(false), |
+ did_complete_(false) { |
+ dependencies_.swap(*dependencies); |
+} |
-} // namespace |
+WorkerPoolTask::~WorkerPoolTask() { |
+ DCHECK_EQ(did_schedule_, did_complete_); |
+ DCHECK(!did_run_ || did_schedule_); |
+ DCHECK(!did_run_ || did_complete_); |
+} |
-namespace internal { |
+void WorkerPoolTask::DidSchedule() { |
+ DCHECK(!did_complete_); |
+ did_schedule_ = true; |
+} |
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
+void WorkerPoolTask::WillRun() { |
+ DCHECK(did_schedule_); |
+ DCHECK(!did_complete_); |
+ DCHECK(!did_run_); |
} |
-WorkerPoolTask::~WorkerPoolTask() { |
+void WorkerPoolTask::DidRun() { |
+ did_run_ = true; |
} |
void WorkerPoolTask::DidComplete() { |
- reply_.Run(); |
+ DCHECK(did_schedule_); |
+ DCHECK(!did_complete_); |
+ did_complete_ = true; |
+} |
+ |
+bool WorkerPoolTask::IsReadyToRun() const { |
+ // TODO(reveman): Use counter to improve performance. |
+ for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); |
+ it != dependencies_.rend(); ++it) { |
+ WorkerPoolTask* dependency = *it; |
+ if (!dependency->HasFinishedRunning()) |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+bool WorkerPoolTask::HasFinishedRunning() const { |
+ return did_run_; |
+} |
+ |
+bool WorkerPoolTask::HasCompleted() const { |
+ return did_complete_; |
} |
} // namespace internal |
@@ -60,17 +107,51 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
void Shutdown(); |
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
+ // Schedule running of |root| task and all its dependencies. Tasks |
+ // previously scheduled but no longer needed to run |root| will be |
+ // canceled unless already running. Canceled tasks are moved to |
+ // |completed_tasks_| without being run. The result is that once |
+ // scheduled, a task is guaranteed to end up in the |completed_tasks_| |
+ // queue even if they later get canceled by another call to |
+ // ScheduleTasks(). |
+ void ScheduleTasks(internal::WorkerPoolTask* root); |
- // Appends all completed tasks to worker pool's completed tasks queue |
- // and returns true if idle. |
- bool CollectCompletedTasks(); |
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
+ bool CollectCompletedTasks(TaskDeque* completed_tasks); |
private: |
- // Appends all completed tasks to |completed_tasks|. Lock must |
- // already be acquired before calling this function. |
- bool AppendCompletedTasksWithLockAcquired( |
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
+ class ScheduledTask { |
+ public: |
+ ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) |
+ : priority_(priority) { |
+ if (dependent) |
+ dependents_.push_back(dependent); |
+ } |
+ |
+ internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } |
+ unsigned priority() const { return priority_; } |
+ |
+ private: |
+ internal::WorkerPoolTask::TaskVector dependents_; |
+ unsigned priority_; |
+ }; |
+ typedef internal::WorkerPoolTask* ScheduledTaskMapKey; |
+ typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> |
+ ScheduledTaskMap; |
+ |
+ // This builds a ScheduledTaskMap from a root task. |
+ static unsigned BuildScheduledTaskMapRecursive( |
+ internal::WorkerPoolTask* task, |
+ internal::WorkerPoolTask* dependent, |
+ unsigned priority, |
+ ScheduledTaskMap* scheduled_tasks); |
+ static void BuildScheduledTaskMap( |
+ internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); |
+ |
+ // Collect all completed tasks by swapping the contents of |
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
+ // before calling this function. Returns true if idle. |
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
// Schedule an OnIdleOnOriginThread callback if not already pending. |
// Lock must already be acquired before calling this function. |
@@ -90,8 +171,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
mutable base::Lock lock_; |
// Condition variable that is waited on by worker threads until new |
- // tasks are posted or shutdown starts. |
- base::ConditionVariable has_pending_tasks_cv_; |
+ // tasks are ready to run or shutdown starts. |
+ base::ConditionVariable has_ready_to_run_tasks_cv_; |
// Target message loop used for posting callbacks. |
scoped_refptr<base::MessageLoopProxy> origin_loop_; |
@@ -106,15 +187,25 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
// loop index is 0. |
unsigned next_thread_index_; |
- // Number of tasks currently running. |
- unsigned running_task_count_; |
- |
// Set during shutdown. Tells workers to exit when no more tasks |
// are pending. |
bool shutdown_; |
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
- TaskDeque pending_tasks_; |
+ // The root task that is a dependent of all other tasks. |
+ scoped_refptr<internal::WorkerPoolTask> root_; |
+ |
+ // This set contains all pending tasks. |
+ ScheduledTaskMap pending_tasks_; |
+ |
+ // Ordered set of tasks that are ready to run. |
+ // TODO(reveman): priority_queue might be more efficient. |
+ typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; |
+ TaskMap ready_to_run_tasks_; |
+ |
+ // This set contains all currently running tasks. |
+ ScheduledTaskMap running_tasks_; |
+ |
+ // Completed tasks not yet collected by origin thread. |
TaskDeque completed_tasks_; |
ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
@@ -127,25 +218,24 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
const std::string& thread_name_prefix) |
: worker_pool_on_origin_thread_(worker_pool), |
lock_(), |
- has_pending_tasks_cv_(&lock_), |
+ has_ready_to_run_tasks_cv_(&lock_), |
origin_loop_(base::MessageLoopProxy::current()), |
weak_ptr_factory_(this), |
on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
weak_ptr_factory_.GetWeakPtr())), |
on_idle_pending_(false), |
next_thread_index_(0), |
- running_task_count_(0), |
shutdown_(false) { |
base::AutoLock lock(lock_); |
while (workers_.size() < num_threads) { |
scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
new base::DelegateSimpleThread( |
- this, |
- thread_name_prefix + |
- base::StringPrintf( |
- "Worker%u", |
- static_cast<unsigned>(workers_.size() + 1)).c_str())); |
+ this, |
+ thread_name_prefix + |
+ base::StringPrintf( |
+ "Worker%u", |
+ static_cast<unsigned>(workers_.size() + 1)).c_str())); |
worker->Start(); |
workers_.push_back(worker.Pass()); |
} |
@@ -156,12 +246,10 @@ WorkerPool::Inner::~Inner() { |
DCHECK(shutdown_); |
- // Cancel all pending callbacks. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- |
DCHECK_EQ(0u, pending_tasks_.size()); |
+ DCHECK_EQ(0u, ready_to_run_tasks_.size()); |
+ DCHECK_EQ(0u, running_tasks_.size()); |
DCHECK_EQ(0u, completed_tasks_.size()); |
- DCHECK_EQ(0u, running_task_count_); |
} |
void WorkerPool::Inner::Shutdown() { |
@@ -173,7 +261,7 @@ void WorkerPool::Inner::Shutdown() { |
// Wake up a worker so it knows it should exit. This will cause all workers |
// to exit as each will wake up another worker before exiting. |
- has_pending_tasks_cv_.Signal(); |
+ has_ready_to_run_tasks_cv_.Signal(); |
} |
while (workers_.size()) { |
@@ -183,32 +271,100 @@ void WorkerPool::Inner::Shutdown() { |
base::ThreadRestrictions::ScopedAllowIO allow_io; |
worker->Join(); |
} |
+ |
+ // Cancel any pending OnIdle callback. |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
} |
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
- base::AutoLock lock(lock_); |
+void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { |
+ // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. |
+ DCHECK(!root || !shutdown_); |
+ |
+ scoped_refptr<internal::WorkerPoolTask> new_root(root); |
+ |
+ ScheduledTaskMap new_pending_tasks; |
+ ScheduledTaskMap new_running_tasks; |
+ TaskMap new_ready_to_run_tasks; |
+ |
+ // Build scheduled task map before acquiring |lock_|. |
+ if (root) |
+ BuildScheduledTaskMap(root, &new_pending_tasks); |
+ |
+ { |
+ base::AutoLock lock(lock_); |
+ |
+ // First remove all completed tasks from |new_pending_tasks|. |
+ for (TaskDeque::iterator it = completed_tasks_.begin(); |
+ it != completed_tasks_.end(); ++it) { |
+ internal::WorkerPoolTask* task = *it; |
+ new_pending_tasks.take_and_erase(task); |
+ } |
- pending_tasks_.push_back(task.Pass()); |
+ // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. |
+ for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); |
+ it != pending_tasks_.end(); ++it) { |
+ internal::WorkerPoolTask* task = it->first; |
- // There is more work available, so wake up worker thread. |
- has_pending_tasks_cv_.Signal(); |
+ // Task has completed if not present in |new_pending_tasks|. |
+ if (!new_pending_tasks.contains(task)) |
+ completed_tasks_.push_back(task); |
+ } |
+ |
+ // Build new running task set. |
+ for (ScheduledTaskMap::iterator it = running_tasks_.begin(); |
+ it != running_tasks_.end(); ++it) { |
+ internal::WorkerPoolTask* task = it->first; |
+ // Transfer scheduled task value from |new_pending_tasks| to |
+ // |new_running_tasks| if currently running. Value must be set to |
+ // NULL if |new_pending_tasks| doesn't contain task. This does |
+ // the right in both cases. |
+ new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); |
+ } |
+ |
+ // Build new "ready to run" tasks queue. |
+ for (ScheduledTaskMap::iterator it = new_pending_tasks.begin(); |
+ it != new_pending_tasks.end(); ++it) { |
+ internal::WorkerPoolTask* task = it->first; |
+ |
+ // Completed tasks should not exist in |new_pending_tasks|. |
+ DCHECK(!task->HasFinishedRunning()); |
+ |
+ // Call DidSchedule() to indicate that this task has been scheduled. |
+ // Note: This is only for debugging purposes. |
+ task->DidSchedule(); |
+ |
+ DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority())); |
+ if (task->IsReadyToRun()) |
+ new_ready_to_run_tasks[it->second->priority()] = task; |
+ } |
+ |
+ // Swap root taskand task sets. |
+ // Note: old tasks are intentionally destroyed after releasing |lock_|. |
+ root_.swap(new_root); |
+ pending_tasks_.swap(new_pending_tasks); |
+ running_tasks_.swap(new_running_tasks); |
+ ready_to_run_tasks_.swap(new_ready_to_run_tasks); |
+ |
+ // If there is more work available, wake up worker thread. |
+ if (!ready_to_run_tasks_.empty()) |
+ has_ready_to_run_tasks_cv_.Signal(); |
+ } |
} |
-bool WorkerPool::Inner::CollectCompletedTasks() { |
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
base::AutoLock lock(lock_); |
- return AppendCompletedTasksWithLockAcquired( |
- &worker_pool_on_origin_thread_->completed_tasks_); |
+ return CollectCompletedTasksWithLockAcquired(completed_tasks); |
} |
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
+ TaskDeque* completed_tasks) { |
lock_.AssertAcquired(); |
- while (completed_tasks_.size()) |
- completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
+ DCHECK_EQ(0u, completed_tasks->size()); |
+ completed_tasks->swap(completed_tasks_); |
- return !running_task_count_ && pending_tasks_.empty(); |
+ return running_tasks_.empty() && pending_tasks_.empty(); |
} |
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
@@ -221,6 +377,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
} |
void WorkerPool::Inner::OnIdleOnOriginThread() { |
+ TaskDeque completed_tasks; |
+ |
{ |
base::AutoLock lock(lock_); |
@@ -228,14 +386,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() { |
on_idle_pending_ = false; |
// Early out if no longer idle. |
- if (running_task_count_ || !pending_tasks_.empty()) |
+ if (!running_tasks_.empty() || !pending_tasks_.empty()) |
return; |
- AppendCompletedTasksWithLockAcquired( |
- &worker_pool_on_origin_thread_->completed_tasks_); |
+ CollectCompletedTasksWithLockAcquired(&completed_tasks); |
} |
- worker_pool_on_origin_thread_->OnIdle(); |
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
} |
void WorkerPool::Inner::Run() { |
@@ -251,29 +408,37 @@ void WorkerPool::Inner::Run() { |
int thread_index = next_thread_index_++; |
while (true) { |
- if (pending_tasks_.empty()) { |
- // Exit when shutdown is set and no more tasks are pending. |
- if (shutdown_) |
- break; |
- |
- // Schedule an idle callback if requested and not pending. |
- if (!running_task_count_) |
- ScheduleOnIdleWithLockAcquired(); |
- |
- // Wait for new pending tasks. |
- has_pending_tasks_cv_.Wait(); |
+ if (ready_to_run_tasks_.empty()) { |
+ if (pending_tasks_.empty()) { |
+ // Exit when shutdown is set and no more tasks are pending. |
+ if (shutdown_) |
+ break; |
+ |
+ // Schedule an idle callback if no tasks are running. |
+ if (running_tasks_.empty()) |
+ ScheduleOnIdleWithLockAcquired(); |
+ } |
+ |
+ // Wait for more tasks. |
+ has_ready_to_run_tasks_cv_.Wait(); |
continue; |
} |
- // Get next task. |
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
+ // Take top priority task from |ready_to_run_tasks_|. |
+ scoped_refptr<internal::WorkerPoolTask> task( |
+ ready_to_run_tasks_.begin()->second); |
+ ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); |
+ |
+ // Move task from |pending_tasks_| to |running_tasks_|. |
+ DCHECK(pending_tasks_.contains(task)); |
+ DCHECK(!running_tasks_.contains(task)); |
+ running_tasks_.set(task, pending_tasks_.take_and_erase(task)); |
- // Increment |running_task_count_| before starting to run task. |
- running_task_count_++; |
+ // There may be more work available, so wake up another worker thread. |
+ has_ready_to_run_tasks_cv_.Signal(); |
- // There may be more work available, so wake up another |
- // worker thread. |
- has_pending_tasks_cv_.Signal(); |
+ // Call WillRun() before releasing |lock_| and running task. |
+ task->WillRun(); |
{ |
base::AutoUnlock unlock(lock_); |
@@ -281,15 +446,95 @@ void WorkerPool::Inner::Run() { |
task->RunOnThread(thread_index); |
} |
- completed_tasks_.push_back(task.Pass()); |
+ // This will mark task as finished running. |
+ task->DidRun(); |
+ |
+ // Now iterate over all dependents to check if they are ready to run. |
+ scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( |
+ task); |
+ if (scheduled_task) { |
+ typedef internal::WorkerPoolTask::TaskVector TaskVector; |
+ for (TaskVector::iterator it = scheduled_task->dependents().begin(); |
+ it != scheduled_task->dependents().end(); ++it) { |
+ internal::WorkerPoolTask* dependent = *it; |
+ if (!dependent->IsReadyToRun()) |
+ continue; |
+ |
+ // Task is ready. Add it to |ready_to_run_tasks_|. |
+ DCHECK(pending_tasks_.contains(dependent)); |
+ unsigned priority = pending_tasks_.get(dependent)->priority(); |
+ DCHECK(!ready_to_run_tasks_.count(priority) || |
+ ready_to_run_tasks_[priority] == dependent); |
+ ready_to_run_tasks_[priority] = dependent; |
+ } |
+ } |
- // Decrement |running_task_count_| now that we are done running task. |
- running_task_count_--; |
+ // Finally add task to |completed_tasks_|. |
+ completed_tasks_.push_back(task); |
} |
// We noticed we should exit. Wake up the next worker so it knows it should |
// exit as well (because the Shutdown() code only signals once). |
- has_pending_tasks_cv_.Signal(); |
+ has_ready_to_run_tasks_cv_.Signal(); |
+} |
+ |
+// BuildScheduledTaskMap() takes a task tree as input and constructs |
+// a unique set of tasks with edges between dependencies pointing in |
+// the direction of the dependents. Each task is given a unique priority |
+// which is currently the same as the DFS traversal order. |
+// |
+// Input: Output: |
+// |
+// root task4 Task | Priority (lower is better) |
+// / \ / \ -------+--------------------------- |
+// task1 task2 task3 task2 root | 4 |
+// | | | | task1 | 2 |
+// task3 | task1 | task2 | 3 |
+// | | \ / task3 | 1 |
+// task4 task4 root task4 | 0 |
+// |
+// The output can be used to efficiently maintain a queue of |
+// "ready to run" tasks. |
+ |
+// static |
+unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( |
+ internal::WorkerPoolTask* task, |
+ internal::WorkerPoolTask* dependent, |
+ unsigned priority, |
+ ScheduledTaskMap* scheduled_tasks) { |
+ // Skip sub-tree if task has already completed. |
+ if (task->HasCompleted()) |
+ return priority; |
+ |
+ ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); |
+ if (scheduled_it != scheduled_tasks->end()) { |
+ DCHECK(dependent); |
+ scheduled_it->second->dependents().push_back(dependent); |
+ return priority; |
+ } |
+ |
+ typedef internal::WorkerPoolTask::TaskVector TaskVector; |
+ for (TaskVector::iterator it = task->dependencies().begin(); |
+ it != task->dependencies().end(); ++it) { |
+ internal::WorkerPoolTask* dependency = *it; |
+ priority = BuildScheduledTaskMapRecursive( |
+ dependency, task, priority, scheduled_tasks); |
+ } |
+ |
+ scheduled_tasks->set(task, |
+ make_scoped_ptr(new ScheduledTask(dependent, |
+ priority))); |
+ |
+ return priority + 1; |
+} |
+ |
+// static |
+void WorkerPool::Inner::BuildScheduledTaskMap( |
+ internal::WorkerPoolTask* root, |
+ ScheduledTaskMap* scheduled_tasks) { |
+ const unsigned kBasePriority = 0u; |
+ DCHECK(root); |
+ BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); |
} |
WorkerPool::WorkerPool(size_t num_threads, |
@@ -297,83 +542,105 @@ WorkerPool::WorkerPool(size_t num_threads, |
const std::string& thread_name_prefix) |
: client_(NULL), |
origin_loop_(base::MessageLoopProxy::current()), |
- weak_ptr_factory_(this), |
check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
check_for_completed_tasks_pending_(false), |
+ in_dispatch_completion_callbacks_(false), |
inner_(make_scoped_ptr(new Inner(this, |
num_threads, |
thread_name_prefix))) { |
} |
WorkerPool::~WorkerPool() { |
- // Cancel all pending callbacks. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- |
- DCHECK_EQ(0u, completed_tasks_.size()); |
} |
void WorkerPool::Shutdown() { |
+ TRACE_EVENT0("cc", "WorkerPool::Shutdown"); |
+ |
+ DCHECK(!in_dispatch_completion_callbacks_); |
+ |
inner_->Shutdown(); |
- inner_->CollectCompletedTasks(); |
- DispatchCompletionCallbacks(); |
-} |
-void WorkerPool::PostTaskAndReply( |
- const Callback& task, const base::Closure& reply) { |
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
- task, |
- reply)).PassAs<internal::WorkerPoolTask>()); |
+ TaskDeque completed_tasks; |
+ inner_->CollectCompletedTasks(&completed_tasks); |
+ DispatchCompletionCallbacks(&completed_tasks); |
} |
-void WorkerPool::OnIdle() { |
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
- DispatchCompletionCallbacks(); |
+ DCHECK(!in_dispatch_completion_callbacks_); |
+ |
+ DispatchCompletionCallbacks(completed_tasks); |
+ |
+ // Cancel any pending check for completed tasks. |
+ check_for_completed_tasks_callback_.Cancel(); |
+ check_for_completed_tasks_pending_ = false; |
} |
void WorkerPool::ScheduleCheckForCompletedTasks() { |
if (check_for_completed_tasks_pending_) |
return; |
+ check_for_completed_tasks_callback_.Reset( |
+ base::Bind(&WorkerPool::CheckForCompletedTasks, |
+ base::Unretained(this))); |
origin_loop_->PostDelayedTask( |
FROM_HERE, |
- base::Bind(&WorkerPool::CheckForCompletedTasks, |
- weak_ptr_factory_.GetWeakPtr()), |
+ check_for_completed_tasks_callback_.callback(), |
check_for_completed_tasks_delay_); |
check_for_completed_tasks_pending_ = true; |
} |
void WorkerPool::CheckForCompletedTasks() { |
TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
- DCHECK(check_for_completed_tasks_pending_); |
+ |
+ DCHECK(!in_dispatch_completion_callbacks_); |
+ |
+ check_for_completed_tasks_callback_.Cancel(); |
check_for_completed_tasks_pending_ = false; |
+ TaskDeque completed_tasks; |
+ |
// Schedule another check for completed tasks if not idle. |
- if (!inner_->CollectCompletedTasks()) |
+ if (!inner_->CollectCompletedTasks(&completed_tasks)) |
ScheduleCheckForCompletedTasks(); |
- DispatchCompletionCallbacks(); |
+ DispatchCompletionCallbacks(&completed_tasks); |
} |
-void WorkerPool::DispatchCompletionCallbacks() { |
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
- if (completed_tasks_.empty()) |
+ // Early out when |completed_tasks| is empty to prevent unnecessary |
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
+ if (completed_tasks->empty()) |
return; |
- while (completed_tasks_.size()) { |
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
+ // Worker pool instance is not reentrant while processing completed tasks. |
+ in_dispatch_completion_callbacks_ = true; |
+ |
+ while (!completed_tasks->empty()) { |
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
+ completed_tasks->pop_front(); |
task->DidComplete(); |
+ task->DispatchCompletionCallback(); |
} |
+ in_dispatch_completion_callbacks_ = false; |
+ |
DCHECK(client_); |
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
} |
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
- // Schedule check for completed tasks if not pending. |
- ScheduleCheckForCompletedTasks(); |
+void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { |
+ TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
+ |
+ DCHECK(!in_dispatch_completion_callbacks_); |
+ |
+ // Schedule check for completed tasks. |
+ if (root) |
+ ScheduleCheckForCompletedTasks(); |
- inner_->PostTask(task.Pass()); |
+ inner_->ScheduleTasks(root); |
} |
} // namespace cc |