Index: base/threading/sequenced_worker_pool.cc |
=================================================================== |
--- base/threading/sequenced_worker_pool.cc (revision 128552) |
+++ base/threading/sequenced_worker_pool.cc (working copy) |
@@ -13,6 +13,7 @@ |
#include "base/atomicops.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
+#include "base/lazy_instance.h" |
#include "base/logging.h" |
#include "base/memory/linked_ptr.h" |
#include "base/message_loop_proxy.h" |
@@ -21,6 +22,7 @@ |
#include "base/stringprintf.h" |
#include "base/synchronization/condition_variable.h" |
#include "base/synchronization/lock.h" |
+#include "base/threading/thread_local.h" |
#include "base/threading/platform_thread.h" |
#include "base/threading/simple_thread.h" |
#include "base/time.h" |
@@ -59,12 +61,38 @@ |
// SimpleThread implementation. This actually runs the background thread. |
virtual void Run() OVERRIDE; |
+ |
+ void set_running_sequence(SequenceToken token) { |
+ DCHECK_EQ(current(), this); |
+ running_sequence_ = token; |
+ } |
+ |
+ bool is_running_sequence(SequenceToken token) const { |
+ DCHECK_EQ(current(), this); |
+ return running_sequence_.Equals(token); |
+ } |
+ |
+ SequencedWorkerPool* owner() const { |
+ DCHECK_EQ(current(), this); |
+ return worker_pool_.get(); |
+ } |
+ |
+ static Worker* current() { |
+ return current_worker_tls_.Pointer()->Get(); |
+ } |
+ |
private: |
scoped_refptr<SequencedWorkerPool> worker_pool_; |
+ SequenceToken running_sequence_; |
+ static LazyInstance<ThreadLocalPointer<Worker> > current_worker_tls_; |
DISALLOW_COPY_AND_ASSIGN(Worker); |
}; |
+LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker> > |
+ SequencedWorkerPool::Worker::current_worker_tls_ = |
+ LAZY_INSTANCE_INITIALIZER; |
+ |
// Inner ---------------------------------------------------------------------- |
class SequencedWorkerPool::Inner { |
@@ -92,6 +120,8 @@ |
bool RunsTasksOnCurrentThread() const; |
+ bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
+ |
void FlushForTesting(); |
void SignalHasWorkForTesting(); |
@@ -250,9 +280,13 @@ |
// using DelegateSimpleThread and have Inner implement the Delegate to avoid |
// having these worker objects at all, but that method lacks the ability to |
// send thread-specific information easily to the thread loop. |
+ current_worker_tls_.Pointer()->Set(this); |
worker_pool_->inner_->ThreadLoop(this); |
// Release our cyclic reference once we're done. |
worker_pool_ = NULL; |
+ // Reset TLS after the worker_pool_ is reference is dropped because |
+ // its destructor tests Worker::current(). |
+ current_worker_tls_.Pointer()->Set(NULL); |
} |
// Inner definitions --------------------------------------------------------- |
@@ -346,10 +380,17 @@ |
} |
bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
michaeln
2012/03/25 00:20:02
we could hoist these method bodies into SequencedW
|
- AutoLock lock(lock_); |
- return ContainsKey(threads_, PlatformThread::CurrentId()); |
+ Worker* worker = Worker::current(); |
+ return worker && (worker->owner() == worker_pool_); |
} |
+bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
+ SequenceToken sequence_token) const { |
+ Worker* worker = Worker::current(); |
+ return worker && (worker->owner() == worker_pool_) && |
+ worker->is_running_sequence(sequence_token); |
+} |
+ |
void SequencedWorkerPool::Inner::FlushForTesting() { |
AutoLock lock(lock_); |
while (!IsIdle()) |
@@ -427,8 +468,13 @@ |
if (new_thread_id) |
FinishStartingAdditionalThread(new_thread_id); |
+ this_worker->set_running_sequence( |
+ SequenceToken(task.sequence_token_id)); |
+ |
task.task.Run(); |
+ this_worker->set_running_sequence(SequenceToken()); |
+ |
// Make sure our task is erased outside the lock for the same reason |
// we do this with delete_these_oustide_lock. |
task.task = Closure(); |
@@ -709,7 +755,7 @@ |
DCHECK(constructor_message_loop_.get()); |
// Avoid deleting ourselves on a worker thread (which would |
// deadlock). |
- if (RunsTasksOnCurrentThread()) { |
+ if (Worker::current()) { |
constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
} else { |
delete this; |
@@ -788,6 +834,11 @@ |
return inner_->RunsTasksOnCurrentThread(); |
} |
+bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
+ SequenceToken sequence_token) const { |
+ return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
+} |
+ |
void SequencedWorkerPool::FlushForTesting() { |
inner_->FlushForTesting(); |
} |