Index: base/threading/sequenced_worker_pool.cc |
diff --git a/base/threading/sequenced_worker_pool.cc b/base/threading/sequenced_worker_pool.cc |
index 62431aec4a7d6450a27014232b46ee3236704c48..f43eb3c758a8d9e870556e2ed602f83668bd9f73 100644 |
--- a/base/threading/sequenced_worker_pool.cc |
+++ b/base/threading/sequenced_worker_pool.cc |
@@ -93,6 +93,10 @@ class SequencedWorkerPool::Inner { |
void FlushForTesting(); |
+ void TriggerSpuriousWorkSignalForTesting(); |
+ |
+ int GetWorkSignalCountForTesting() const; |
+ |
void Shutdown(); |
void SetTestingObserver(TestingObserver* observer); |
@@ -101,6 +105,10 @@ class SequencedWorkerPool::Inner { |
void ThreadLoop(Worker* this_worker); |
private: |
+ // Returns whether there are no more pending tasks and all threads |
+ // are idle. Must be called under lock. |
+ bool IsIdle() const; |
+ |
// Called from within the lock, this converts the given token name into a |
// token ID, creating a new one if necessary. |
int LockedGetNamedTokenID(const std::string& name); |
@@ -142,6 +150,9 @@ class SequencedWorkerPool::Inner { |
// the lock to avoid blocking important work starting a thread in the lock. |
void FinishStartingAdditionalThread(int thread_number); |
+ // Signal |has_work_| and increment |has_work_signal_count_|. |
+ void SignalHasWork(); |
+ |
// Checks whether there is work left that's blocking shutdown. Must be |
// called inside the lock. |
bool CanShutdown() const; |
@@ -158,8 +169,20 @@ class SequencedWorkerPool::Inner { |
// lock. |
mutable Lock lock_; |
- // Condition variable used to wake up worker threads when a task is runnable. |
- ConditionVariable cond_var_; |
+ // Condition variable that is signalled whenever a new task is |
+ // posted or when Shutdown() is called. |
+ ConditionVariable has_work_cv_; |
+ |
+ // Number of times |has_work_| has been signalled. Used for testing. |
+ int has_work_signal_count_; |
+ |
+ // Condition variable that is signalled whenever IsIdle() goes to |
+ // true. |
+ ConditionVariable is_idle_cv_; |
+ |
+ // Condition variable that is signalled whwnever CanShutdown() goes |
+ // to true. |
+ ConditionVariable can_shutdown_cv_; |
// The maximum number of worker threads we'll create. |
const size_t max_threads_; |
@@ -245,7 +268,10 @@ SequencedWorkerPool::Inner::Inner( |
: worker_pool_(worker_pool), |
last_sequence_number_(0), |
lock_(), |
- cond_var_(&lock_), |
+ has_work_cv_(&lock_), |
+ has_work_signal_count_(0), |
+ is_idle_cv_(&lock_), |
+ can_shutdown_cv_(&lock_), |
max_threads_(max_threads), |
thread_name_prefix_(thread_name_prefix), |
thread_being_created_(false), |
@@ -318,7 +344,7 @@ bool SequencedWorkerPool::Inner::PostTask( |
if (create_thread_id) |
FinishStartingAdditionalThread(create_thread_id); |
else |
- cond_var_.Signal(); |
+ SignalHasWork(); |
return true; |
} |
@@ -329,12 +355,18 @@ bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
} |
void SequencedWorkerPool::Inner::FlushForTesting() { |
- { |
- AutoLock lock(lock_); |
- while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
- cond_var_.Wait(); |
- } |
- cond_var_.Signal(); |
+ AutoLock lock(lock_); |
+ while (!IsIdle()) |
+ is_idle_cv_.Wait(); |
+} |
+ |
+void SequencedWorkerPool::Inner::TriggerSpuriousWorkSignalForTesting() { |
+ SignalHasWork(); |
+} |
+ |
+int SequencedWorkerPool::Inner::GetWorkSignalCountForTesting() const { |
+ AutoLock lock(lock_); |
+ return has_work_signal_count_; |
} |
void SequencedWorkerPool::Inner::Shutdown() { |
jar (doing other things)
2012/03/12 17:39:13
IMO, it would be helpful if you commented, or asse
akalin
2012/03/12 18:34:34
Done. (Added comment, and assert is in SWP::Shutdo
|
@@ -349,9 +381,8 @@ void SequencedWorkerPool::Inner::Shutdown() { |
return; |
shutdown_called_ = true; |
- // Tickle the threads. This will wake up a waiting one so it will know that |
- // it can exit, which in turn will wake up any other waiting ones. |
- cond_var_.Signal(); |
+ // Wake up all waiting threads. |
+ has_work_cv_.Broadcast(); |
jar (doing other things)
2012/03/12 17:39:13
I liked the original code better, relying on signa
akalin
2012/03/12 18:34:34
Okay, moved back to signal chain.
|
// There are no pending or running tasks blocking shutdown, we're done. |
if (CanShutdown()) |
@@ -371,7 +402,7 @@ void SequencedWorkerPool::Inner::Shutdown() { |
{ |
AutoLock lock(lock_); |
while (!CanShutdown()) |
- cond_var_.Wait(); |
+ can_shutdown_cv_.Wait(); |
} |
UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", |
TimeTicks::Now() - shutdown_wait_begin); |
@@ -384,57 +415,59 @@ void SequencedWorkerPool::Inner::SetTestingObserver( |
} |
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
- { |
jar (doing other things)
2012/03/12 17:39:13
nit: You are dead right that this curly is not nee
akalin
2012/03/12 18:34:34
Done. (It ended up being needed anyway when addres
|
- AutoLock lock(lock_); |
- DCHECK(thread_being_created_); |
- thread_being_created_ = false; |
- std::pair<ThreadMap::iterator, bool> result = |
- threads_.insert( |
- std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
- DCHECK(result.second); |
- |
- while (true) { |
- // See GetWork for what delete_these_outside_lock is doing. |
- SequencedTask task; |
- std::vector<Closure> delete_these_outside_lock; |
- if (GetWork(&task, &delete_these_outside_lock)) { |
- int new_thread_id = WillRunWorkerTask(task); |
- { |
- AutoUnlock unlock(lock_); |
- cond_var_.Signal(); |
jar (doing other things)
2012/03/12 17:39:13
This signal is basically asking that anyone else t
akalin
2012/03/12 18:34:34
Hmm. If every task posted causes a signal, then w
jar (doing other things)
2012/03/12 19:42:36
I *think* you are correct. This is a slightly (IM
akalin
2012/03/12 20:42:15
I see. Okay, I just added a comment as to why it'
|
- delete_these_outside_lock.clear(); |
- |
- // Complete thread creation outside the lock if necessary. |
- if (new_thread_id) |
- FinishStartingAdditionalThread(new_thread_id); |
- |
- task.task.Run(); |
- |
- // Make sure our task is erased outside the lock for the same reason |
- // we do this with delete_these_oustide_lock. |
- task.task = Closure(); |
- } |
- DidRunWorkerTask(task); // Must be done inside the lock. |
- } else { |
- // When we're terminating and there's no more work, we can |
- // shut down. You can't get more tasks posted once |
- // shutdown_called_ is set. There may be some tasks stuck |
- // behind running ones with the same sequence token, but |
- // additional threads won't help this case. |
- if (shutdown_called_) |
- break; |
- waiting_thread_count_++; |
- cond_var_.Signal(); // For Flush() that may be waiting on the |
- // waiting thread count to go up. |
- cond_var_.Wait(); |
- waiting_thread_count_--; |
+ AutoLock lock(lock_); |
+ DCHECK(thread_being_created_); |
+ // This can make CanShutdown() go to true. |
+ thread_being_created_ = false; |
+ if (CanShutdown()) |
jar (doing other things)
2012/03/12 17:39:13
I don't think anyone cares about CanShutdown() unl
akalin
2012/03/12 18:34:34
Done. Moved can_shutdown_cv_ signal to end of blo
|
+ can_shutdown_cv_.Signal(); |
+ std::pair<ThreadMap::iterator, bool> result = |
+ threads_.insert( |
+ std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
+ DCHECK(result.second); |
+ |
+ while (true) { |
+ // See GetWork for what delete_these_outside_lock is doing. |
+ SequencedTask task; |
+ std::vector<Closure> delete_these_outside_lock; |
+ if (GetWork(&task, &delete_these_outside_lock)) { |
+ int new_thread_id = WillRunWorkerTask(task); |
+ { |
+ AutoUnlock unlock(lock_); |
+ delete_these_outside_lock.clear(); |
+ |
+ // Complete thread creation outside the lock if necessary. |
+ if (new_thread_id) |
+ FinishStartingAdditionalThread(new_thread_id); |
+ |
+ task.task.Run(); |
+ |
+ // Make sure our task is erased outside the lock for the same reason |
+ // we do this with delete_these_oustide_lock. |
+ task.task = Closure(); |
} |
+ DidRunWorkerTask(task); // Must be done inside the lock. |
+ } else { |
+ // When we're terminating and there's no more work, we can |
+ // shut down. You can't get more tasks posted once |
+ // shutdown_called_ is set. There may be some tasks stuck |
+ // behind running ones with the same sequence token, but |
+ // additional threads won't help this case. |
+ if (shutdown_called_) |
+ break; |
+ waiting_thread_count_++; |
+ // This is the only time that IsIdle() can go to true. |
+ if (IsIdle()) |
+ is_idle_cv_.Signal(); |
+ has_work_cv_.Wait(); |
+ waiting_thread_count_--; |
} |
} |
+} |
- // We noticed we should exit. Wake up the next worker so it knows it should |
- // exit as well (because the Shutdown() code only signals once). |
- cond_var_.Signal(); |
jar (doing other things)
2012/03/12 17:39:13
This is another fairly critical Signal() call. Thi
akalin
2012/03/12 18:34:34
This was deleted because of the change to using Br
|
+bool SequencedWorkerPool::Inner::IsIdle() const { |
+ lock_.AssertAcquired(); |
+ return pending_task_count_ == 0 && waiting_thread_count_ == threads_.size(); |
} |
int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
@@ -518,8 +551,12 @@ bool SequencedWorkerPool::Inner::GetWork( |
*task = *i; |
i = pending_tasks_.erase(i); |
pending_task_count_--; |
- if (task->shutdown_behavior == BLOCK_SHUTDOWN) |
+ if (task->shutdown_behavior == BLOCK_SHUTDOWN) { |
+ // This can make CanShutdown() go to true. |
blocking_shutdown_pending_task_count_--; |
+ if (CanShutdown()) |
+ can_shutdown_cv_.Signal(); |
jar (doing other things)
2012/03/12 17:39:13
Again, I don't like seeing special case code inter
akalin
2012/03/12 18:34:34
Done (see above)
|
+ } |
found_task = true; |
break; |
@@ -567,7 +604,10 @@ void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { |
if (task.shutdown_behavior == BLOCK_SHUTDOWN) { |
DCHECK_GT(blocking_shutdown_thread_count_, 0u); |
+ // This can make CanShutdown() go to true. |
blocking_shutdown_thread_count_--; |
+ if (CanShutdown()) |
+ can_shutdown_cv_.Signal(); |
jar (doing other things)
2012/03/12 17:39:13
Again, please don't add special case code for this
akalin
2012/03/12 18:34:34
Done (see above)
|
} |
if (task.sequence_token_id) |
@@ -640,6 +680,11 @@ void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( |
new Worker(worker_pool_, thread_number, thread_name_prefix_); |
} |
+void SequencedWorkerPool::Inner::SignalHasWork() { |
+ has_work_cv_.Signal(); |
+ ++has_work_signal_count_; |
jar (doing other things)
2012/03/12 17:39:13
You are not generally inside a lock, but are manip
akalin
2012/03/12 18:34:34
My mistake. This testing variable is kind of hack
|
+} |
+ |
bool SequencedWorkerPool::Inner::CanShutdown() const { |
lock_.AssertAcquired(); |
// See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. |
@@ -665,11 +710,10 @@ void SequencedWorkerPool::OnDestruct() const { |
// TODO(akalin): Once we can easily check if we're on a worker |
// thread or not, use that instead of restricting destruction to |
// only the constructor message loop. |
- if (constructor_message_loop_->BelongsToCurrentThread()) { |
+ if (constructor_message_loop_->BelongsToCurrentThread()) |
delete this; |
- } else { |
+ else |
constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
- } |
} |
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
@@ -748,6 +792,14 @@ void SequencedWorkerPool::FlushForTesting() { |
inner_->FlushForTesting(); |
} |
+void SequencedWorkerPool::TriggerSpuriousWorkSignalForTesting() { |
+ inner_->TriggerSpuriousWorkSignalForTesting(); |
+} |
+ |
+int SequencedWorkerPool::GetWorkSignalCountForTesting() const { |
+ return inner_->GetWorkSignalCountForTesting(); |
+} |
+ |
void SequencedWorkerPool::Shutdown() { |
inner_->Shutdown(); |
} |