Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 62431aec4a7d6450a27014232b46ee3236704c48..d542733291a4351fbfabd2a1abe2bccd3ca5a285 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner { |
void FlushForTesting(); |
+ void TriggerSpuriousWorkSignalForTesting(); |
+ |
+ int GetWorkSignalCountForTesting() const; |
+ |
void Shutdown(); |
void SetTestingObserver(TestingObserver* observer); |
@@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner { |
void ThreadLoop(Worker* this_worker); |
private: |
+ // Returns whether there are no more pending tasks and all threads |
+ // are idle. Must be called under lock. |
+ bool IsIdle() const; |
+ |
// Called from within the lock, this converts the given token name into a |
// token ID, creating a new one if necessary. |
int LockedGetNamedTokenID(const std::string& name); |
@@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner { |
// the lock to avoid blocking important work starting a thread in the lock. |
void FinishStartingAdditionalThread(int thread_number); |
+ // Signal |has_work_| and increment |has_work_signal_count_|. |
+ void SignalHasWork(); |
+ |
// Checks whether there is work left that's blocking shutdown. Must be |
// called inside the lock. |
bool CanShutdown() const; |
@@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner { |
// lock. |
mutable Lock lock_; |
- // Condition variable used to wake up worker threads when a task is runnable. |
- ConditionVariable cond_var_; |
+ // Condition variable that is waited on by worker threads until new |
+ // tasks are posted or shutdown starts. |
+ ConditionVariable has_work_cv_; |
+ |
+ // Number of times |has_work_| has been signalled. Used for testing. |
+ int has_work_signal_count_; |
+ |
+ // Condition variable that is waited on by non-worker threads (in |
+ // FlushForTesting()) until IsIdle() goes to true. |
+ ConditionVariable is_idle_cv_; |
+ |
+ // Condition variable that is waited on by non-worker threads (in |
+ // Shutdown()) until CanShutdown() goes to true. |
+ ConditionVariable can_shutdown_cv_; |
// The maximum number of worker threads we'll create. |
const size_t max_threads_; |
@@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner( |
: worker_pool_(worker_pool), |
last_sequence_number_(0), |
lock_(), |
- cond_var_(&lock_), |
+ has_work_cv_(&lock_), |
+ has_work_signal_count_(0), |
+ is_idle_cv_(&lock_), |
+ can_shutdown_cv_(&lock_), |
max_threads_(max_threads), |
thread_name_prefix_(thread_name_prefix), |
thread_being_created_(false), |
@@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
if (create_thread_id) |
FinishStartingAdditionalThread(create_thread_id); |
else |
- cond_var_.Signal(); |
+ SignalHasWork(); |
return true; |
} |
@@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
} |
void SequencedWorkerPool::Inner::FlushForTesting() { |
- { |
- AutoLock lock(lock_); |
- while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
- cond_var_.Wait(); |
- } |
- cond_var_.Signal(); |
+ AutoLock lock(lock_); |
+ while (!IsIdle()) |
+ is_idle_cv_.Wait(); |
+} |
+ |
+void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { |
+ SignalHasWork(); |
+} |
+ |
+int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const { |
+ AutoLock lock(lock_); |
+ return has_work_signal_count_; |
} |
void SequencedWorkerPool::Inner::Shutdown() { |
@@ -351,27 +383,25 @@ void SequencedWorkerPool::Inner::Shutdown() { |
// Tickle the threads. This will wake up a waiting one so it will know that |
// it can exit, which in turn will wake up any other waiting ones. |
- cond_var_.Signal(); |
+ has_work_cv_.Signal(); |
// There are no pending or running tasks blocking shutdown, we're done. |
if (CanShutdown()) |
return; |
} |
- // If we get here, we know we're either waiting on a blocking task that's |
- // currently running, waiting on a blocking task that hasn't been scheduled |
- // yet, or both. Block on the "queue empty" event to know when all tasks are |
- // complete. This must be done outside the lock. |
+ // If we're here, then something is blocking shutdown. So wait for |
+ // CanShutdown() to go to true. |
+ |
if (testing_observer_) |
testing_observer_->WillWaitForShutdown(); |
TimeTicks shutdown_wait_begin = TimeTicks::Now(); |
- // Wait for no more tasks. |
{ |
AutoLock lock(lock_); |
while (!CanShutdown()) |
- cond_var_.Wait(); |
+ can_shutdown_cv_.Wait(); |
} |
UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
TimeTicks::Now() - shutdown_wait_begin); |
@@ -401,7 +431,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
int new_thread_id = WillRunWorkerTask(task); |
{ |
AutoUnlock unlock(lock_); |
- cond_var_.Signal(); |
+ // There may be more work available, so wake up another |
+ // worker thread. (Technically not required, since we |
+ // already get a signal for each new task, but it doesn't |
+ // hurt.) |
+ has_work_cv_.Signal(); |
delete_these_outside_lock.clear(); |
// Complete thread creation outside the lock if necessary. |
@@ -424,17 +458,26 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
if (shutdown_called_) |
break; |
waiting_thread_count_++; |
- cond_var_.Signal(); // For Flush() that may be waiting on the |
- // waiting thread count to go up. |
- cond_var_.Wait(); |
+ // This is the only time that IsIdle() can go to true. |
+ if (IsIdle()) |
+ is_idle_cv_.Signal(); |
+ has_work_cv_.Wait(); |
waiting_thread_count_--; |
} |
} |
- } |
+ } // Release lock_. |
// We noticed we should exit. Wake up the next worker so it knows it should |
// exit as well (because the Shutdown() code only signals once). |
- cond_var_.Signal(); |
+ has_work_cv_.Signal(); |
+ |
+ // Possibly unblock shutdown. |
+ can_shutdown_cv_.Signal(); |
+} |
+ |
+bool SequencedWorkerPool::Inner::IsIdle() const { |
+ lock_.AssertAcquired(); |
+ return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); |
} |
int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
@@ -518,8 +561,9 @@ bool SequencedWorkerPool::Inner::GetWork( |
*task = *i; |
i = pending_tasks_.erase(i); |
pending_task_count_--; |
- if (task->shutdown_behavior == BLOCK_SHUTDOWN) |
+ if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
blocking_shutdown_pending_task_count_--; |
+ } |
found_task = true; |
break; |
@@ -640,6 +684,14 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
new Worker(worker_pool_, thread_number, thread_name_prefix_); |
} |
+void SequencedWorkerPool::Inner::SignalHasWork() { |
+ has_work_cv_.Signal(); |
+ { |
+ AutoLock lock(lock_); |
+ ++has_work_signal_count_; |
+ } |
+} |
+ |
bool SequencedWorkerPool::Inner::CanShutdown() const { |
lock_.AssertAcquired(); |
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
@@ -665,11 +717,10 @@ void SequencedWorkerPool::OnDestruct() const { |
// TODO(akalin): Once we can easily check if we're on a worker |
// thread or not, use that instead of restricting destruction to |
// only the constructor message loop. |
- if (constructor_message_loop_->BelongsToCurrentThread()) { |
+ if (constructor_message_loop_->BelongsToCurrentThread()) |
delete this; |
- } else { |
+ else |
constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
- } |
} |
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
@@ -748,7 +799,16 @@ void SequencedWorkerPool::FlushForTesting() { |
inner_->FlushForTesting(); |
} |
+void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { |
+ inner_->TriggerSpuriousWorkSignalForTesting(); |
+} |
+ |
+int SequencedWorkerPool::GetWorkSignalCountForTesting() const { |
+ return inner_->GetWorkSignalCountForTesting(); |
+} |
+ |
void SequencedWorkerPool::Shutdown() { |
+ DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
inner_->Shutdown(); |
} |