Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1175)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9845037: Add SequencedWorkerPool.IsRunningSequenceOnCurrentThread so callers can make stronger assertions ab… (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility> 10 #include <utility>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/atomicops.h" 13 #include "base/atomicops.h"
14 #include "base/callback.h" 14 #include "base/callback.h"
15 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
16 #include "base/lazy_instance.h"
16 #include "base/logging.h" 17 #include "base/logging.h"
17 #include "base/memory/linked_ptr.h" 18 #include "base/memory/linked_ptr.h"
18 #include "base/message_loop_proxy.h" 19 #include "base/message_loop_proxy.h"
19 #include "base/metrics/histogram.h" 20 #include "base/metrics/histogram.h"
20 #include "base/stl_util.h" 21 #include "base/stl_util.h"
21 #include "base/stringprintf.h" 22 #include "base/stringprintf.h"
22 #include "base/synchronization/condition_variable.h" 23 #include "base/synchronization/condition_variable.h"
23 #include "base/synchronization/lock.h" 24 #include "base/synchronization/lock.h"
25 #include "base/threading/thread_local.h"
24 #include "base/threading/platform_thread.h" 26 #include "base/threading/platform_thread.h"
25 #include "base/threading/simple_thread.h" 27 #include "base/threading/simple_thread.h"
26 #include "base/time.h" 28 #include "base/time.h"
27 #include "base/tracked_objects.h" 29 #include "base/tracked_objects.h"
28 30
29 namespace base { 31 namespace base {
30 32
31 namespace { 33 namespace {
32 34
33 struct SequencedTask { 35 struct SequencedTask {
(...skipping 18 matching lines...) Expand all
52 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 54 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
53 // around as long as we are running. 55 // around as long as we are running.
54 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 56 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
55 int thread_number, 57 int thread_number,
56 const std::string& thread_name_prefix); 58 const std::string& thread_name_prefix);
57 virtual ~Worker(); 59 virtual ~Worker();
58 60
59 // SimpleThread implementation. This actually runs the background thread. 61 // SimpleThread implementation. This actually runs the background thread.
60 virtual void Run() OVERRIDE; 62 virtual void Run() OVERRIDE;
61 63
64 // Used to support IsRunningSequenceOnCurrentThread.
65 void set_running_sequence(SequenceToken token) { running_sequence_ = token; }
66 bool is_running_sequence(SequenceToken token) const {
67 return running_sequence_.Equals(token);
68 }
69
70 static Worker* current() {
71 return current_worker_tls_.Pointer()->Get();
72 }
73
62 private: 74 private:
63 scoped_refptr<SequencedWorkerPool> worker_pool_; 75 scoped_refptr<SequencedWorkerPool> worker_pool_;
76 SequenceToken running_sequence_;
77
78 static LazyInstance<ThreadLocalPointer<Worker> > current_worker_tls_;
64 79
65 DISALLOW_COPY_AND_ASSIGN(Worker); 80 DISALLOW_COPY_AND_ASSIGN(Worker);
66 }; 81 };
67 82
83 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker> >
84 SequencedWorkerPool::Worker::current_worker_tls_ =
85 LAZY_INSTANCE_INITIALIZER;
86
68 // Inner ---------------------------------------------------------------------- 87 // Inner ----------------------------------------------------------------------
69 88
70 class SequencedWorkerPool::Inner { 89 class SequencedWorkerPool::Inner {
71 public: 90 public:
72 // Take a raw pointer to |worker| to avoid cycles (since we're owned 91 // Take a raw pointer to |worker| to avoid cycles (since we're owned
73 // by it). 92 // by it).
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 93 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
75 const std::string& thread_name_prefix, 94 const std::string& thread_name_prefix,
76 TestingObserver* observer); 95 TestingObserver* observer);
77 96
78 ~Inner(); 97 ~Inner();
79 98
80 SequenceToken GetSequenceToken(); 99 SequenceToken GetSequenceToken();
81 100
82 SequenceToken GetNamedSequenceToken(const std::string& name); 101 SequenceToken GetNamedSequenceToken(const std::string& name);
83 102
84 // This function accepts a name and an ID. If the name is null, the 103 // This function accepts a name and an ID. If the name is null, the
85 // token ID is used. This allows us to implement the optional name lookup 104 // token ID is used. This allows us to implement the optional name lookup
86 // from a single function without having to enter the lock a separate time. 105 // from a single function without having to enter the lock a separate time.
87 bool PostTask(const std::string* optional_token_name, 106 bool PostTask(const std::string* optional_token_name,
88 SequenceToken sequence_token, 107 SequenceToken sequence_token,
89 WorkerShutdown shutdown_behavior, 108 WorkerShutdown shutdown_behavior,
90 const tracked_objects::Location& from_here, 109 const tracked_objects::Location& from_here,
91 const Closure& task); 110 const Closure& task);
92 111
93 bool RunsTasksOnCurrentThread() const; 112 bool RunsTasksOnCurrentThread() const;
94 113
114 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
115
95 void FlushForTesting(); 116 void FlushForTesting();
96 117
97 void SignalHasWorkForTesting(); 118 void SignalHasWorkForTesting();
98 119
99 int GetWorkSignalCountForTesting() const; 120 int GetWorkSignalCountForTesting() const;
100 121
101 void Shutdown(); 122 void Shutdown();
102 123
103 // Runs the worker loop on the background thread. 124 // Runs the worker loop on the background thread.
104 void ThreadLoop(Worker* this_worker); 125 void ThreadLoop(Worker* this_worker);
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after
243 264
244 SequencedWorkerPool::Worker::~Worker() { 265 SequencedWorkerPool::Worker::~Worker() {
245 } 266 }
246 267
247 void SequencedWorkerPool::Worker::Run() { 268 void SequencedWorkerPool::Worker::Run() {
248 // Just jump back to the Inner object to run the thread, since it has all the 269 // Just jump back to the Inner object to run the thread, since it has all the
249 // tracking information and queues. It might be more natural to implement 270 // tracking information and queues. It might be more natural to implement
250 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 271 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
251 // having these worker objects at all, but that method lacks the ability to 272 // having these worker objects at all, but that method lacks the ability to
252 // send thread-specific information easily to the thread loop. 273 // send thread-specific information easily to the thread loop.
274 current_worker_tls_.Pointer()->Set(this);
253 worker_pool_->inner_->ThreadLoop(this); 275 worker_pool_->inner_->ThreadLoop(this);
276 current_worker_tls_.Pointer()->Set(NULL);
254 // Release our cyclic reference once we're done. 277 // Release our cyclic reference once we're done.
255 worker_pool_ = NULL; 278 worker_pool_ = NULL;
256 } 279 }
257 280
258 // Inner definitions --------------------------------------------------------- 281 // Inner definitions ---------------------------------------------------------
259 282
260 SequencedWorkerPool::Inner::Inner( 283 SequencedWorkerPool::Inner::Inner(
261 SequencedWorkerPool* worker_pool, 284 SequencedWorkerPool* worker_pool,
262 size_t max_threads, 285 size_t max_threads,
263 const std::string& thread_name_prefix, 286 const std::string& thread_name_prefix,
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
339 // we're outside the lock. 362 // we're outside the lock.
340 if (create_thread_id) 363 if (create_thread_id)
341 FinishStartingAdditionalThread(create_thread_id); 364 FinishStartingAdditionalThread(create_thread_id);
342 else 365 else
343 SignalHasWork(); 366 SignalHasWork();
344 367
345 return true; 368 return true;
346 } 369 }
347 370
348 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 371 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
349 AutoLock lock(lock_); 372 return Worker::current() != NULL;
350 return ContainsKey(threads_, PlatformThread::CurrentId()); 373 }
374
375 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
376 SequenceToken sequence_token) const {
377 Worker* worker = Worker::current();
378 if (!worker)
379 return false;
380 return worker->is_running_sequence(sequence_token);
351 } 381 }
352 382
353 void SequencedWorkerPool::Inner::FlushForTesting() { 383 void SequencedWorkerPool::Inner::FlushForTesting() {
354 AutoLock lock(lock_); 384 AutoLock lock(lock_);
355 while (!IsIdle()) 385 while (!IsIdle())
356 is_idle_cv_.Wait(); 386 is_idle_cv_.Wait();
357 } 387 }
358 388
359 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 389 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
360 SignalHasWork(); 390 SignalHasWork();
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
420 // worker thread. (Technically not required, since we 450 // worker thread. (Technically not required, since we
421 // already get a signal for each new task, but it doesn't 451 // already get a signal for each new task, but it doesn't
422 // hurt.) 452 // hurt.)
423 SignalHasWork(); 453 SignalHasWork();
424 delete_these_outside_lock.clear(); 454 delete_these_outside_lock.clear();
425 455
426 // Complete thread creation outside the lock if necessary. 456 // Complete thread creation outside the lock if necessary.
427 if (new_thread_id) 457 if (new_thread_id)
428 FinishStartingAdditionalThread(new_thread_id); 458 FinishStartingAdditionalThread(new_thread_id);
429 459
460 this_worker->set_running_sequence(
461 SequenceToken(task.sequence_token_id));
462
430 task.task.Run(); 463 task.task.Run();
431 464
465 this_worker->set_running_sequence(SequenceToken());
466
432 // Make sure our task is erased outside the lock for the same reason 467 // Make sure our task is erased outside the lock for the same reason
433 // we do this with delete_these_oustide_lock. 468 // we do this with delete_these_oustide_lock.
434 task.task = Closure(); 469 task.task = Closure();
435 } 470 }
436 DidRunWorkerTask(task); // Must be done inside the lock. 471 DidRunWorkerTask(task); // Must be done inside the lock.
437 } else { 472 } else {
438 // When we're terminating and there's no more work, we can 473 // When we're terminating and there's no more work, we can
439 // shut down. You can't get more tasks posted once 474 // shut down. You can't get more tasks posted once
440 // shutdown_called_ is set. There may be some tasks stuck 475 // shutdown_called_ is set. There may be some tasks stuck
441 // behind running ones with the same sequence token, but 476 // behind running ones with the same sequence token, but
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after
781 TimeDelta delay) { 816 TimeDelta delay) {
782 // TODO(akalin): Add support for non-zero delays. 817 // TODO(akalin): Add support for non-zero delays.
783 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); 818 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0);
784 return PostWorkerTask(from_here, task); 819 return PostWorkerTask(from_here, task);
785 } 820 }
786 821
787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 822 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
788 return inner_->RunsTasksOnCurrentThread(); 823 return inner_->RunsTasksOnCurrentThread();
789 } 824 }
790 825
826 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
827 SequenceToken sequence_token) const {
828 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
829 }
830
791 void SequencedWorkerPool::FlushForTesting() { 831 void SequencedWorkerPool::FlushForTesting() {
792 inner_->FlushForTesting(); 832 inner_->FlushForTesting();
793 } 833 }
794 834
795 void SequencedWorkerPool::SignalHasWorkForTesting() { 835 void SequencedWorkerPool::SignalHasWorkForTesting() {
796 inner_->SignalHasWorkForTesting(); 836 inner_->SignalHasWorkForTesting();
797 } 837 }
798 838
799 void SequencedWorkerPool::Shutdown() { 839 void SequencedWorkerPool::Shutdown() {
800 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 840 DCHECK(constructor_message_loop_->BelongsToCurrentThread());
801 inner_->Shutdown(); 841 inner_->Shutdown();
802 } 842 }
803 843
804 } // namespace base 844 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698