OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
11 #include <vector> | 11 #include <vector> |
12 | 12 |
13 #include "base/atomicops.h" | 13 #include "base/atomicops.h" |
14 #include "base/callback.h" | 14 #include "base/callback.h" |
15 #include "base/compiler_specific.h" | 15 #include "base/compiler_specific.h" |
16 #include "base/lazy_instance.h" | |
16 #include "base/logging.h" | 17 #include "base/logging.h" |
17 #include "base/memory/linked_ptr.h" | 18 #include "base/memory/linked_ptr.h" |
18 #include "base/message_loop_proxy.h" | 19 #include "base/message_loop_proxy.h" |
19 #include "base/metrics/histogram.h" | 20 #include "base/metrics/histogram.h" |
20 #include "base/stl_util.h" | 21 #include "base/stl_util.h" |
21 #include "base/stringprintf.h" | 22 #include "base/stringprintf.h" |
22 #include "base/synchronization/condition_variable.h" | 23 #include "base/synchronization/condition_variable.h" |
23 #include "base/synchronization/lock.h" | 24 #include "base/synchronization/lock.h" |
25 #include "base/threading/thread_local.h" | |
24 #include "base/threading/platform_thread.h" | 26 #include "base/threading/platform_thread.h" |
25 #include "base/threading/simple_thread.h" | 27 #include "base/threading/simple_thread.h" |
26 #include "base/time.h" | 28 #include "base/time.h" |
27 #include "base/tracked_objects.h" | 29 #include "base/tracked_objects.h" |
28 | 30 |
29 namespace base { | 31 namespace base { |
30 | 32 |
31 namespace { | 33 namespace { |
32 | 34 |
33 struct SequencedTask { | 35 struct SequencedTask { |
(...skipping 18 matching lines...) Expand all Loading... | |
52 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 54 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
53 // around as long as we are running. | 55 // around as long as we are running. |
54 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 56 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
55 int thread_number, | 57 int thread_number, |
56 const std::string& thread_name_prefix); | 58 const std::string& thread_name_prefix); |
57 virtual ~Worker(); | 59 virtual ~Worker(); |
58 | 60 |
59 // SimpleThread implementation. This actually runs the background thread. | 61 // SimpleThread implementation. This actually runs the background thread. |
60 virtual void Run() OVERRIDE; | 62 virtual void Run() OVERRIDE; |
61 | 63 |
64 | |
65 void set_running_sequence(SequenceToken token) { | |
66 DCHECK_EQ(current(), this); | |
67 running_sequence_ = token; | |
68 } | |
69 | |
70 bool is_running_sequence(SequenceToken token) const { | |
71 DCHECK_EQ(current(), this); | |
72 return running_sequence_.Equals(token); | |
73 } | |
74 | |
75 SequencedWorkerPool* owner() const { | |
76 DCHECK_EQ(current(), this); | |
77 return worker_pool_.get(); | |
78 } | |
79 | |
80 static Worker* current() { | |
81 return current_worker_tls_.Pointer()->Get(); | |
82 } | |
83 | |
62 private: | 84 private: |
63 scoped_refptr<SequencedWorkerPool> worker_pool_; | 85 scoped_refptr<SequencedWorkerPool> worker_pool_; |
86 SequenceToken running_sequence_; | |
87 static LazyInstance<ThreadLocalPointer<Worker> > current_worker_tls_; | |
64 | 88 |
65 DISALLOW_COPY_AND_ASSIGN(Worker); | 89 DISALLOW_COPY_AND_ASSIGN(Worker); |
66 }; | 90 }; |
67 | 91 |
92 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker> > | |
93 SequencedWorkerPool::Worker::current_worker_tls_ = | |
94 LAZY_INSTANCE_INITIALIZER; | |
95 | |
68 // Inner ---------------------------------------------------------------------- | 96 // Inner ---------------------------------------------------------------------- |
69 | 97 |
70 class SequencedWorkerPool::Inner { | 98 class SequencedWorkerPool::Inner { |
71 public: | 99 public: |
72 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 100 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
73 // by it). | 101 // by it). |
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 102 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
75 const std::string& thread_name_prefix, | 103 const std::string& thread_name_prefix, |
76 TestingObserver* observer); | 104 TestingObserver* observer); |
77 | 105 |
78 ~Inner(); | 106 ~Inner(); |
79 | 107 |
80 SequenceToken GetSequenceToken(); | 108 SequenceToken GetSequenceToken(); |
81 | 109 |
82 SequenceToken GetNamedSequenceToken(const std::string& name); | 110 SequenceToken GetNamedSequenceToken(const std::string& name); |
83 | 111 |
84 // This function accepts a name and an ID. If the name is null, the | 112 // This function accepts a name and an ID. If the name is null, the |
85 // token ID is used. This allows us to implement the optional name lookup | 113 // token ID is used. This allows us to implement the optional name lookup |
86 // from a single function without having to enter the lock a separate time. | 114 // from a single function without having to enter the lock a separate time. |
87 bool PostTask(const std::string* optional_token_name, | 115 bool PostTask(const std::string* optional_token_name, |
88 SequenceToken sequence_token, | 116 SequenceToken sequence_token, |
89 WorkerShutdown shutdown_behavior, | 117 WorkerShutdown shutdown_behavior, |
90 const tracked_objects::Location& from_here, | 118 const tracked_objects::Location& from_here, |
91 const Closure& task); | 119 const Closure& task); |
92 | 120 |
93 bool RunsTasksOnCurrentThread() const; | 121 bool RunsTasksOnCurrentThread() const; |
94 | 122 |
123 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; | |
124 | |
95 void FlushForTesting(); | 125 void FlushForTesting(); |
96 | 126 |
97 void SignalHasWorkForTesting(); | 127 void SignalHasWorkForTesting(); |
98 | 128 |
99 int GetWorkSignalCountForTesting() const; | 129 int GetWorkSignalCountForTesting() const; |
100 | 130 |
101 void Shutdown(); | 131 void Shutdown(); |
102 | 132 |
103 // Runs the worker loop on the background thread. | 133 // Runs the worker loop on the background thread. |
104 void ThreadLoop(Worker* this_worker); | 134 void ThreadLoop(Worker* this_worker); |
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
243 | 273 |
244 SequencedWorkerPool::Worker::~Worker() { | 274 SequencedWorkerPool::Worker::~Worker() { |
245 } | 275 } |
246 | 276 |
247 void SequencedWorkerPool::Worker::Run() { | 277 void SequencedWorkerPool::Worker::Run() { |
248 // Just jump back to the Inner object to run the thread, since it has all the | 278 // Just jump back to the Inner object to run the thread, since it has all the |
249 // tracking information and queues. It might be more natural to implement | 279 // tracking information and queues. It might be more natural to implement |
250 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 280 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
251 // having these worker objects at all, but that method lacks the ability to | 281 // having these worker objects at all, but that method lacks the ability to |
252 // send thread-specific information easily to the thread loop. | 282 // send thread-specific information easily to the thread loop. |
283 current_worker_tls_.Pointer()->Set(this); | |
253 worker_pool_->inner_->ThreadLoop(this); | 284 worker_pool_->inner_->ThreadLoop(this); |
254 // Release our cyclic reference once we're done. | 285 // Release our cyclic reference once we're done. |
255 worker_pool_ = NULL; | 286 worker_pool_ = NULL; |
287 // Reset TLS after the worker_pool_ is reference is dropped because | |
288 // its destructor tests Worker::current(). | |
289 current_worker_tls_.Pointer()->Set(NULL); | |
256 } | 290 } |
257 | 291 |
258 // Inner definitions --------------------------------------------------------- | 292 // Inner definitions --------------------------------------------------------- |
259 | 293 |
260 SequencedWorkerPool::Inner::Inner( | 294 SequencedWorkerPool::Inner::Inner( |
261 SequencedWorkerPool* worker_pool, | 295 SequencedWorkerPool* worker_pool, |
262 size_t max_threads, | 296 size_t max_threads, |
263 const std::string& thread_name_prefix, | 297 const std::string& thread_name_prefix, |
264 TestingObserver* observer) | 298 TestingObserver* observer) |
265 : worker_pool_(worker_pool), | 299 : worker_pool_(worker_pool), |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
338 // Actually start the additional thread or signal an existing one now that | 372 // Actually start the additional thread or signal an existing one now that |
339 // we're outside the lock. | 373 // we're outside the lock. |
340 if (create_thread_id) | 374 if (create_thread_id) |
341 FinishStartingAdditionalThread(create_thread_id); | 375 FinishStartingAdditionalThread(create_thread_id); |
342 else | 376 else |
343 SignalHasWork(); | 377 SignalHasWork(); |
344 | 378 |
345 return true; | 379 return true; |
346 } | 380 } |
347 | 381 |
348 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 382 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
michaeln
2012/03/25 00:20:02
we could hoist these method bodies into SequencedW
| |
349 AutoLock lock(lock_); | 383 Worker* worker = Worker::current(); |
350 return ContainsKey(threads_, PlatformThread::CurrentId()); | 384 return worker && (worker->owner() == worker_pool_); |
385 } | |
386 | |
387 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( | |
388 SequenceToken sequence_token) const { | |
389 Worker* worker = Worker::current(); | |
390 return worker && (worker->owner() == worker_pool_) && | |
391 worker->is_running_sequence(sequence_token); | |
351 } | 392 } |
352 | 393 |
353 void SequencedWorkerPool::Inner::FlushForTesting() { | 394 void SequencedWorkerPool::Inner::FlushForTesting() { |
354 AutoLock lock(lock_); | 395 AutoLock lock(lock_); |
355 while (!IsIdle()) | 396 while (!IsIdle()) |
356 is_idle_cv_.Wait(); | 397 is_idle_cv_.Wait(); |
357 } | 398 } |
358 | 399 |
359 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { | 400 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
360 SignalHasWork(); | 401 SignalHasWork(); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
420 // worker thread. (Technically not required, since we | 461 // worker thread. (Technically not required, since we |
421 // already get a signal for each new task, but it doesn't | 462 // already get a signal for each new task, but it doesn't |
422 // hurt.) | 463 // hurt.) |
423 SignalHasWork(); | 464 SignalHasWork(); |
424 delete_these_outside_lock.clear(); | 465 delete_these_outside_lock.clear(); |
425 | 466 |
426 // Complete thread creation outside the lock if necessary. | 467 // Complete thread creation outside the lock if necessary. |
427 if (new_thread_id) | 468 if (new_thread_id) |
428 FinishStartingAdditionalThread(new_thread_id); | 469 FinishStartingAdditionalThread(new_thread_id); |
429 | 470 |
471 this_worker->set_running_sequence( | |
472 SequenceToken(task.sequence_token_id)); | |
473 | |
430 task.task.Run(); | 474 task.task.Run(); |
431 | 475 |
476 this_worker->set_running_sequence(SequenceToken()); | |
477 | |
432 // Make sure our task is erased outside the lock for the same reason | 478 // Make sure our task is erased outside the lock for the same reason |
433 // we do this with delete_these_oustide_lock. | 479 // we do this with delete_these_oustide_lock. |
434 task.task = Closure(); | 480 task.task = Closure(); |
435 } | 481 } |
436 DidRunWorkerTask(task); // Must be done inside the lock. | 482 DidRunWorkerTask(task); // Must be done inside the lock. |
437 } else { | 483 } else { |
438 // When we're terminating and there's no more work, we can | 484 // When we're terminating and there's no more work, we can |
439 // shut down. You can't get more tasks posted once | 485 // shut down. You can't get more tasks posted once |
440 // shutdown_called_ is set. There may be some tasks stuck | 486 // shutdown_called_ is set. There may be some tasks stuck |
441 // behind running ones with the same sequence token, but | 487 // behind running ones with the same sequence token, but |
(...skipping 260 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
702 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), | 748 inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this), |
703 max_threads, thread_name_prefix, observer)) { | 749 max_threads, thread_name_prefix, observer)) { |
704 } | 750 } |
705 | 751 |
706 SequencedWorkerPool::~SequencedWorkerPool() {} | 752 SequencedWorkerPool::~SequencedWorkerPool() {} |
707 | 753 |
708 void SequencedWorkerPool::OnDestruct() const { | 754 void SequencedWorkerPool::OnDestruct() const { |
709 DCHECK(constructor_message_loop_.get()); | 755 DCHECK(constructor_message_loop_.get()); |
710 // Avoid deleting ourselves on a worker thread (which would | 756 // Avoid deleting ourselves on a worker thread (which would |
711 // deadlock). | 757 // deadlock). |
712 if (RunsTasksOnCurrentThread()) { | 758 if (Worker::current()) { |
713 constructor_message_loop_->DeleteSoon(FROM_HERE, this); | 759 constructor_message_loop_->DeleteSoon(FROM_HERE, this); |
714 } else { | 760 } else { |
715 delete this; | 761 delete this; |
716 } | 762 } |
717 } | 763 } |
718 | 764 |
719 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { | 765 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { |
720 return inner_->GetSequenceToken(); | 766 return inner_->GetSequenceToken(); |
721 } | 767 } |
722 | 768 |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
781 TimeDelta delay) { | 827 TimeDelta delay) { |
782 // TODO(akalin): Add support for non-zero delays. | 828 // TODO(akalin): Add support for non-zero delays. |
783 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); | 829 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); |
784 return PostWorkerTask(from_here, task); | 830 return PostWorkerTask(from_here, task); |
785 } | 831 } |
786 | 832 |
787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 833 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
788 return inner_->RunsTasksOnCurrentThread(); | 834 return inner_->RunsTasksOnCurrentThread(); |
789 } | 835 } |
790 | 836 |
837 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( | |
838 SequenceToken sequence_token) const { | |
839 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); | |
840 } | |
841 | |
791 void SequencedWorkerPool::FlushForTesting() { | 842 void SequencedWorkerPool::FlushForTesting() { |
792 inner_->FlushForTesting(); | 843 inner_->FlushForTesting(); |
793 } | 844 } |
794 | 845 |
795 void SequencedWorkerPool::SignalHasWorkForTesting() { | 846 void SequencedWorkerPool::SignalHasWorkForTesting() { |
796 inner_->SignalHasWorkForTesting(); | 847 inner_->SignalHasWorkForTesting(); |
797 } | 848 } |
798 | 849 |
799 void SequencedWorkerPool::Shutdown() { | 850 void SequencedWorkerPool::Shutdown() { |
800 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 851 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
801 inner_->Shutdown(); | 852 inner_->Shutdown(); |
802 } | 853 } |
803 | 854 |
804 } // namespace base | 855 } // namespace base |
OLD | NEW |