| Index: cc/base/worker_pool.cc
|
| diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
|
| index 59dc828980e9b211b415ae95c1bd55d3a65458a3..bbf51c227b89b6b90263678f4c970e49f0b6914a 100644
|
| --- a/cc/base/worker_pool.cc
|
| +++ b/cc/base/worker_pool.cc
|
| @@ -4,46 +4,93 @@
|
|
|
| #include "cc/base/worker_pool.h"
|
|
|
| -#include <algorithm>
|
| +#if defined(OS_ANDROID)
|
| +// TODO(epenner): Move thread priorities to base. (crbug.com/170549)
|
| +#include <sys/resource.h>
|
| +#endif
|
| +
|
| +#include <map>
|
|
|
| #include "base/bind.h"
|
| #include "base/debug/trace_event.h"
|
| +#include "base/hash_tables.h"
|
| #include "base/stringprintf.h"
|
| -#include "base/synchronization/condition_variable.h"
|
| #include "base/threading/simple_thread.h"
|
| #include "base/threading/thread_restrictions.h"
|
| +#include "cc/base/scoped_ptr_deque.h"
|
| +#include "cc/base/scoped_ptr_hash_map.h"
|
| +
|
| +#if defined(COMPILER_GCC)
|
| +namespace BASE_HASH_NAMESPACE {
|
| +template <> struct hash<cc::internal::WorkerPoolTask*> {
|
| + size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
|
| + return hash<size_t>()(reinterpret_cast<size_t>(ptr));
|
| + }
|
| +};
|
| +} // namespace BASE_HASH_NAMESPACE
|
| +#endif  // COMPILER_GCC
|
|
|
| namespace cc {
|
|
|
| -namespace {
|
| -
|
| -class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
|
| - public:
|
| - WorkerPoolTaskImpl(const WorkerPool::Callback& task,
|
| - const base::Closure& reply)
|
| - : internal::WorkerPoolTask(reply),
|
| - task_(task) {}
|
| +namespace internal {
|
|
|
| - virtual void RunOnThread(unsigned thread_index) OVERRIDE {
|
| - task_.Run();
|
| - }
|
| +WorkerPoolTask::WorkerPoolTask()
|
| + : did_schedule_(false),
|
| + did_run_(false),
|
| + did_complete_(false) {
|
| +}
|
|
|
| - private:
|
| - WorkerPool::Callback task_;
|
| -};
|
| +WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
|
| + : did_schedule_(false),
|
| + did_run_(false),
|
| + did_complete_(false) {
|
| + dependencies_.swap(*dependencies);
|
| +}
|
|
|
| -} // namespace
|
| +WorkerPoolTask::~WorkerPoolTask() {
|
| + DCHECK_EQ(did_schedule_, did_complete_);
|
| + DCHECK(!did_run_ || did_schedule_);
|
| + DCHECK(!did_run_ || did_complete_);
|
| +}
|
|
|
| -namespace internal {
|
| +void WorkerPoolTask::DidSchedule() {
|
| + DCHECK(!did_complete_);
|
| + did_schedule_ = true;
|
| +}
|
|
|
| -WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
|
| +void WorkerPoolTask::WillRun() {
|
| + DCHECK(did_schedule_);
|
| + DCHECK(!did_complete_);
|
| + DCHECK(!did_run_);
|
| }
|
|
|
| -WorkerPoolTask::~WorkerPoolTask() {
|
| +void WorkerPoolTask::DidRun() {
|
| + did_run_ = true;
|
| }
|
|
|
| void WorkerPoolTask::DidComplete() {
|
| - reply_.Run();
|
| + DCHECK(did_schedule_);
|
| + DCHECK(!did_complete_);
|
| + did_complete_ = true;
|
| +}
|
| +
|
| +bool WorkerPoolTask::IsReadyToRun() const {
|
| + // TODO(reveman): Use counter to improve performance.
|
| + for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
|
| + it != dependencies_.rend(); ++it) {
|
| + WorkerPoolTask* dependency = *it;
|
| + if (!dependency->HasFinishedRunning())
|
| + return false;
|
| + }
|
| + return true;
|
| +}
|
| +
|
| +bool WorkerPoolTask::HasFinishedRunning() const {
|
| + return did_run_;
|
| +}
|
| +
|
| +bool WorkerPoolTask::HasCompleted() const {
|
| + return did_complete_;
|
| }
|
|
|
| } // namespace internal
|
| @@ -60,17 +107,51 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
|
|
|
| void Shutdown();
|
|
|
| - void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
|
| + // Schedule running of |root| task and all its dependencies. Tasks
|
| + // previously scheduled but no longer needed to run |root| will be
|
| + // canceled unless already running. Canceled tasks are moved to
|
| + // |completed_tasks_| without being run. The result is that once
|
| + // scheduled, a task is guaranteed to end up in the |completed_tasks_|
|
| +  // queue even if it later gets canceled by another call to
|
| + // ScheduleTasks().
|
| + void ScheduleTasks(internal::WorkerPoolTask* root);
|
|
|
| - // Appends all completed tasks to worker pool's completed tasks queue
|
| - // and returns true if idle.
|
| - bool CollectCompletedTasks();
|
| + // Collect all completed tasks in |completed_tasks|. Returns true if idle.
|
| + bool CollectCompletedTasks(TaskDeque* completed_tasks);
|
|
|
| private:
|
| - // Appends all completed tasks to |completed_tasks|. Lock must
|
| - // already be acquired before calling this function.
|
| - bool AppendCompletedTasksWithLockAcquired(
|
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
|
| + class ScheduledTask {
|
| + public:
|
| + ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority)
|
| + : priority_(priority) {
|
| + if (dependent)
|
| + dependents_.push_back(dependent);
|
| + }
|
| +
|
| + internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
|
| + unsigned priority() const { return priority_; }
|
| +
|
| + private:
|
| + internal::WorkerPoolTask::TaskVector dependents_;
|
| + unsigned priority_;
|
| + };
|
| + typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
|
| + typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
|
| + ScheduledTaskMap;
|
| +
|
| + // This builds a ScheduledTaskMap from a root task.
|
| + static unsigned BuildScheduledTaskMapRecursive(
|
| + internal::WorkerPoolTask* task,
|
| + internal::WorkerPoolTask* dependent,
|
| + unsigned priority,
|
| + ScheduledTaskMap* scheduled_tasks);
|
| + static void BuildScheduledTaskMap(
|
| + internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
|
| +
|
| + // Collect all completed tasks by swapping the contents of
|
| + // |completed_tasks| and |completed_tasks_|. Lock must be acquired
|
| + // before calling this function. Returns true if idle.
|
| + bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
|
|
|
| // Schedule an OnIdleOnOriginThread callback if not already pending.
|
| // Lock must already be acquired before calling this function.
|
| @@ -90,8 +171,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
|
| mutable base::Lock lock_;
|
|
|
| // Condition variable that is waited on by worker threads until new
|
| - // tasks are posted or shutdown starts.
|
| - base::ConditionVariable has_pending_tasks_cv_;
|
| + // tasks are ready to run or shutdown starts.
|
| + base::ConditionVariable has_ready_to_run_tasks_cv_;
|
|
|
| // Target message loop used for posting callbacks.
|
| scoped_refptr<base::MessageLoopProxy> origin_loop_;
|
| @@ -106,15 +187,25 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
|
| // loop index is 0.
|
| unsigned next_thread_index_;
|
|
|
| - // Number of tasks currently running.
|
| - unsigned running_task_count_;
|
| -
|
| // Set during shutdown. Tells workers to exit when no more tasks
|
| // are pending.
|
| bool shutdown_;
|
|
|
| - typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
|
| - TaskDeque pending_tasks_;
|
| + // The root task that is a dependent of all other tasks.
|
| + scoped_refptr<internal::WorkerPoolTask> root_;
|
| +
|
| + // This set contains all pending tasks.
|
| + ScheduledTaskMap pending_tasks_;
|
| +
|
| + // Ordered set of tasks that are ready to run.
|
| + // TODO(reveman): priority_queue might be more efficient.
|
| + typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
|
| + TaskMap ready_to_run_tasks_;
|
| +
|
| + // This set contains all currently running tasks.
|
| + ScheduledTaskMap running_tasks_;
|
| +
|
| + // Completed tasks not yet collected by origin thread.
|
| TaskDeque completed_tasks_;
|
|
|
| ScopedPtrDeque<base::DelegateSimpleThread> workers_;
|
| @@ -127,25 +218,24 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
|
| const std::string& thread_name_prefix)
|
| : worker_pool_on_origin_thread_(worker_pool),
|
| lock_(),
|
| - has_pending_tasks_cv_(&lock_),
|
| + has_ready_to_run_tasks_cv_(&lock_),
|
| origin_loop_(base::MessageLoopProxy::current()),
|
| weak_ptr_factory_(this),
|
| on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
|
| weak_ptr_factory_.GetWeakPtr())),
|
| on_idle_pending_(false),
|
| next_thread_index_(0),
|
| - running_task_count_(0),
|
| shutdown_(false) {
|
| base::AutoLock lock(lock_);
|
|
|
| while (workers_.size() < num_threads) {
|
| scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
|
| new base::DelegateSimpleThread(
|
| - this,
|
| - thread_name_prefix +
|
| - base::StringPrintf(
|
| - "Worker%u",
|
| - static_cast<unsigned>(workers_.size() + 1)).c_str()));
|
| + this,
|
| + thread_name_prefix +
|
| + base::StringPrintf(
|
| + "Worker%u",
|
| + static_cast<unsigned>(workers_.size() + 1)).c_str()));
|
| worker->Start();
|
| workers_.push_back(worker.Pass());
|
| }
|
| @@ -156,12 +246,10 @@ WorkerPool::Inner::~Inner() {
|
|
|
| DCHECK(shutdown_);
|
|
|
| - // Cancel all pending callbacks.
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| -
|
| DCHECK_EQ(0u, pending_tasks_.size());
|
| + DCHECK_EQ(0u, ready_to_run_tasks_.size());
|
| + DCHECK_EQ(0u, running_tasks_.size());
|
| DCHECK_EQ(0u, completed_tasks_.size());
|
| - DCHECK_EQ(0u, running_task_count_);
|
| }
|
|
|
| void WorkerPool::Inner::Shutdown() {
|
| @@ -173,7 +261,7 @@ void WorkerPool::Inner::Shutdown() {
|
|
|
| // Wake up a worker so it knows it should exit. This will cause all workers
|
| // to exit as each will wake up another worker before exiting.
|
| - has_pending_tasks_cv_.Signal();
|
| + has_ready_to_run_tasks_cv_.Signal();
|
| }
|
|
|
| while (workers_.size()) {
|
| @@ -183,32 +271,100 @@ void WorkerPool::Inner::Shutdown() {
|
| base::ThreadRestrictions::ScopedAllowIO allow_io;
|
| worker->Join();
|
| }
|
| +
|
| + // Cancel any pending OnIdle callback.
|
| + weak_ptr_factory_.InvalidateWeakPtrs();
|
| }
|
|
|
| -void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
|
| - base::AutoLock lock(lock_);
|
| +void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) {
|
| + // It is OK to call ScheduleTasks() after shutdown if |root| is NULL.
|
| + DCHECK(!root || !shutdown_);
|
| +
|
| + scoped_refptr<internal::WorkerPoolTask> new_root(root);
|
| +
|
| + ScheduledTaskMap new_pending_tasks;
|
| + ScheduledTaskMap new_running_tasks;
|
| + TaskMap new_ready_to_run_tasks;
|
| +
|
| + // Build scheduled task map before acquiring |lock_|.
|
| + if (root)
|
| + BuildScheduledTaskMap(root, &new_pending_tasks);
|
| +
|
| + {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + // First remove all completed tasks from |new_pending_tasks|.
|
| + for (TaskDeque::iterator it = completed_tasks_.begin();
|
| + it != completed_tasks_.end(); ++it) {
|
| + internal::WorkerPoolTask* task = *it;
|
| + new_pending_tasks.take_and_erase(task);
|
| + }
|
|
|
| - pending_tasks_.push_back(task.Pass());
|
| + // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
|
| + for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
|
| + it != pending_tasks_.end(); ++it) {
|
| + internal::WorkerPoolTask* task = it->first;
|
|
|
| - // There is more work available, so wake up worker thread.
|
| - has_pending_tasks_cv_.Signal();
|
| + // Task has completed if not present in |new_pending_tasks|.
|
| + if (!new_pending_tasks.contains(task))
|
| + completed_tasks_.push_back(task);
|
| + }
|
| +
|
| + // Build new running task set.
|
| + for (ScheduledTaskMap::iterator it = running_tasks_.begin();
|
| + it != running_tasks_.end(); ++it) {
|
| + internal::WorkerPoolTask* task = it->first;
|
| + // Transfer scheduled task value from |new_pending_tasks| to
|
| + // |new_running_tasks| if currently running. Value must be set to
|
| + // NULL if |new_pending_tasks| doesn't contain task. This does
|
| +      // the right thing in both cases.
|
| + new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
|
| + }
|
| +
|
| + // Build new "ready to run" tasks queue.
|
| + for (ScheduledTaskMap::iterator it = new_pending_tasks.begin();
|
| + it != new_pending_tasks.end(); ++it) {
|
| + internal::WorkerPoolTask* task = it->first;
|
| +
|
| + // Completed tasks should not exist in |new_pending_tasks|.
|
| + DCHECK(!task->HasFinishedRunning());
|
| +
|
| + // Call DidSchedule() to indicate that this task has been scheduled.
|
| + // Note: This is only for debugging purposes.
|
| + task->DidSchedule();
|
| +
|
| + DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
|
| + if (task->IsReadyToRun())
|
| + new_ready_to_run_tasks[it->second->priority()] = task;
|
| + }
|
| +
|
| +    // Swap root task and task sets.
|
| + // Note: old tasks are intentionally destroyed after releasing |lock_|.
|
| + root_.swap(new_root);
|
| + pending_tasks_.swap(new_pending_tasks);
|
| + running_tasks_.swap(new_running_tasks);
|
| + ready_to_run_tasks_.swap(new_ready_to_run_tasks);
|
| +
|
| + // If there is more work available, wake up worker thread.
|
| + if (!ready_to_run_tasks_.empty())
|
| + has_ready_to_run_tasks_cv_.Signal();
|
| + }
|
| }
|
|
|
| -bool WorkerPool::Inner::CollectCompletedTasks() {
|
| +bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
|
| base::AutoLock lock(lock_);
|
|
|
| - return AppendCompletedTasksWithLockAcquired(
|
| - &worker_pool_on_origin_thread_->completed_tasks_);
|
| + return CollectCompletedTasksWithLockAcquired(completed_tasks);
|
| }
|
|
|
| -bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
|
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
|
| +bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
|
| + TaskDeque* completed_tasks) {
|
| lock_.AssertAcquired();
|
|
|
| - while (completed_tasks_.size())
|
| - completed_tasks->push_back(completed_tasks_.take_front().Pass());
|
| + DCHECK_EQ(0u, completed_tasks->size());
|
| + completed_tasks->swap(completed_tasks_);
|
|
|
| - return !running_task_count_ && pending_tasks_.empty();
|
| + return running_tasks_.empty() && pending_tasks_.empty();
|
| }
|
|
|
| void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
|
| @@ -221,6 +377,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
|
| }
|
|
|
| void WorkerPool::Inner::OnIdleOnOriginThread() {
|
| + TaskDeque completed_tasks;
|
| +
|
| {
|
| base::AutoLock lock(lock_);
|
|
|
| @@ -228,14 +386,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
|
| on_idle_pending_ = false;
|
|
|
| // Early out if no longer idle.
|
| - if (running_task_count_ || !pending_tasks_.empty())
|
| + if (!running_tasks_.empty() || !pending_tasks_.empty())
|
| return;
|
|
|
| - AppendCompletedTasksWithLockAcquired(
|
| - &worker_pool_on_origin_thread_->completed_tasks_);
|
| + CollectCompletedTasksWithLockAcquired(&completed_tasks);
|
| }
|
|
|
| - worker_pool_on_origin_thread_->OnIdle();
|
| + worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
|
| }
|
|
|
| void WorkerPool::Inner::Run() {
|
| @@ -251,29 +408,37 @@ void WorkerPool::Inner::Run() {
|
| int thread_index = next_thread_index_++;
|
|
|
| while (true) {
|
| - if (pending_tasks_.empty()) {
|
| - // Exit when shutdown is set and no more tasks are pending.
|
| - if (shutdown_)
|
| - break;
|
| -
|
| - // Schedule an idle callback if requested and not pending.
|
| - if (!running_task_count_)
|
| - ScheduleOnIdleWithLockAcquired();
|
| -
|
| - // Wait for new pending tasks.
|
| - has_pending_tasks_cv_.Wait();
|
| + if (ready_to_run_tasks_.empty()) {
|
| + if (pending_tasks_.empty()) {
|
| + // Exit when shutdown is set and no more tasks are pending.
|
| + if (shutdown_)
|
| + break;
|
| +
|
| + // Schedule an idle callback if no tasks are running.
|
| + if (running_tasks_.empty())
|
| + ScheduleOnIdleWithLockAcquired();
|
| + }
|
| +
|
| + // Wait for more tasks.
|
| + has_ready_to_run_tasks_cv_.Wait();
|
| continue;
|
| }
|
|
|
| - // Get next task.
|
| - scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
|
| + // Take top priority task from |ready_to_run_tasks_|.
|
| + scoped_refptr<internal::WorkerPoolTask> task(
|
| + ready_to_run_tasks_.begin()->second);
|
| + ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
|
| +
|
| + // Move task from |pending_tasks_| to |running_tasks_|.
|
| + DCHECK(pending_tasks_.contains(task));
|
| + DCHECK(!running_tasks_.contains(task));
|
| + running_tasks_.set(task, pending_tasks_.take_and_erase(task));
|
|
|
| - // Increment |running_task_count_| before starting to run task.
|
| - running_task_count_++;
|
| + // There may be more work available, so wake up another worker thread.
|
| + has_ready_to_run_tasks_cv_.Signal();
|
|
|
| - // There may be more work available, so wake up another
|
| - // worker thread.
|
| - has_pending_tasks_cv_.Signal();
|
| + // Call WillRun() before releasing |lock_| and running task.
|
| + task->WillRun();
|
|
|
| {
|
| base::AutoUnlock unlock(lock_);
|
| @@ -281,15 +446,95 @@ void WorkerPool::Inner::Run() {
|
| task->RunOnThread(thread_index);
|
| }
|
|
|
| - completed_tasks_.push_back(task.Pass());
|
| + // This will mark task as finished running.
|
| + task->DidRun();
|
| +
|
| + // Now iterate over all dependents to check if they are ready to run.
|
| + scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase(
|
| + task);
|
| + if (scheduled_task) {
|
| + typedef internal::WorkerPoolTask::TaskVector TaskVector;
|
| + for (TaskVector::iterator it = scheduled_task->dependents().begin();
|
| + it != scheduled_task->dependents().end(); ++it) {
|
| + internal::WorkerPoolTask* dependent = *it;
|
| + if (!dependent->IsReadyToRun())
|
| + continue;
|
| +
|
| + // Task is ready. Add it to |ready_to_run_tasks_|.
|
| + DCHECK(pending_tasks_.contains(dependent));
|
| + unsigned priority = pending_tasks_.get(dependent)->priority();
|
| + DCHECK(!ready_to_run_tasks_.count(priority) ||
|
| + ready_to_run_tasks_[priority] == dependent);
|
| + ready_to_run_tasks_[priority] = dependent;
|
| + }
|
| + }
|
|
|
| - // Decrement |running_task_count_| now that we are done running task.
|
| - running_task_count_--;
|
| + // Finally add task to |completed_tasks_|.
|
| + completed_tasks_.push_back(task);
|
| }
|
|
|
| // We noticed we should exit. Wake up the next worker so it knows it should
|
| // exit as well (because the Shutdown() code only signals once).
|
| - has_pending_tasks_cv_.Signal();
|
| + has_ready_to_run_tasks_cv_.Signal();
|
| +}
|
| +
|
| +// BuildScheduledTaskMap() takes a task tree as input and constructs
|
| +// a unique set of tasks with edges between dependencies pointing in
|
| +// the direction of the dependents. Each task is given a unique priority
|
| +// which is currently the same as the DFS traversal order.
|
| +//
|
| +// Input: Output:
|
| +//
|
| +// root task4 Task | Priority (lower is better)
|
| +// / \ / \ -------+---------------------------
|
| +// task1 task2 task3 task2 root | 4
|
| +// | | | | task1 | 2
|
| +// task3 | task1 | task2 | 3
|
| +// | | \ / task3 | 1
|
| +// task4 task4 root task4 | 0
|
| +//
|
| +// The output can be used to efficiently maintain a queue of
|
| +// "ready to run" tasks.
|
| +
|
| +// static
|
| +unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
|
| + internal::WorkerPoolTask* task,
|
| + internal::WorkerPoolTask* dependent,
|
| + unsigned priority,
|
| + ScheduledTaskMap* scheduled_tasks) {
|
| + // Skip sub-tree if task has already completed.
|
| + if (task->HasCompleted())
|
| + return priority;
|
| +
|
| + ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
|
| + if (scheduled_it != scheduled_tasks->end()) {
|
| + DCHECK(dependent);
|
| + scheduled_it->second->dependents().push_back(dependent);
|
| + return priority;
|
| + }
|
| +
|
| + typedef internal::WorkerPoolTask::TaskVector TaskVector;
|
| + for (TaskVector::iterator it = task->dependencies().begin();
|
| + it != task->dependencies().end(); ++it) {
|
| + internal::WorkerPoolTask* dependency = *it;
|
| + priority = BuildScheduledTaskMapRecursive(
|
| + dependency, task, priority, scheduled_tasks);
|
| + }
|
| +
|
| + scheduled_tasks->set(task,
|
| + make_scoped_ptr(new ScheduledTask(dependent,
|
| + priority)));
|
| +
|
| + return priority + 1;
|
| +}
|
| +
|
| +// static
|
| +void WorkerPool::Inner::BuildScheduledTaskMap(
|
| + internal::WorkerPoolTask* root,
|
| + ScheduledTaskMap* scheduled_tasks) {
|
| + const unsigned kBasePriority = 0u;
|
| + DCHECK(root);
|
| + BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
|
| }
|
|
|
| WorkerPool::WorkerPool(size_t num_threads,
|
| @@ -297,83 +542,105 @@ WorkerPool::WorkerPool(size_t num_threads,
|
| const std::string& thread_name_prefix)
|
| : client_(NULL),
|
| origin_loop_(base::MessageLoopProxy::current()),
|
| - weak_ptr_factory_(this),
|
| check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
|
| check_for_completed_tasks_pending_(false),
|
| + in_dispatch_completion_callbacks_(false),
|
| inner_(make_scoped_ptr(new Inner(this,
|
| num_threads,
|
| thread_name_prefix))) {
|
| }
|
|
|
| WorkerPool::~WorkerPool() {
|
| - // Cancel all pending callbacks.
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| -
|
| - DCHECK_EQ(0u, completed_tasks_.size());
|
| }
|
|
|
| void WorkerPool::Shutdown() {
|
| + TRACE_EVENT0("cc", "WorkerPool::Shutdown");
|
| +
|
| + DCHECK(!in_dispatch_completion_callbacks_);
|
| +
|
| inner_->Shutdown();
|
| - inner_->CollectCompletedTasks();
|
| - DispatchCompletionCallbacks();
|
| -}
|
|
|
| -void WorkerPool::PostTaskAndReply(
|
| - const Callback& task, const base::Closure& reply) {
|
| - PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
|
| - task,
|
| - reply)).PassAs<internal::WorkerPoolTask>());
|
| + TaskDeque completed_tasks;
|
| + inner_->CollectCompletedTasks(&completed_tasks);
|
| + DispatchCompletionCallbacks(&completed_tasks);
|
| }
|
|
|
| -void WorkerPool::OnIdle() {
|
| +void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
|
| TRACE_EVENT0("cc", "WorkerPool::OnIdle");
|
|
|
| - DispatchCompletionCallbacks();
|
| + DCHECK(!in_dispatch_completion_callbacks_);
|
| +
|
| + DispatchCompletionCallbacks(completed_tasks);
|
| +
|
| + // Cancel any pending check for completed tasks.
|
| + check_for_completed_tasks_callback_.Cancel();
|
| + check_for_completed_tasks_pending_ = false;
|
| }
|
|
|
| void WorkerPool::ScheduleCheckForCompletedTasks() {
|
| if (check_for_completed_tasks_pending_)
|
| return;
|
| + check_for_completed_tasks_callback_.Reset(
|
| + base::Bind(&WorkerPool::CheckForCompletedTasks,
|
| + base::Unretained(this)));
|
| origin_loop_->PostDelayedTask(
|
| FROM_HERE,
|
| - base::Bind(&WorkerPool::CheckForCompletedTasks,
|
| - weak_ptr_factory_.GetWeakPtr()),
|
| + check_for_completed_tasks_callback_.callback(),
|
| check_for_completed_tasks_delay_);
|
| check_for_completed_tasks_pending_ = true;
|
| }
|
|
|
| void WorkerPool::CheckForCompletedTasks() {
|
| TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
|
| - DCHECK(check_for_completed_tasks_pending_);
|
| +
|
| + DCHECK(!in_dispatch_completion_callbacks_);
|
| +
|
| + check_for_completed_tasks_callback_.Cancel();
|
| check_for_completed_tasks_pending_ = false;
|
|
|
| + TaskDeque completed_tasks;
|
| +
|
| // Schedule another check for completed tasks if not idle.
|
| - if (!inner_->CollectCompletedTasks())
|
| + if (!inner_->CollectCompletedTasks(&completed_tasks))
|
| ScheduleCheckForCompletedTasks();
|
|
|
| - DispatchCompletionCallbacks();
|
| + DispatchCompletionCallbacks(&completed_tasks);
|
| }
|
|
|
| -void WorkerPool::DispatchCompletionCallbacks() {
|
| +void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
|
| TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
|
|
|
| - if (completed_tasks_.empty())
|
| + // Early out when |completed_tasks| is empty to prevent unnecessary
|
| + // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
|
| + if (completed_tasks->empty())
|
| return;
|
|
|
| - while (completed_tasks_.size()) {
|
| - scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
|
| + // Worker pool instance is not reentrant while processing completed tasks.
|
| + in_dispatch_completion_callbacks_ = true;
|
| +
|
| + while (!completed_tasks->empty()) {
|
| + scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
|
| + completed_tasks->pop_front();
|
| task->DidComplete();
|
| + task->DispatchCompletionCallback();
|
| }
|
|
|
| + in_dispatch_completion_callbacks_ = false;
|
| +
|
| DCHECK(client_);
|
| client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
|
| }
|
|
|
| -void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
|
| - // Schedule check for completed tasks if not pending.
|
| - ScheduleCheckForCompletedTasks();
|
| +void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) {
|
| + TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
|
| +
|
| + DCHECK(!in_dispatch_completion_callbacks_);
|
| +
|
| + // Schedule check for completed tasks.
|
| + if (root)
|
| + ScheduleCheckForCompletedTasks();
|
|
|
| - inner_->PostTask(task.Pass());
|
| + inner_->ScheduleTasks(root);
|
| }
|
|
|
| } // namespace cc
|
|
|