OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
11 #include <vector> | 11 #include <vector> |
12 | 12 |
13 #include "base/atomicops.h" | 13 #include "base/atomicops.h" |
14 #include "base/callback.h" | 14 #include "base/callback.h" |
15 #include "base/compiler_specific.h" | 15 #include "base/compiler_specific.h" |
| 16 #include "base/lazy_instance.h" |
16 #include "base/logging.h" | 17 #include "base/logging.h" |
17 #include "base/memory/linked_ptr.h" | 18 #include "base/memory/linked_ptr.h" |
18 #include "base/message_loop_proxy.h" | 19 #include "base/message_loop_proxy.h" |
19 #include "base/metrics/histogram.h" | 20 #include "base/metrics/histogram.h" |
20 #include "base/stl_util.h" | 21 #include "base/stl_util.h" |
21 #include "base/stringprintf.h" | 22 #include "base/stringprintf.h" |
22 #include "base/synchronization/condition_variable.h" | 23 #include "base/synchronization/condition_variable.h" |
23 #include "base/synchronization/lock.h" | 24 #include "base/synchronization/lock.h" |
| 25 #include "base/threading/thread_local.h" |
24 #include "base/threading/platform_thread.h" | 26 #include "base/threading/platform_thread.h" |
25 #include "base/threading/simple_thread.h" | 27 #include "base/threading/simple_thread.h" |
26 #include "base/time.h" | 28 #include "base/time.h" |
27 #include "base/tracked_objects.h" | 29 #include "base/tracked_objects.h" |
28 | 30 |
29 namespace base { | 31 namespace base { |
30 | 32 |
31 namespace { | 33 namespace { |
32 | 34 |
33 struct SequencedTask { | 35 struct SequencedTask { |
(...skipping 18 matching lines...) Expand all Loading... |
52 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 54 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
53 // around as long as we are running. | 55 // around as long as we are running. |
54 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 56 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
55 int thread_number, | 57 int thread_number, |
56 const std::string& thread_name_prefix); | 58 const std::string& thread_name_prefix); |
57 virtual ~Worker(); | 59 virtual ~Worker(); |
58 | 60 |
59 // SimpleThread implementation. This actually runs the background thread. | 61 // SimpleThread implementation. This actually runs the background thread. |
60 virtual void Run() OVERRIDE; | 62 virtual void Run() OVERRIDE; |
61 | 63 |
| 64 // Used to support IsRunningSequenceOnCurrentThread. |
| 65 void set_running_sequence(SequenceToken token) { running_sequence_ = token; } |
| 66 bool is_running_sequence(SequenceToken token) const { |
| 67 return running_sequence_.Equals(token); |
| 68 } |
| 69 |
| 70 static Worker* current() { |
| 71 return current_worker_tls_.Pointer()->Get(); |
| 72 } |
| 73 |
62 private: | 74 private: |
63 scoped_refptr<SequencedWorkerPool> worker_pool_; | 75 scoped_refptr<SequencedWorkerPool> worker_pool_; |
| 76 SequenceToken running_sequence_; |
| 77 |
| 78 static LazyInstance<ThreadLocalPointer<Worker> > current_worker_tls_; |
64 | 79 |
65 DISALLOW_COPY_AND_ASSIGN(Worker); | 80 DISALLOW_COPY_AND_ASSIGN(Worker); |
66 }; | 81 }; |
67 | 82 |
| 83 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker> > |
| 84 SequencedWorkerPool::Worker::current_worker_tls_ = |
| 85 LAZY_INSTANCE_INITIALIZER; |
| 86 |
68 // Inner ---------------------------------------------------------------------- | 87 // Inner ---------------------------------------------------------------------- |
69 | 88 |
70 class SequencedWorkerPool::Inner { | 89 class SequencedWorkerPool::Inner { |
71 public: | 90 public: |
72 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 91 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
73 // by it). | 92 // by it). |
74 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, | 93 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, |
75 const std::string& thread_name_prefix, | 94 const std::string& thread_name_prefix, |
76 TestingObserver* observer); | 95 TestingObserver* observer); |
77 | 96 |
78 ~Inner(); | 97 ~Inner(); |
79 | 98 |
80 SequenceToken GetSequenceToken(); | 99 SequenceToken GetSequenceToken(); |
81 | 100 |
82 SequenceToken GetNamedSequenceToken(const std::string& name); | 101 SequenceToken GetNamedSequenceToken(const std::string& name); |
83 | 102 |
84 // This function accepts a name and an ID. If the name is null, the | 103 // This function accepts a name and an ID. If the name is null, the |
85 // token ID is used. This allows us to implement the optional name lookup | 104 // token ID is used. This allows us to implement the optional name lookup |
86 // from a single function without having to enter the lock a separate time. | 105 // from a single function without having to enter the lock a separate time. |
87 bool PostTask(const std::string* optional_token_name, | 106 bool PostTask(const std::string* optional_token_name, |
88 SequenceToken sequence_token, | 107 SequenceToken sequence_token, |
89 WorkerShutdown shutdown_behavior, | 108 WorkerShutdown shutdown_behavior, |
90 const tracked_objects::Location& from_here, | 109 const tracked_objects::Location& from_here, |
91 const Closure& task); | 110 const Closure& task); |
92 | 111 |
93 bool RunsTasksOnCurrentThread() const; | 112 bool RunsTasksOnCurrentThread() const; |
94 | 113 |
| 114 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| 115 |
95 void FlushForTesting(); | 116 void FlushForTesting(); |
96 | 117 |
97 void SignalHasWorkForTesting(); | 118 void SignalHasWorkForTesting(); |
98 | 119 |
99 int GetWorkSignalCountForTesting() const; | 120 int GetWorkSignalCountForTesting() const; |
100 | 121 |
101 void Shutdown(); | 122 void Shutdown(); |
102 | 123 |
103 // Runs the worker loop on the background thread. | 124 // Runs the worker loop on the background thread. |
104 void ThreadLoop(Worker* this_worker); | 125 void ThreadLoop(Worker* this_worker); |
(...skipping 138 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
243 | 264 |
244 SequencedWorkerPool::Worker::~Worker() { | 265 SequencedWorkerPool::Worker::~Worker() { |
245 } | 266 } |
246 | 267 |
247 void SequencedWorkerPool::Worker::Run() { | 268 void SequencedWorkerPool::Worker::Run() { |
248 // Just jump back to the Inner object to run the thread, since it has all the | 269 // Just jump back to the Inner object to run the thread, since it has all the |
249 // tracking information and queues. It might be more natural to implement | 270 // tracking information and queues. It might be more natural to implement |
250 // using DelegateSimpleThread and have Inner implement the Delegate to avoid | 271 // using DelegateSimpleThread and have Inner implement the Delegate to avoid |
251 // having these worker objects at all, but that method lacks the ability to | 272 // having these worker objects at all, but that method lacks the ability to |
252 // send thread-specific information easily to the thread loop. | 273 // send thread-specific information easily to the thread loop. |
| 274 current_worker_tls_.Pointer()->Set(this); |
253 worker_pool_->inner_->ThreadLoop(this); | 275 worker_pool_->inner_->ThreadLoop(this); |
| 276 current_worker_tls_.Pointer()->Set(NULL); |
254 // Release our cyclic reference once we're done. | 277 // Release our cyclic reference once we're done. |
255 worker_pool_ = NULL; | 278 worker_pool_ = NULL; |
256 } | 279 } |
257 | 280 |
258 // Inner definitions --------------------------------------------------------- | 281 // Inner definitions --------------------------------------------------------- |
259 | 282 |
260 SequencedWorkerPool::Inner::Inner( | 283 SequencedWorkerPool::Inner::Inner( |
261 SequencedWorkerPool* worker_pool, | 284 SequencedWorkerPool* worker_pool, |
262 size_t max_threads, | 285 size_t max_threads, |
263 const std::string& thread_name_prefix, | 286 const std::string& thread_name_prefix, |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
339 // we're outside the lock. | 362 // we're outside the lock. |
340 if (create_thread_id) | 363 if (create_thread_id) |
341 FinishStartingAdditionalThread(create_thread_id); | 364 FinishStartingAdditionalThread(create_thread_id); |
342 else | 365 else |
343 SignalHasWork(); | 366 SignalHasWork(); |
344 | 367 |
345 return true; | 368 return true; |
346 } | 369 } |
347 | 370 |
348 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 371 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
349 AutoLock lock(lock_); | 372 return Worker::current() != NULL; |
350 return ContainsKey(threads_, PlatformThread::CurrentId()); | 373 } |
| 374 |
| 375 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 376 SequenceToken sequence_token) const { |
| 377 Worker* worker = Worker::current(); |
| 378 if (!worker) |
| 379 return false; |
| 380 return worker->is_running_sequence(sequence_token); |
351 } | 381 } |
352 | 382 |
353 void SequencedWorkerPool::Inner::FlushForTesting() { | 383 void SequencedWorkerPool::Inner::FlushForTesting() { |
354 AutoLock lock(lock_); | 384 AutoLock lock(lock_); |
355 while (!IsIdle()) | 385 while (!IsIdle()) |
356 is_idle_cv_.Wait(); | 386 is_idle_cv_.Wait(); |
357 } | 387 } |
358 | 388 |
359 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { | 389 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
360 SignalHasWork(); | 390 SignalHasWork(); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
420 // worker thread. (Technically not required, since we | 450 // worker thread. (Technically not required, since we |
421 // already get a signal for each new task, but it doesn't | 451 // already get a signal for each new task, but it doesn't |
422 // hurt.) | 452 // hurt.) |
423 SignalHasWork(); | 453 SignalHasWork(); |
424 delete_these_outside_lock.clear(); | 454 delete_these_outside_lock.clear(); |
425 | 455 |
426 // Complete thread creation outside the lock if necessary. | 456 // Complete thread creation outside the lock if necessary. |
427 if (new_thread_id) | 457 if (new_thread_id) |
428 FinishStartingAdditionalThread(new_thread_id); | 458 FinishStartingAdditionalThread(new_thread_id); |
429 | 459 |
| 460 this_worker->set_running_sequence( |
| 461 SequenceToken(task.sequence_token_id)); |
| 462 |
430 task.task.Run(); | 463 task.task.Run(); |
431 | 464 |
| 465 this_worker->set_running_sequence(SequenceToken()); |
| 466 |
432 // Make sure our task is erased outside the lock for the same reason | 467 // Make sure our task is erased outside the lock for the same reason |
433 // we do this with delete_these_oustide_lock. | 468 // we do this with delete_these_oustide_lock. |
434 task.task = Closure(); | 469 task.task = Closure(); |
435 } | 470 } |
436 DidRunWorkerTask(task); // Must be done inside the lock. | 471 DidRunWorkerTask(task); // Must be done inside the lock. |
437 } else { | 472 } else { |
438 // When we're terminating and there's no more work, we can | 473 // When we're terminating and there's no more work, we can |
439 // shut down. You can't get more tasks posted once | 474 // shut down. You can't get more tasks posted once |
440 // shutdown_called_ is set. There may be some tasks stuck | 475 // shutdown_called_ is set. There may be some tasks stuck |
441 // behind running ones with the same sequence token, but | 476 // behind running ones with the same sequence token, but |
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
781 TimeDelta delay) { | 816 TimeDelta delay) { |
782 // TODO(akalin): Add support for non-zero delays. | 817 // TODO(akalin): Add support for non-zero delays. |
783 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); | 818 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); |
784 return PostWorkerTask(from_here, task); | 819 return PostWorkerTask(from_here, task); |
785 } | 820 } |
786 | 821 |
787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 822 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
788 return inner_->RunsTasksOnCurrentThread(); | 823 return inner_->RunsTasksOnCurrentThread(); |
789 } | 824 } |
790 | 825 |
| 826 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 827 SequenceToken sequence_token) const { |
| 828 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 829 } |
| 830 |
791 void SequencedWorkerPool::FlushForTesting() { | 831 void SequencedWorkerPool::FlushForTesting() { |
792 inner_->FlushForTesting(); | 832 inner_->FlushForTesting(); |
793 } | 833 } |
794 | 834 |
795 void SequencedWorkerPool::SignalHasWorkForTesting() { | 835 void SequencedWorkerPool::SignalHasWorkForTesting() { |
796 inner_->SignalHasWorkForTesting(); | 836 inner_->SignalHasWorkForTesting(); |
797 } | 837 } |
798 | 838 |
799 void SequencedWorkerPool::Shutdown() { | 839 void SequencedWorkerPool::Shutdown() { |
800 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 840 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
801 inner_->Shutdown(); | 841 inner_->Shutdown(); |
802 } | 842 } |
803 | 843 |
804 } // namespace base | 844 } // namespace base |
OLD | NEW |