Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(242)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9845037: Add SequencedWorkerPool.IsRunningSequenceOnCurrentThread so callers can make stronger assertions ab… (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomicops.h" 13 #include "base/atomicops.h"
14 #include "base/callback.h" 14 #include "base/callback.h"
15 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
16 #include "base/lazy_instance.h"
16 #include "base/logging.h" 17 #include "base/logging.h"
17 #include "base/memory/linked_ptr.h" 18 #include "base/memory/linked_ptr.h"
18 #include "base/message_loop_proxy.h" 19 #include "base/message_loop_proxy.h"
19 #include "base/metrics/histogram.h" 20 #include "base/metrics/histogram.h"
20 #include "base/stl_util.h" 21 #include "base/stl_util.h"
21 #include "base/stringprintf.h" 22 #include "base/stringprintf.h"
22 #include "base/synchronization/condition_variable.h" 23 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h" 24 #include "base/synchronization/lock.h"
25 #include "base/threading/thread_local.h"
24 #include "base/threading/platform_thread.h" 26 #include "base/threading/platform_thread.h"
25 #include "base/threading/simple_thread.h" 27 #include "base/threading/simple_thread.h"
26 #include "base/time.h" 28 #include "base/time.h"
27 #include "base/tracked_objects.h" 29 #include "base/tracked_objects.h"
28 30
29 namespace base { 31 namespace base {
30 32
31 namespace { 33 namespace {
32 34
33 struct SequencedTask { 35 struct SequencedTask {
(...skipping 18 matching lines...) Expand all
52 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 54 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
53 // around as long as we are running. 55 // around as long as we are running.
54 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 56 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
55 int thread_number, 57 int thread_number,
56 const std::string& thread_name_prefix); 58 const std::string& thread_name_prefix);
57 virtual ~Worker(); 59 virtual ~Worker();
58 60
59 // SimpleThread implementation. This actually runs the background thread. 61 // SimpleThread implementation. This actually runs the background thread.
60 virtual void Run() OVERRIDE; 62 virtual void Run() OVERRIDE;
61 63
64
65 void set_running_sequence(SequenceToken token) {
66 DCHECK_EQ(current(), this);
67 running_sequence_ = token;
68 }
69
70 bool is_running_sequence(SequenceToken token) const {
71 DCHECK_EQ(current(), this);
72 return running_sequence_.Equals(token);
73 }
74
75 SequencedWorkerPool* owner() const {
76 DCHECK_EQ(current(), this);
77 return worker_pool_.get();
78 }
79
80 static Worker* current() {
81 return current_worker_tls_.Pointer()->Get();
82 }
83
62 private: 84 private:
63 scoped_refptr<SequencedWorkerPool> worker_pool_; 85 scoped_refptr<SequencedWorkerPool> worker_pool_;
86 SequenceToken running_sequence_;
87 static LazyInstance<ThreadLocalPointer<Worker> > current_worker_tls_;
64 88
65 DISALLOW_COPY_AND_ASSIGN(Worker); 89 DISALLOW_COPY_AND_ASSIGN(Worker);
66 }; 90 };
67 91
92 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker> >
93 SequencedWorkerPool::Worker::current_worker_tls_ =
94 LAZY_INSTANCE_INITIALIZER;
95
68 // Inner ---------------------------------------------------------------------- 96 // Inner ----------------------------------------------------------------------
69 97
70 class SequencedWorkerPool::Inner { 98 class SequencedWorkerPool::Inner {
71 public: 99 public:
72 // Take a raw pointer to |worker| to avoid cycles (since we're owned 100 // Take a raw pointer to |worker| to avoid cycles (since we're owned
73 // by it). 101 // by it).
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 102 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
75 const std::string& thread_name_prefix, 103 const std::string& thread_name_prefix,
76 TestingObserver* observer); 104 TestingObserver* observer);
77 105
78 ~Inner(); 106 ~Inner();
79 107
80 SequenceToken GetSequenceToken(); 108 SequenceToken GetSequenceToken();
81 109
82 SequenceToken GetNamedSequenceToken(const std::string& name); 110 SequenceToken GetNamedSequenceToken(const std::string& name);
83 111
84 // This function accepts a name and an ID. If the name is null, the 112 // This function accepts a name and an ID. If the name is null, the
85 // token ID is used. This allows us to implement the optional name lookup 113 // token ID is used. This allows us to implement the optional name lookup
86 // from a single function without having to enter the lock a separate time. 114 // from a single function without having to enter the lock a separate time.
87 bool PostTask(const std::string* optional_token_name, 115 bool PostTask(const std::string* optional_token_name,
88 SequenceToken sequence_token, 116 SequenceToken sequence_token,
89 WorkerShutdown shutdown_behavior, 117 WorkerShutdown shutdown_behavior,
90 const tracked_objects::Location& from_here, 118 const tracked_objects::Location& from_here,
91 const Closure& task); 119 const Closure& task);
92 120
93 bool RunsTasksOnCurrentThread() const; 121 bool RunsTasksOnCurrentThread() const;
94 122
123 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
124
95 void FlushForTesting(); 125 void FlushForTesting();
96 126
97 void SignalHasWorkForTesting(); 127 void SignalHasWorkForTesting();
98 128
99 int GetWorkSignalCountForTesting() const; 129 int GetWorkSignalCountForTesting() const;
100 130
101 void Shutdown(); 131 void Shutdown();
102 132
103 // Runs the worker loop on the background thread. 133 // Runs the worker loop on the background thread.
104 void ThreadLoop(Worker* this_worker); 134 void ThreadLoop(Worker* this_worker);
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after
243 273
244 SequencedWorkerPool::Worker::~Worker() { 274 SequencedWorkerPool::Worker::~Worker() {
245 } 275 }
246 276
247 void SequencedWorkerPool::Worker::Run() { 277 void SequencedWorkerPool::Worker::Run() {
248 // Just jump back to the Inner object to run the thread, since it has all the 278 // Just jump back to the Inner object to run the thread, since it has all the
249 // tracking information and queues. It might be more natural to implement 279 // tracking information and queues. It might be more natural to implement
250 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 280 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
251 // having these worker objects at all, but that method lacks the ability to 281 // having these worker objects at all, but that method lacks the ability to
252 // send thread-specific information easily to the thread loop. 282 // send thread-specific information easily to the thread loop.
283 current_worker_tls_.Pointer()->Set(this);
253 worker_pool_->inner_->ThreadLoop(this); 284 worker_pool_->inner_->ThreadLoop(this);
254 // Release our cyclic reference once we're done. 285 // Release our cyclic reference once we're done.
255 worker_pool_ = NULL; 286 worker_pool_ = NULL;
287 // Reset TLS after the worker_pool_ is reference is dropped because
288 // its destructor tests Worker::current().
289 current_worker_tls_.Pointer()->Set(NULL);
256 } 290 }
257 291
258 // Inner definitions --------------------------------------------------------- 292 // Inner definitions ---------------------------------------------------------
259 293
260 SequencedWorkerPool::Inner::Inner( 294 SequencedWorkerPool::Inner::Inner(
261 SequencedWorkerPool* worker_pool, 295 SequencedWorkerPool* worker_pool,
262 size_t max_threads, 296 size_t max_threads,
263 const std::string& thread_name_prefix, 297 const std::string& thread_name_prefix,
264 TestingObserver* observer) 298 TestingObserver* observer)
265 : worker_pool_(worker_pool), 299 : worker_pool_(worker_pool),
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
338 // Actually start the additional thread or signal an existing one now that 372 // Actually start the additional thread or signal an existing one now that
339 // we're outside the lock. 373 // we're outside the lock.
340 if (create_thread_id) 374 if (create_thread_id)
341 FinishStartingAdditionalThread(create_thread_id); 375 FinishStartingAdditionalThread(create_thread_id);
342 else 376 else
343 SignalHasWork(); 377 SignalHasWork();
344 378
345 return true; 379 return true;
346 } 380 }
347 381
348 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 382 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
michaeln 2012/03/25 00:20:02 we could hoist these method bodies into SequencedW
349 AutoLock lock(lock_); 383 Worker* worker = Worker::current();
350 return ContainsKey(threads_, PlatformThread::CurrentId()); 384 return worker && (worker->owner() == worker_pool_);
385 }
386
387 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
388 SequenceToken sequence_token) const {
389 Worker* worker = Worker::current();
390 return worker && (worker->owner() == worker_pool_) &&
391 worker->is_running_sequence(sequence_token);
351 } 392 }
352 393
353 void SequencedWorkerPool::Inner::FlushForTesting() { 394 void SequencedWorkerPool::Inner::FlushForTesting() {
354 AutoLock lock(lock_); 395 AutoLock lock(lock_);
355 while (!IsIdle()) 396 while (!IsIdle())
356 is_idle_cv_.Wait(); 397 is_idle_cv_.Wait();
357 } 398 }
358 399
359 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 400 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
360 SignalHasWork(); 401 SignalHasWork();
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
420 // worker thread. (Technically not required, since we 461 // worker thread. (Technically not required, since we
421 // already get a signal for each new task, but it doesn't 462 // already get a signal for each new task, but it doesn't
422 // hurt.) 463 // hurt.)
423 SignalHasWork(); 464 SignalHasWork();
424 delete_these_outside_lock.clear(); 465 delete_these_outside_lock.clear();
425 466
426 // Complete thread creation outside the lock if necessary. 467 // Complete thread creation outside the lock if necessary.
427 if (new_thread_id) 468 if (new_thread_id)
428 FinishStartingAdditionalThread(new_thread_id); 469 FinishStartingAdditionalThread(new_thread_id);
429 470
471 this_worker->set_running_sequence(
472 SequenceToken(task.sequence_token_id));
473
430 task.task.Run(); 474 task.task.Run();
431 475
476 this_worker->set_running_sequence(SequenceToken());
477
432 // Make sure our task is erased outside the lock for the same reason 478 // Make sure our task is erased outside the lock for the same reason
433 // we do this with delete_these_oustide_lock. 479 // we do this with delete_these_oustide_lock.
434 task.task = Closure(); 480 task.task = Closure();
435 } 481 }
436 DidRunWorkerTask(task); // Must be done inside the lock. 482 DidRunWorkerTask(task); // Must be done inside the lock.
437 } else { 483 } else {
438 // When we're terminating and there's no more work, we can 484 // When we're terminating and there's no more work, we can
439 // shut down. You can't get more tasks posted once 485 // shut down. You can't get more tasks posted once
440 // shutdown_called_ is set. There may be some tasks stuck 486 // shutdown_called_ is set. There may be some tasks stuck
441 // behind running ones with the same sequence token, but 487 // behind running ones with the same sequence token, but
(...skipping 260 matching lines...) Expand 10 before | Expand all | Expand 10 after
702 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), 748 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
703 max_threads, thread_name_prefix, observer)) { 749 max_threads, thread_name_prefix, observer)) {
704 } 750 }
705 751
706 SequencedWorkerPool::~SequencedWorkerPool() {} 752 SequencedWorkerPool::~SequencedWorkerPool() {}
707 753
708 void SequencedWorkerPool::OnDestruct() const { 754 void SequencedWorkerPool::OnDestruct() const {
709 DCHECK(constructor_message_loop_.get()); 755 DCHECK(constructor_message_loop_.get());
710 // Avoid deleting ourselves on a worker thread (which would 756 // Avoid deleting ourselves on a worker thread (which would
711 // deadlock). 757 // deadlock).
712 if (RunsTasksOnCurrentThread()) { 758 if (Worker::current()) {
713 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 759 constructor_message_loop_->DeleteSoon(FROM_HERE, this);
714 } else { 760 } else {
715 delete this; 761 delete this;
716 } 762 }
717 } 763 }
718 764
719 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 765 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
720 return inner_->GetSequenceToken(); 766 return inner_->GetSequenceToken();
721 } 767 }
722 768
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
781 TimeDelta delay) { 827 TimeDelta delay) {
782 // TODO(akalin): Add support for non-zero delays. 828 // TODO(akalin): Add support for non-zero delays.
783 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); 829 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0);
784 return PostWorkerTask(from_here, task); 830 return PostWorkerTask(from_here, task);
785 } 831 }
786 832
787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 833 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
788 return inner_->RunsTasksOnCurrentThread(); 834 return inner_->RunsTasksOnCurrentThread();
789 } 835 }
790 836
837 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
838 SequenceToken sequence_token) const {
839 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
840 }
841
791 void SequencedWorkerPool::FlushForTesting() { 842 void SequencedWorkerPool::FlushForTesting() {
792 inner_->FlushForTesting(); 843 inner_->FlushForTesting();
793 } 844 }
794 845
795 void SequencedWorkerPool::SignalHasWorkForTesting() { 846 void SequencedWorkerPool::SignalHasWorkForTesting() {
796 inner_->SignalHasWorkForTesting(); 847 inner_->SignalHasWorkForTesting();
797 } 848 }
798 849
799 void SequencedWorkerPool::Shutdown() { 850 void SequencedWorkerPool::Shutdown() {
800 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 851 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
801 inner_->Shutdown(); 852 inner_->Shutdown();
802 } 853 }
803 854
804 } // namespace base 855 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698