| Index: base/threading/sequenced_worker_pool.cc
|
| diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
|
| index 62431aec4a7d6450a27014232b46ee3236704c48..d542733291a4351fbfabd2a1abe2bccd3ca5a285 100644
|
| --- a/base/threading/sequenced_worker_pool.cc
|
| +++ b/base/threading/sequenced_worker_pool.cc
|
| @@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner {
|
|
|
| void FlushForTesting();
|
|
|
| + void TriggerSpuriousWorkSignalForTesting();
|
| +
|
| + int GetWorkSignalCountForTesting() const;
|
| +
|
| void Shutdown();
|
|
|
| void SetTestingObserver(TestingObserver* observer);
|
| @@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner {
|
| void ThreadLoop(Worker* this_worker);
|
|
|
| private:
|
| + // Returns whether there are no more pending tasks and all threads
|
| + // are idle. Must be called under lock.
|
| + bool IsIdle() const;
|
| +
|
| // Called from within the lock, this converts the given token name into a
|
| // token ID, creating a new one if necessary.
|
| int LockedGetNamedTokenID(const std::string& name);
|
| @@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner {
|
| // the lock to avoid blocking important work starting a thread in the lock.
|
| void FinishStartingAdditionalThread(int thread_number);
|
|
|
| + // Signal |has_work_| and increment |has_work_signal_count_|.
|
| + void SignalHasWork();
|
| +
|
| // Checks whether there is work left that's blocking shutdown. Must be
|
| // called inside the lock.
|
| bool CanShutdown() const;
|
| @@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner {
|
| // lock.
|
| mutable Lock lock_;
|
|
|
| - // Condition variable used to wake up worker threads when a task is runnable.
|
| - ConditionVariable cond_var_;
|
| + // Condition variable that is waited on by worker threads until new
|
| + // tasks are posted or shutdown starts.
|
| + ConditionVariable has_work_cv_;
|
| +
|
| + // Number of times |has_work_| has been signalled. Used for testing.
|
| + int has_work_signal_count_;
|
| +
|
| + // Condition variable that is waited on by non-worker threads (in
|
| + // FlushForTesting()) until IsIdle() goes to true.
|
| + ConditionVariable is_idle_cv_;
|
| +
|
| + // Condition variable that is waited on by non-worker threads (in
|
| + // Shutdown()) until CanShutdown() goes to true.
|
| + ConditionVariable can_shutdown_cv_;
|
|
|
| // The maximum number of worker threads we'll create.
|
| const size_t max_threads_;
|
| @@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner(
|
| : worker_pool_(worker_pool),
|
| last_sequence_number_(0),
|
| lock_(),
|
| - cond_var_(&lock_),
|
| + has_work_cv_(&lock_),
|
| + has_work_signal_count_(0),
|
| + is_idle_cv_(&lock_),
|
| + can_shutdown_cv_(&lock_),
|
| max_threads_(max_threads),
|
| thread_name_prefix_(thread_name_prefix),
|
| thread_being_created_(false),
|
| @@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask(
|
| if (create_thread_id)
|
| FinishStartingAdditionalThread(create_thread_id);
|
| else
|
| - cond_var_.Signal();
|
| + SignalHasWork();
|
|
|
| return true;
|
| }
|
| @@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
|
| }
|
|
|
| void SequencedWorkerPool::Inner::FlushForTesting() {
|
| - {
|
| - AutoLock lock(lock_);
|
| - while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
|
| - cond_var_.Wait();
|
| - }
|
| - cond_var_.Signal();
|
| + AutoLock lock(lock_);
|
| + while (!IsIdle())
|
| + is_idle_cv_.Wait();
|
| +}
|
| +
|
| +void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() {
|
| + SignalHasWork();
|
| +}
|
| +
|
| +int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const {
|
| + AutoLock lock(lock_);
|
| + return has_work_signal_count_;
|
| }
|
|
|
| void SequencedWorkerPool::Inner::Shutdown() {
|
| @@ -351,27 +383,25 @@ void SequencedWorkerPool::Inner::Shutdown() {
|
|
|
| // Tickle the threads. This will wake up a waiting one so it will know that
|
| // it can exit, which in turn will wake up any other waiting ones.
|
| - cond_var_.Signal();
|
| + has_work_cv_.Signal();
|
|
|
| // There are no pending or running tasks blocking shutdown, we're done.
|
| if (CanShutdown())
|
| return;
|
| }
|
|
|
| - // If we get here, we know we're either waiting on a blocking task that's
|
| - // currently running, waiting on a blocking task that hasn't been scheduled
|
| - // yet, or both. Block on the "queue empty" event to know when all tasks are
|
| - // complete. This must be done outside the lock.
|
| + // If we're here, then something is blocking shutdown. So wait for
|
| + // CanShutdown() to go to true.
|
| +
|
| if (testing_observer_)
|
| testing_observer_->WillWaitForShutdown();
|
|
|
| TimeTicks shutdown_wait_begin = TimeTicks::Now();
|
|
|
| - // Wait for no more tasks.
|
| {
|
| AutoLock lock(lock_);
|
| while (!CanShutdown())
|
| - cond_var_.Wait();
|
| + can_shutdown_cv_.Wait();
|
| }
|
| UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
|
| TimeTicks::Now() - shutdown_wait_begin);
|
| @@ -401,7 +431,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| int new_thread_id = WillRunWorkerTask(task);
|
| {
|
| AutoUnlock unlock(lock_);
|
| - cond_var_.Signal();
|
| + // There may be more work available, so wake up another
|
| + // worker thread. (Technically not required, since we
|
| + // already get a signal for each new task, but it doesn't
|
| + // hurt.)
|
| + has_work_cv_.Signal();
|
| delete_these_outside_lock.clear();
|
|
|
| // Complete thread creation outside the lock if necessary.
|
| @@ -424,17 +458,26 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
|
| if (shutdown_called_)
|
| break;
|
| waiting_thread_count_++;
|
| - cond_var_.Signal(); // For Flush() that may be waiting on the
|
| - // waiting thread count to go up.
|
| - cond_var_.Wait();
|
| + // This is the only time that IsIdle() can go to true.
|
| + if (IsIdle())
|
| + is_idle_cv_.Signal();
|
| + has_work_cv_.Wait();
|
| waiting_thread_count_--;
|
| }
|
| }
|
| - }
|
| + } // Release lock_.
|
|
|
| // We noticed we should exit. Wake up the next worker so it knows it should
|
| // exit as well (because the Shutdown() code only signals once).
|
| - cond_var_.Signal();
|
| + has_work_cv_.Signal();
|
| +
|
| + // Possibly unblock shutdown.
|
| + can_shutdown_cv_.Signal();
|
| +}
|
| +
|
| +bool SequencedWorkerPool::Inner::IsIdle() const {
|
| + lock_.AssertAcquired();
|
| + return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size();
|
| }
|
|
|
| int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
|
| @@ -518,8 +561,9 @@ bool SequencedWorkerPool::Inner::GetWork(
|
| *task = *i;
|
| i = pending_tasks_.erase(i);
|
| pending_task_count_--;
|
| - if (task->shutdown_behavior == BLOCK_SHUTDOWN)
|
| + if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
|
| blocking_shutdown_pending_task_count_--;
|
| + }
|
|
|
| found_task = true;
|
| break;
|
| @@ -640,6 +684,14 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
|
| new Worker(worker_pool_, thread_number, thread_name_prefix_);
|
| }
|
|
|
| +void SequencedWorkerPool::Inner::SignalHasWork() {
|
| + has_work_cv_.Signal();
|
| + {
|
| + AutoLock lock(lock_);
|
| + ++has_work_signal_count_;
|
| + }
|
| +}
|
| +
|
| bool SequencedWorkerPool::Inner::CanShutdown() const {
|
| lock_.AssertAcquired();
|
| // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
|
| @@ -665,11 +717,10 @@ void SequencedWorkerPool::OnDestruct() const {
|
| // TODO(akalin): Once we can easily check if we're on a worker
|
| // thread or not, use that instead of restricting destruction to
|
| // only the constructor message loop.
|
| - if (constructor_message_loop_->BelongsToCurrentThread()) {
|
| + if (constructor_message_loop_->BelongsToCurrentThread())
|
| delete this;
|
| - } else {
|
| + else
|
| constructor_message_loop_->DeleteSoon(FROM_HERE, this);
|
| - }
|
| }
|
|
|
| SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
|
| @@ -748,7 +799,16 @@ void SequencedWorkerPool::FlushForTesting() {
|
| inner_->FlushForTesting();
|
| }
|
|
|
| +void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() {
|
| + inner_->TriggerSpuriousWorkSignalForTesting();
|
| +}
|
| +
|
| +int SequencedWorkerPool::GetWorkSignalCountForTesting() const {
|
| + return inner_->GetWorkSignalCountForTesting();
|
| +}
|
| +
|
| void SequencedWorkerPool::Shutdown() {
|
| + DCHECK(constructor_message_loop_->BelongsToCurrentThread());
|
| inner_->Shutdown();
|
| }
|
|
|
|
|