Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2513)

Unified Diff: base/threading/sequenced_worker_pool.cc

Issue 9651026: Clean up condition variable usage in SequencedWorkerPool (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/threading/sequenced_worker_pool.cc
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc
index 62431aec4a7d6450a27014232b46ee3236704c48..f43eb3c758a8d9e870556e2ed602f83668bd9f73 100644
--- a/base/threading/sequenced_worker_pool.cc
+++ b/base/threading/sequenced_worker_pool.cc
@@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner {
void FlushForTesting();
+ void TriggerSpuriousWorkSignalForTesting();
+
+ int GetWorkSignalCountForTesting() const;
+
void Shutdown();
void SetTestingObserver(TestingObserver* observer);
@@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner {
void ThreadLoop(Worker* this_worker);
private:
+ // Returns whether there are no more pending tasks and all threads
+ // are idle. Must be called under lock.
+ bool IsIdle() const;
+
// Called from within the lock, this converts the given token name into a
// token ID, creating a new one if necessary.
int LockedGetNamedTokenID(const std::string& name);
@@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner {
// the lock to avoid blocking important work starting a thread in the lock.
void FinishStartingAdditionalThread(int thread_number);
+ // Signal |has_work_| and increment |has_work_signal_count_|.
+ void SignalHasWork();
+
// Checks whether there is work left that's blocking shutdown. Must be
// called inside the lock.
bool CanShutdown() const;
@@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner {
// lock.
mutable Lock lock_;
- // Condition variable used to wake up worker threads when a task is runnable.
- ConditionVariable cond_var_;
+ // Condition variable that is signalled whenever a new task is
+ // posted or when Shutdown() is called.
+ ConditionVariable has_work_cv_;
+
+ // Number of times |has_work_| has been signalled. Used for testing.
+ int has_work_signal_count_;
+
+ // Condition variable that is signalled whenever IsIdle() goes to
+ // true.
+ ConditionVariable is_idle_cv_;
+
+ // Condition variable that is signalled whwnever CanShutdown() goes
+ // to true.
+ ConditionVariable can_shutdown_cv_;
// The maximum number of worker threads we'll create.
const size_t max_threads_;
@@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner(
: worker_pool_(worker_pool),
last_sequence_number_(0),
lock_(),
- cond_var_(&lock_),
+ has_work_cv_(&lock_),
+ has_work_signal_count_(0),
+ is_idle_cv_(&lock_),
+ can_shutdown_cv_(&lock_),
max_threads_(max_threads),
thread_name_prefix_(thread_name_prefix),
thread_being_created_(false),
@@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask(
if (create_thread_id)
FinishStartingAdditionalThread(create_thread_id);
else
- cond_var_.Signal();
+ SignalHasWork();
return true;
}
@@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
}
void SequencedWorkerPool::Inner::FlushForTesting() {
- {
- AutoLock lock(lock_);
- while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
- cond_var_.Wait();
- }
- cond_var_.Signal();
+ AutoLock lock(lock_);
+ while (!IsIdle())
+ is_idle_cv_.Wait();
+}
+
+void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() {
+ SignalHasWork();
+}
+
+int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const {
+ AutoLock lock(lock_);
+ return has_work_signal_count_;
}
void SequencedWorkerPool::Inner::Shutdown() {
jar (doing other things) 2012/03/12 17:39:13 IMO, it would be helpful if you commented, or asse
akalin 2012/03/12 18:34:34 Done. (Added comment, and assert is in SWP::Shutdo
@@ -349,9 +381,8 @@ void SequencedWorkerPool::Inner::Shutdown() {
return;
shutdown_called_ = true;
- // Tickle the threads. This will wake up a waiting one so it will know that
- // it can exit, which in turn will wake up any other waiting ones.
- cond_var_.Signal();
+ // Wake up all waiting threads.
+ has_work_cv_.Broadcast();
jar (doing other things) 2012/03/12 17:39:13 I liked the original code better, relying on signa
akalin 2012/03/12 18:34:34 Okay, moved back to signal chain.
// There are no pending or running tasks blocking shutdown, we're done.
if (CanShutdown())
@@ -371,7 +402,7 @@ void SequencedWorkerPool::Inner::Shutdown() {
{
AutoLock lock(lock_);
while (!CanShutdown())
- cond_var_.Wait();
+ can_shutdown_cv_.Wait();
}
UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
TimeTicks::Now() - shutdown_wait_begin);
@@ -384,57 +415,59 @@ void SequencedWorkerPool::Inner::SetTestingObserver(
}
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
- {
jar (doing other things) 2012/03/12 17:39:13 nit: You are dead right that this curly is not nee
akalin 2012/03/12 18:34:34 Done. (It ended up being needed anyway when addres
- AutoLock lock(lock_);
- DCHECK(thread_being_created_);
- thread_being_created_ = false;
- std::pair<ThreadMap::iterator, bool> result =
- threads_.insert(
- std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
- DCHECK(result.second);
-
- while (true) {
- // See GetWork for what delete_these_outside_lock is doing.
- SequencedTask task;
- std::vector<Closure> delete_these_outside_lock;
- if (GetWork(&task, &delete_these_outside_lock)) {
- int new_thread_id = WillRunWorkerTask(task);
- {
- AutoUnlock unlock(lock_);
- cond_var_.Signal();
jar (doing other things) 2012/03/12 17:39:13 This signal is basically asking that anyone else t
akalin 2012/03/12 18:34:34 Hmm. If every task posted causes a signal, then w
jar (doing other things) 2012/03/12 19:42:36 I *think* you are correct. This is a slightly (IM
akalin 2012/03/12 20:42:15 I see. Okay, I just added a comment as to why it'
- delete_these_outside_lock.clear();
-
- // Complete thread creation outside the lock if necessary.
- if (new_thread_id)
- FinishStartingAdditionalThread(new_thread_id);
-
- task.task.Run();
-
- // Make sure our task is erased outside the lock for the same reason
- // we do this with delete_these_oustide_lock.
- task.task = Closure();
- }
- DidRunWorkerTask(task); // Must be done inside the lock.
- } else {
- // When we're terminating and there's no more work, we can
- // shut down. You can't get more tasks posted once
- // shutdown_called_ is set. There may be some tasks stuck
- // behind running ones with the same sequence token, but
- // additional threads won't help this case.
- if (shutdown_called_)
- break;
- waiting_thread_count_++;
- cond_var_.Signal(); // For Flush() that may be waiting on the
- // waiting thread count to go up.
- cond_var_.Wait();
- waiting_thread_count_--;
+ AutoLock lock(lock_);
+ DCHECK(thread_being_created_);
+ // This can make CanShutdown() go to true.
+ thread_being_created_ = false;
+ if (CanShutdown())
jar (doing other things) 2012/03/12 17:39:13 I don't think anyone cares about CanShutdown() unl
akalin 2012/03/12 18:34:34 Done. Moved can_shutdown_cv_ signal to end of blo
+ can_shutdown_cv_.Signal();
+ std::pair<ThreadMap::iterator, bool> result =
+ threads_.insert(
+ std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
+ DCHECK(result.second);
+
+ while (true) {
+ // See GetWork for what delete_these_outside_lock is doing.
+ SequencedTask task;
+ std::vector<Closure> delete_these_outside_lock;
+ if (GetWork(&task, &delete_these_outside_lock)) {
+ int new_thread_id = WillRunWorkerTask(task);
+ {
+ AutoUnlock unlock(lock_);
+ delete_these_outside_lock.clear();
+
+ // Complete thread creation outside the lock if necessary.
+ if (new_thread_id)
+ FinishStartingAdditionalThread(new_thread_id);
+
+ task.task.Run();
+
+ // Make sure our task is erased outside the lock for the same reason
+ // we do this with delete_these_oustide_lock.
+ task.task = Closure();
}
+ DidRunWorkerTask(task); // Must be done inside the lock.
+ } else {
+ // When we're terminating and there's no more work, we can
+ // shut down. You can't get more tasks posted once
+ // shutdown_called_ is set. There may be some tasks stuck
+ // behind running ones with the same sequence token, but
+ // additional threads won't help this case.
+ if (shutdown_called_)
+ break;
+ waiting_thread_count_++;
+ // This is the only time that IsIdle() can go to true.
+ if (IsIdle())
+ is_idle_cv_.Signal();
+ has_work_cv_.Wait();
+ waiting_thread_count_--;
}
}
+}
- // We noticed we should exit. Wake up the next worker so it knows it should
- // exit as well (because the Shutdown() code only signals once).
- cond_var_.Signal();
jar (doing other things) 2012/03/12 17:39:13 This is another fairly critical Signal() call. Thi
akalin 2012/03/12 18:34:34 This was deleted because of the change to using Br
+bool SequencedWorkerPool::Inner::IsIdle() const {
+ lock_.AssertAcquired();
+ return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size();
}
int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
@@ -518,8 +551,12 @@ bool SequencedWorkerPool::Inner::GetWork(
*task = *i;
i = pending_tasks_.erase(i);
pending_task_count_--;
- if (task->shutdown_behavior == BLOCK_SHUTDOWN)
+ if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
+ // This can make CanShutdown() go to true.
blocking_shutdown_pending_task_count_--;
+ if (CanShutdown())
+ can_shutdown_cv_.Signal();
jar (doing other things) 2012/03/12 17:39:13 Again, I don't like seeing special case code inter
akalin 2012/03/12 18:34:34 Done (see above)
+ }
found_task = true;
break;
@@ -567,7 +604,10 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
if (task.shutdown_behavior == BLOCK_SHUTDOWN) {
DCHECK_GT(blocking_shutdown_thread_count_, 0u);
+ // This can make CanShutdown() go to true.
blocking_shutdown_thread_count_--;
+ if (CanShutdown())
+ can_shutdown_cv_.Signal();
jar (doing other things) 2012/03/12 17:39:13 Again, please don't add special case code for this
akalin 2012/03/12 18:34:34 Done (see above)
}
if (task.sequence_token_id)
@@ -640,6 +680,11 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
+void SequencedWorkerPool::Inner::SignalHasWork() {
+ has_work_cv_.Signal();
+ ++has_work_signal_count_;
jar (doing other things) 2012/03/12 17:39:13 You are not generally inside a lock, but are manip
akalin 2012/03/12 18:34:34 My mistake. This testing variable is kind of hack
+}
+
bool SequencedWorkerPool::Inner::CanShutdown() const {
lock_.AssertAcquired();
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
@@ -665,11 +710,10 @@ void SequencedWorkerPool::OnDestruct() const {
// TODO(akalin): Once we can easily check if we're on a worker
// thread or not, use that instead of restricting destruction to
// only the constructor message loop.
- if (constructor_message_loop_->BelongsToCurrentThread()) {
+ if (constructor_message_loop_->BelongsToCurrentThread())
delete this;
- } else {
+ else
constructor_message_loop_->DeleteSoon(FROM_HERE, this);
- }
}
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
@@ -748,6 +792,14 @@ void SequencedWorkerPool::FlushForTesting() {
inner_->FlushForTesting();
}
+void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() {
+ inner_->TriggerSpuriousWorkSignalForTesting();
+}
+
+int SequencedWorkerPool::GetWorkSignalCountForTesting() const {
+ return inner_->GetWorkSignalCountForTesting();
+}
+
void SequencedWorkerPool::Shutdown() {
inner_->Shutdown();
}

Powered by Google App Engine
This is Rietveld 408576698