Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1331)

Unified Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Check and prevent worker pool reentrancy during dispatch of completion callbacks Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « cc/base/worker_pool.h ('k') | cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: cc/base/worker_pool.cc
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
index 59dc828980e9b211b415ae95c1bd55d3a65458a3..bbf51c227b89b6b90263678f4c970e49f0b6914a 100644
--- a/cc/base/worker_pool.cc
+++ b/cc/base/worker_pool.cc
@@ -4,46 +4,93 @@
#include "cc/base/worker_pool.h"
-#include <algorithm>
+#if defined(OS_ANDROID)
+// TODO(epenner): Move thread priorities to base. (crbug.com/170549)
+#include <sys/resource.h>
+#endif
+
+#include <map>
#include "base/bind.h"
#include "base/debug/trace_event.h"
+#include "base/hash_tables.h"
#include "base/stringprintf.h"
-#include "base/synchronization/condition_variable.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_restrictions.h"
+#include "cc/base/scoped_ptr_deque.h"
+#include "cc/base/scoped_ptr_hash_map.h"
+
+#if defined(COMPILER_GCC)
+namespace BASE_HASH_NAMESPACE {
+template <> struct hash<cc::internal::WorkerPoolTask*> {
+ size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
+ return hash<size_t>()(reinterpret_cast<size_t>(ptr));
+ }
+};
+} // namespace BASE_HASH_NAMESPACE
+#endif // COMPILER
namespace cc {
-namespace {
-
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
- public:
- WorkerPoolTaskImpl(const WorkerPool::Callback& task,
- const base::Closure& reply)
- : internal::WorkerPoolTask(reply),
- task_(task) {}
+namespace internal {
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {
- task_.Run();
- }
+WorkerPoolTask::WorkerPoolTask()
+ : did_schedule_(false),
+ did_run_(false),
+ did_complete_(false) {
+}
- private:
- WorkerPool::Callback task_;
-};
+WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
+ : did_schedule_(false),
+ did_run_(false),
+ did_complete_(false) {
+ dependencies_.swap(*dependencies);
+}
-} // namespace
+WorkerPoolTask::~WorkerPoolTask() {
+ DCHECK_EQ(did_schedule_, did_complete_);
+ DCHECK(!did_run_ || did_schedule_);
+ DCHECK(!did_run_ || did_complete_);
+}
-namespace internal {
+void WorkerPoolTask::DidSchedule() {
+ DCHECK(!did_complete_);
+ did_schedule_ = true;
+}
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
+void WorkerPoolTask::WillRun() {
+ DCHECK(did_schedule_);
+ DCHECK(!did_complete_);
+ DCHECK(!did_run_);
}
-WorkerPoolTask::~WorkerPoolTask() {
+void WorkerPoolTask::DidRun() {
+ did_run_ = true;
}
void WorkerPoolTask::DidComplete() {
- reply_.Run();
+ DCHECK(did_schedule_);
+ DCHECK(!did_complete_);
+ did_complete_ = true;
+}
+
+bool WorkerPoolTask::IsReadyToRun() const {
+ // TODO(reveman): Use counter to improve performance.
+ for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
+ it != dependencies_.rend(); ++it) {
+ WorkerPoolTask* dependency = *it;
+ if (!dependency->HasFinishedRunning())
+ return false;
+ }
+ return true;
+}
+
+bool WorkerPoolTask::HasFinishedRunning() const {
+ return did_run_;
+}
+
+bool WorkerPoolTask::HasCompleted() const {
+ return did_complete_;
}
} // namespace internal
@@ -60,17 +107,51 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
void Shutdown();
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
+ // Schedule running of |root| task and all its dependencies. Tasks
+ // previously scheduled but no longer needed to run |root| will be
+ // canceled unless already running. Canceled tasks are moved to
+ // |completed_tasks_| without being run. The result is that once
+ // scheduled, a task is guaranteed to end up in the |completed_tasks_|
+ // queue even if they later get canceled by another call to
+ // ScheduleTasks().
+ void ScheduleTasks(internal::WorkerPoolTask* root);
- // Appends all completed tasks to worker pool's completed tasks queue
- // and returns true if idle.
- bool CollectCompletedTasks();
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle.
+ bool CollectCompletedTasks(TaskDeque* completed_tasks);
private:
- // Appends all completed tasks to |completed_tasks|. Lock must
- // already be acquired before calling this function.
- bool AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
+ class ScheduledTask {
+ public:
+ ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority)
+ : priority_(priority) {
+ if (dependent)
+ dependents_.push_back(dependent);
+ }
+
+ internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
+ unsigned priority() const { return priority_; }
+
+ private:
+ internal::WorkerPoolTask::TaskVector dependents_;
+ unsigned priority_;
+ };
+ typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
+ typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
+ ScheduledTaskMap;
+
+ // This builds a ScheduledTaskMap from a root task.
+ static unsigned BuildScheduledTaskMapRecursive(
+ internal::WorkerPoolTask* task,
+ internal::WorkerPoolTask* dependent,
+ unsigned priority,
+ ScheduledTaskMap* scheduled_tasks);
+ static void BuildScheduledTaskMap(
+ internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
+
+ // Collect all completed tasks by swapping the contents of
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired
+ // before calling this function. Returns true if idle.
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
// Schedule an OnIdleOnOriginThread callback if not already pending.
// Lock must already be acquired before calling this function.
@@ -90,8 +171,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
mutable base::Lock lock_;
// Condition variable that is waited on by worker threads until new
- // tasks are posted or shutdown starts.
- base::ConditionVariable has_pending_tasks_cv_;
+ // tasks are ready to run or shutdown starts.
+ base::ConditionVariable has_ready_to_run_tasks_cv_;
// Target message loop used for posting callbacks.
scoped_refptr<base::MessageLoopProxy> origin_loop_;
@@ -106,15 +187,25 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
// loop index is 0.
unsigned next_thread_index_;
- // Number of tasks currently running.
- unsigned running_task_count_;
-
// Set during shutdown. Tells workers to exit when no more tasks
// are pending.
bool shutdown_;
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
- TaskDeque pending_tasks_;
+ // The root task that is a dependent of all other tasks.
+ scoped_refptr<internal::WorkerPoolTask> root_;
+
+ // This set contains all pending tasks.
+ ScheduledTaskMap pending_tasks_;
+
+ // Ordered set of tasks that are ready to run.
+ // TODO(reveman): priority_queue might be more efficient.
+ typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
+ TaskMap ready_to_run_tasks_;
+
+ // This set contains all currently running tasks.
+ ScheduledTaskMap running_tasks_;
+
+ // Completed tasks not yet collected by origin thread.
TaskDeque completed_tasks_;
ScopedPtrDeque<base::DelegateSimpleThread> workers_;
@@ -127,25 +218,24 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
const std::string& thread_name_prefix)
: worker_pool_on_origin_thread_(worker_pool),
lock_(),
- has_pending_tasks_cv_(&lock_),
+ has_ready_to_run_tasks_cv_(&lock_),
origin_loop_(base::MessageLoopProxy::current()),
weak_ptr_factory_(this),
on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
weak_ptr_factory_.GetWeakPtr())),
on_idle_pending_(false),
next_thread_index_(0),
- running_task_count_(0),
shutdown_(false) {
base::AutoLock lock(lock_);
while (workers_.size() < num_threads) {
scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
new base::DelegateSimpleThread(
- this,
- thread_name_prefix +
- base::StringPrintf(
- "Worker%u",
- static_cast<unsigned>(workers_.size() + 1)).c_str()));
+ this,
+ thread_name_prefix +
+ base::StringPrintf(
+ "Worker%u",
+ static_cast<unsigned>(workers_.size() + 1)).c_str()));
worker->Start();
workers_.push_back(worker.Pass());
}
@@ -156,12 +246,10 @@ WorkerPool::Inner::~Inner() {
DCHECK(shutdown_);
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
DCHECK_EQ(0u, pending_tasks_.size());
+ DCHECK_EQ(0u, ready_to_run_tasks_.size());
+ DCHECK_EQ(0u, running_tasks_.size());
DCHECK_EQ(0u, completed_tasks_.size());
- DCHECK_EQ(0u, running_task_count_);
}
void WorkerPool::Inner::Shutdown() {
@@ -173,7 +261,7 @@ void WorkerPool::Inner::Shutdown() {
// Wake up a worker so it knows it should exit. This will cause all workers
// to exit as each will wake up another worker before exiting.
- has_pending_tasks_cv_.Signal();
+ has_ready_to_run_tasks_cv_.Signal();
}
while (workers_.size()) {
@@ -183,32 +271,100 @@ void WorkerPool::Inner::Shutdown() {
base::ThreadRestrictions::ScopedAllowIO allow_io;
worker->Join();
}
+
+ // Cancel any pending OnIdle callback.
+ weak_ptr_factory_.InvalidateWeakPtrs();
}
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- base::AutoLock lock(lock_);
+void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) {
+ // It is OK to call ScheduleTasks() after shutdown if |root| is NULL.
+ DCHECK(!root || !shutdown_);
+
+ scoped_refptr<internal::WorkerPoolTask> new_root(root);
+
+ ScheduledTaskMap new_pending_tasks;
+ ScheduledTaskMap new_running_tasks;
+ TaskMap new_ready_to_run_tasks;
+
+ // Build scheduled task map before acquiring |lock_|.
+ if (root)
+ BuildScheduledTaskMap(root, &new_pending_tasks);
+
+ {
+ base::AutoLock lock(lock_);
+
+ // First remove all completed tasks from |new_pending_tasks|.
+ for (TaskDeque::iterator it = completed_tasks_.begin();
+ it != completed_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = *it;
+ new_pending_tasks.take_and_erase(task);
+ }
- pending_tasks_.push_back(task.Pass());
+ // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
+ for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
+ it != pending_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = it->first;
- // There is more work available, so wake up worker thread.
- has_pending_tasks_cv_.Signal();
+ // Task has completed if not present in |new_pending_tasks|.
+ if (!new_pending_tasks.contains(task))
+ completed_tasks_.push_back(task);
+ }
+
+ // Build new running task set.
+ for (ScheduledTaskMap::iterator it = running_tasks_.begin();
+ it != running_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = it->first;
+ // Transfer scheduled task value from |new_pending_tasks| to
+ // |new_running_tasks| if currently running. Value must be set to
+ // NULL if |new_pending_tasks| doesn't contain task. This does
+ // the right in both cases.
+ new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
+ }
+
+ // Build new "ready to run" tasks queue.
+ for (ScheduledTaskMap::iterator it = new_pending_tasks.begin();
+ it != new_pending_tasks.end(); ++it) {
+ internal::WorkerPoolTask* task = it->first;
+
+ // Completed tasks should not exist in |new_pending_tasks|.
+ DCHECK(!task->HasFinishedRunning());
+
+ // Call DidSchedule() to indicate that this task has been scheduled.
+ // Note: This is only for debugging purposes.
+ task->DidSchedule();
+
+ DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
+ if (task->IsReadyToRun())
+ new_ready_to_run_tasks[it->second->priority()] = task;
+ }
+
+ // Swap root taskand task sets.
+ // Note: old tasks are intentionally destroyed after releasing |lock_|.
+ root_.swap(new_root);
+ pending_tasks_.swap(new_pending_tasks);
+ running_tasks_.swap(new_running_tasks);
+ ready_to_run_tasks_.swap(new_ready_to_run_tasks);
+
+ // If there is more work available, wake up worker thread.
+ if (!ready_to_run_tasks_.empty())
+ has_ready_to_run_tasks_cv_.Signal();
+ }
}
-bool WorkerPool::Inner::CollectCompletedTasks() {
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
base::AutoLock lock(lock_);
- return AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ return CollectCompletedTasksWithLockAcquired(completed_tasks);
}
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
+ TaskDeque* completed_tasks) {
lock_.AssertAcquired();
- while (completed_tasks_.size())
- completed_tasks->push_back(completed_tasks_.take_front().Pass());
+ DCHECK_EQ(0u, completed_tasks->size());
+ completed_tasks->swap(completed_tasks_);
- return !running_task_count_ && pending_tasks_.empty();
+ return running_tasks_.empty() && pending_tasks_.empty();
}
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
@@ -221,6 +377,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
}
void WorkerPool::Inner::OnIdleOnOriginThread() {
+ TaskDeque completed_tasks;
+
{
base::AutoLock lock(lock_);
@@ -228,14 +386,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
on_idle_pending_ = false;
// Early out if no longer idle.
- if (running_task_count_ || !pending_tasks_.empty())
+ if (!running_tasks_.empty() || !pending_tasks_.empty())
return;
- AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ CollectCompletedTasksWithLockAcquired(&completed_tasks);
}
- worker_pool_on_origin_thread_->OnIdle();
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
}
void WorkerPool::Inner::Run() {
@@ -251,29 +408,37 @@ void WorkerPool::Inner::Run() {
int thread_index = next_thread_index_++;
while (true) {
- if (pending_tasks_.empty()) {
- // Exit when shutdown is set and no more tasks are pending.
- if (shutdown_)
- break;
-
- // Schedule an idle callback if requested and not pending.
- if (!running_task_count_)
- ScheduleOnIdleWithLockAcquired();
-
- // Wait for new pending tasks.
- has_pending_tasks_cv_.Wait();
+ if (ready_to_run_tasks_.empty()) {
+ if (pending_tasks_.empty()) {
+ // Exit when shutdown is set and no more tasks are pending.
+ if (shutdown_)
+ break;
+
+ // Schedule an idle callback if no tasks are running.
+ if (running_tasks_.empty())
+ ScheduleOnIdleWithLockAcquired();
+ }
+
+ // Wait for more tasks.
+ has_ready_to_run_tasks_cv_.Wait();
continue;
}
- // Get next task.
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
+ // Take top priority task from |ready_to_run_tasks_|.
+ scoped_refptr<internal::WorkerPoolTask> task(
+ ready_to_run_tasks_.begin()->second);
+ ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
+
+ // Move task from |pending_tasks_| to |running_tasks_|.
+ DCHECK(pending_tasks_.contains(task));
+ DCHECK(!running_tasks_.contains(task));
+ running_tasks_.set(task, pending_tasks_.take_and_erase(task));
- // Increment |running_task_count_| before starting to run task.
- running_task_count_++;
+ // There may be more work available, so wake up another worker thread.
+ has_ready_to_run_tasks_cv_.Signal();
- // There may be more work available, so wake up another
- // worker thread.
- has_pending_tasks_cv_.Signal();
+ // Call WillRun() before releasing |lock_| and running task.
+ task->WillRun();
{
base::AutoUnlock unlock(lock_);
@@ -281,15 +446,95 @@ void WorkerPool::Inner::Run() {
task->RunOnThread(thread_index);
}
- completed_tasks_.push_back(task.Pass());
+ // This will mark task as finished running.
+ task->DidRun();
+
+ // Now iterate over all dependents to check if they are ready to run.
+ scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase(
+ task);
+ if (scheduled_task) {
+ typedef internal::WorkerPoolTask::TaskVector TaskVector;
+ for (TaskVector::iterator it = scheduled_task->dependents().begin();
+ it != scheduled_task->dependents().end(); ++it) {
+ internal::WorkerPoolTask* dependent = *it;
+ if (!dependent->IsReadyToRun())
+ continue;
+
+ // Task is ready. Add it to |ready_to_run_tasks_|.
+ DCHECK(pending_tasks_.contains(dependent));
+ unsigned priority = pending_tasks_.get(dependent)->priority();
+ DCHECK(!ready_to_run_tasks_.count(priority) ||
+ ready_to_run_tasks_[priority] == dependent);
+ ready_to_run_tasks_[priority] = dependent;
+ }
+ }
- // Decrement |running_task_count_| now that we are done running task.
- running_task_count_--;
+ // Finally add task to |completed_tasks_|.
+ completed_tasks_.push_back(task);
}
// We noticed we should exit. Wake up the next worker so it knows it should
// exit as well (because the Shutdown() code only signals once).
- has_pending_tasks_cv_.Signal();
+ has_ready_to_run_tasks_cv_.Signal();
+}
+
+// BuildScheduledTaskMap() takes a task tree as input and constructs
+// a unique set of tasks with edges between dependencies pointing in
+// the direction of the dependents. Each task is given a unique priority
+// which is currently the same as the DFS traversal order.
+//
+// Input: Output:
+//
+// root task4 Task | Priority (lower is better)
+// / \ / \ -------+---------------------------
+// task1 task2 task3 task2 root | 4
+// | | | | task1 | 2
+// task3 | task1 | task2 | 3
+// | | \ / task3 | 1
+// task4 task4 root task4 | 0
+//
+// The output can be used to efficiently maintain a queue of
+// "ready to run" tasks.
+
+// static
+unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
+ internal::WorkerPoolTask* task,
+ internal::WorkerPoolTask* dependent,
+ unsigned priority,
+ ScheduledTaskMap* scheduled_tasks) {
+ // Skip sub-tree if task has already completed.
+ if (task->HasCompleted())
+ return priority;
+
+ ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
+ if (scheduled_it != scheduled_tasks->end()) {
+ DCHECK(dependent);
+ scheduled_it->second->dependents().push_back(dependent);
+ return priority;
+ }
+
+ typedef internal::WorkerPoolTask::TaskVector TaskVector;
+ for (TaskVector::iterator it = task->dependencies().begin();
+ it != task->dependencies().end(); ++it) {
+ internal::WorkerPoolTask* dependency = *it;
+ priority = BuildScheduledTaskMapRecursive(
+ dependency, task, priority, scheduled_tasks);
+ }
+
+ scheduled_tasks->set(task,
+ make_scoped_ptr(new ScheduledTask(dependent,
+ priority)));
+
+ return priority + 1;
+}
+
+// static
+void WorkerPool::Inner::BuildScheduledTaskMap(
+ internal::WorkerPoolTask* root,
+ ScheduledTaskMap* scheduled_tasks) {
+ const unsigned kBasePriority = 0u;
+ DCHECK(root);
+ BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
}
WorkerPool::WorkerPool(size_t num_threads,
@@ -297,83 +542,105 @@ WorkerPool::WorkerPool(size_t num_threads,
const std::string& thread_name_prefix)
: client_(NULL),
origin_loop_(base::MessageLoopProxy::current()),
- weak_ptr_factory_(this),
check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
check_for_completed_tasks_pending_(false),
+ in_dispatch_completion_callbacks_(false),
inner_(make_scoped_ptr(new Inner(this,
num_threads,
thread_name_prefix))) {
}
WorkerPool::~WorkerPool() {
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- DCHECK_EQ(0u, completed_tasks_.size());
}
void WorkerPool::Shutdown() {
+ TRACE_EVENT0("cc", "WorkerPool::Shutdown");
+
+ DCHECK(!in_dispatch_completion_callbacks_);
+
inner_->Shutdown();
- inner_->CollectCompletedTasks();
- DispatchCompletionCallbacks();
-}
-void WorkerPool::PostTaskAndReply(
- const Callback& task, const base::Closure& reply) {
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
- task,
- reply)).PassAs<internal::WorkerPoolTask>());
+ TaskDeque completed_tasks;
+ inner_->CollectCompletedTasks(&completed_tasks);
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::OnIdle() {
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::OnIdle");
- DispatchCompletionCallbacks();
+ DCHECK(!in_dispatch_completion_callbacks_);
+
+ DispatchCompletionCallbacks(completed_tasks);
+
+ // Cancel any pending check for completed tasks.
+ check_for_completed_tasks_callback_.Cancel();
+ check_for_completed_tasks_pending_ = false;
}
void WorkerPool::ScheduleCheckForCompletedTasks() {
if (check_for_completed_tasks_pending_)
return;
+ check_for_completed_tasks_callback_.Reset(
+ base::Bind(&WorkerPool::CheckForCompletedTasks,
+ base::Unretained(this)));
origin_loop_->PostDelayedTask(
FROM_HERE,
- base::Bind(&WorkerPool::CheckForCompletedTasks,
- weak_ptr_factory_.GetWeakPtr()),
+ check_for_completed_tasks_callback_.callback(),
check_for_completed_tasks_delay_);
check_for_completed_tasks_pending_ = true;
}
void WorkerPool::CheckForCompletedTasks() {
TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
- DCHECK(check_for_completed_tasks_pending_);
+
+ DCHECK(!in_dispatch_completion_callbacks_);
+
+ check_for_completed_tasks_callback_.Cancel();
check_for_completed_tasks_pending_ = false;
+ TaskDeque completed_tasks;
+
// Schedule another check for completed tasks if not idle.
- if (!inner_->CollectCompletedTasks())
+ if (!inner_->CollectCompletedTasks(&completed_tasks))
ScheduleCheckForCompletedTasks();
- DispatchCompletionCallbacks();
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::DispatchCompletionCallbacks() {
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
- if (completed_tasks_.empty())
+ // Early out when |completed_tasks| is empty to prevent unnecessary
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
+ if (completed_tasks->empty())
return;
- while (completed_tasks_.size()) {
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
+ // Worker pool instance is not reentrant while processing completed tasks.
+ in_dispatch_completion_callbacks_ = true;
+
+ while (!completed_tasks->empty()) {
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
+ completed_tasks->pop_front();
task->DidComplete();
+ task->DispatchCompletionCallback();
}
+ in_dispatch_completion_callbacks_ = false;
+
DCHECK(client_);
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
}
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- // Schedule check for completed tasks if not pending.
- ScheduleCheckForCompletedTasks();
+void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) {
+ TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
+
+ DCHECK(!in_dispatch_completion_callbacks_);
+
+ // Schedule check for completed tasks.
+ if (root)
+ ScheduleCheckForCompletedTasks();
- inner_->PostTask(task.Pass());
+ inner_->ScheduleTasks(root);
}
} // namespace cc
« no previous file with comments | « cc/base/worker_pool.h ('k') | cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698