Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1783)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 9651026: Clean up condition variable usage in SequencedWorkerPool (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address nits Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 62431aec4a7d6450a27014232b46ee3236704c48..d542733291a4351fbfabd2a1abe2bccd3ca5a285 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner {
void FlushForTesting();
+ void TriggerSpuriousWorkSignalForTesting();
+
+ int GetWorkSignalCountForTesting() const;
+
void Shutdown();
void SetTestingObserver(TestingObserver* observer);
@@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner {
void ThreadLoop(Worker* this_worker);
private:
+ // Returns whether there are no more pending tasks and all threads
+ // are idle. Must be called under lock.
+ bool IsIdle() const;
+
// Called from within the lock, this converts the given token name into a
// token ID, creating a new one if necessary.
int LockedGetNamedTokenID(const std::string& name);
@@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner {
// the lock to avoid blocking important work starting a thread in the lock.
void FinishStartingAdditionalThread(int thread_number);
+ // Signal |has_work_| and increment |has_work_signal_count_|.
+ void SignalHasWork();
+
// Checks whether there is work left that's blocking shutdown. Must be
// called inside the lock.
bool CanShutdown() const;
@@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner {
// lock.
mutable Lock lock_;
- // Condition variable used to wake up worker threads when a task is runnable.
- ConditionVariable cond_var_;
+ // Condition variable that is waited on by worker threads until new
+ // tasks are posted or shutdown starts.
+ ConditionVariable has_work_cv_;
+
+ // Number of times |has_work_| has been signalled. Used for testing.
+ int has_work_signal_count_;
+
+ // Condition variable that is waited on by non-worker threads (in
+ // FlushForTesting()) until IsIdle() goes to true.
+ ConditionVariable is_idle_cv_;
+
+ // Condition variable that is waited on by non-worker threads (in
+ // Shutdown()) until CanShutdown() goes to true.
+ ConditionVariable can_shutdown_cv_;
// The maximum number of worker threads we'll create.
const size_t max_threads_;
@@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner(
: worker_pool_(worker_pool),
last_sequence_number_(0),
lock_(),
- cond_var_(&lock_),
+ has_work_cv_(&lock_),
+ has_work_signal_count_(0),
+ is_idle_cv_(&lock_),
+ can_shutdown_cv_(&lock_),
max_threads_(max_threads),
thread_name_prefix_(thread_name_prefix),
thread_being_created_(false),
@@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask(
if (create_thread_id)
FinishStartingAdditionalThread(create_thread_id);
else
- cond_var_.Signal();
+ SignalHasWork();
return true;
}
@@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
}
void SequencedWorkerPool::Inner::FlushForTesting() {
- {
- AutoLock lock(lock_);
- while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
- cond_var_.Wait();
- }
- cond_var_.Signal();
+ AutoLock lock(lock_);
+ while (!IsIdle())
+ is_idle_cv_.Wait();
+}
+
+void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() {
+ SignalHasWork();
+}
+
+int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const {
+ AutoLock lock(lock_);
+ return has_work_signal_count_;
}
void SequencedWorkerPool::Inner::Shutdown() {
@@ -351,27 +383,25 @@ void SequencedWorkerPool::Inner::Shutdown() {
// Tickle the threads. This will wake up a waiting one so it will know that
// it can exit, which in turn will wake up any other waiting ones.
- cond_var_.Signal();
+ has_work_cv_.Signal();
// There are no pending or running tasks blocking shutdown, we're done.
if (CanShutdown())
return;
}
- // If we get here, we know we're either waiting on a blocking task that's
- // currently running, waiting on a blocking task that hasn't been scheduled
- // yet, or both. Block on the "queue empty" event to know when all tasks are
- // complete. This must be done outside the lock.
+ // If we're here, then something is blocking shutdown. So wait for
+ // CanShutdown() to go to true.
+
if (testing_observer_)
testing_observer_->WillWaitForShutdown();
TimeTicks shutdown_wait_begin = TimeTicks::Now();
- // Wait for no more tasks.
{
AutoLock lock(lock_);
while (!CanShutdown())
- cond_var_.Wait();
+ can_shutdown_cv_.Wait();
}
UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
TimeTicks::Now() - shutdown_wait_begin);
@@ -401,7 +431,11 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
int new_thread_id = WillRunWorkerTask(task);
{
AutoUnlock unlock(lock_);
- cond_var_.Signal();
+ // There may be more work available, so wake up another
+ // worker thread. (Technically not required, since we
+ // already get a signal for each new task, but it doesn't
+ // hurt.)
+ has_work_cv_.Signal();
delete_these_outside_lock.clear();
// Complete thread creation outside the lock if necessary.
@@ -424,17 +458,26 @@ void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
if (shutdown_called_)
break;
waiting_thread_count_++;
- cond_var_.Signal(); // For Flush() that may be waiting on the
- // waiting thread count to go up.
- cond_var_.Wait();
+ // This is the only time that IsIdle() can go to true.
+ if (IsIdle())
+ is_idle_cv_.Signal();
+ has_work_cv_.Wait();
waiting_thread_count_--;
}
}
- }
+ } // Release lock_.
// We noticed we should exit. Wake up the next worker so it knows it should
// exit as well (because the Shutdown() code only signals once).
- cond_var_.Signal();
+ has_work_cv_.Signal();
+
+ // Possibly unblock shutdown.
+ can_shutdown_cv_.Signal();
+}
+
+bool SequencedWorkerPool::Inner::IsIdle() const {
+ lock_.AssertAcquired();
+ return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size();
}
int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
@@ -518,8 +561,9 @@ bool SequencedWorkerPool::Inner::GetWork(
*task = *i;
i = pending_tasks_.erase(i);
pending_task_count_--;
- if (task->shutdown_behavior == BLOCK_SHUTDOWN)
+ if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
blocking_shutdown_pending_task_count_--;
+ }
found_task = true;
break;
@@ -640,6 +684,14 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
+void SequencedWorkerPool::Inner::SignalHasWork() {
+ has_work_cv_.Signal();
+ {
+ AutoLock lock(lock_);
+ ++has_work_signal_count_;
+ }
+}
+
bool SequencedWorkerPool::Inner::CanShutdown() const {
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
@@ -665,11 +717,10 @@ void SequencedWorkerPool::OnDestruct() const {
// TODO(akalin): Once we can easily check if we're on a worker
// thread or not, use that instead of restricting destruction to
// only the constructor message loop.
- if (constructor_message_loop_->BelongsToCurrentThread()) {
+ if (constructor_message_loop_->BelongsToCurrentThread())
delete this;
- } else {
+ else
constructor_message_loop_->DeleteSoon(FROM_HERE, this);
- }
}
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
@@ -748,7 +799,16 @@ void SequencedWorkerPool::FlushForTesting() {
inner_->FlushForTesting();
}
+void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() {
+ inner_->TriggerSpuriousWorkSignalForTesting();
+}
+
+int SequencedWorkerPool::GetWorkSignalCountForTesting() const {
+ return inner_->GetWorkSignalCountForTesting();
+}
+
void SequencedWorkerPool::Shutdown() {
+ DCHECK(constructor_message_loop_->BelongsToCurrentThread());
inner_->Shutdown();
}
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698