OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
10 #include <utility> | 10 #include <utility> |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
52 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it | 52 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it |
53 // around as long as we are running. | 53 // around as long as we are running. |
54 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, | 54 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, |
55 int thread_number, | 55 int thread_number, |
56 const std::string& thread_name_prefix); | 56 const std::string& thread_name_prefix); |
57 virtual ~Worker(); | 57 virtual ~Worker(); |
58 | 58 |
59 // SimpleThread implementation. This actually runs the background thread. | 59 // SimpleThread implementation. This actually runs the background thread. |
60 virtual void Run() OVERRIDE; | 60 virtual void Run() OVERRIDE; |
61 | 61 |
| 62 void set_running_sequence(SequenceToken token) { |
| 63 running_sequence_ = token; |
| 64 } |
| 65 |
| 66 SequenceToken running_sequence() const { |
| 67 return running_sequence_; |
| 68 } |
| 69 |
62 private: | 70 private: |
63 scoped_refptr<SequencedWorkerPool> worker_pool_; | 71 scoped_refptr<SequencedWorkerPool> worker_pool_; |
| 72 SequenceToken running_sequence_; |
64 | 73 |
65 DISALLOW_COPY_AND_ASSIGN(Worker); | 74 DISALLOW_COPY_AND_ASSIGN(Worker); |
66 }; | 75 }; |
67 | 76 |
68 // Inner ---------------------------------------------------------------------- | 77 // Inner ---------------------------------------------------------------------- |
69 | 78 |
70 class SequencedWorkerPool::Inner { | 79 class SequencedWorkerPool::Inner { |
71 public: | 80 public: |
72 // Take a raw pointer to |worker| to avoid cycles (since we're owned | 81 // Take a raw pointer to |worker| to avoid cycles (since we're owned |
73 // by it). | 82 // by it). |
(...skipping 11 matching lines...) Expand all Loading... |
85 // token ID is used. This allows us to implement the optional name lookup | 94 // token ID is used. This allows us to implement the optional name lookup |
86 // from a single function without having to enter the lock a separate time. | 95 // from a single function without having to enter the lock a separate time. |
87 bool PostTask(const std::string* optional_token_name, | 96 bool PostTask(const std::string* optional_token_name, |
88 SequenceToken sequence_token, | 97 SequenceToken sequence_token, |
89 WorkerShutdown shutdown_behavior, | 98 WorkerShutdown shutdown_behavior, |
90 const tracked_objects::Location& from_here, | 99 const tracked_objects::Location& from_here, |
91 const Closure& task); | 100 const Closure& task); |
92 | 101 |
93 bool RunsTasksOnCurrentThread() const; | 102 bool RunsTasksOnCurrentThread() const; |
94 | 103 |
| 104 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; |
| 105 |
95 void FlushForTesting(); | 106 void FlushForTesting(); |
96 | 107 |
97 void SignalHasWorkForTesting(); | 108 void SignalHasWorkForTesting(); |
98 | 109 |
99 int GetWorkSignalCountForTesting() const; | 110 int GetWorkSignalCountForTesting() const; |
100 | 111 |
101 void Shutdown(); | 112 void Shutdown(); |
102 | 113 |
103 // Runs the worker loop on the background thread. | 114 // Runs the worker loop on the background thread. |
104 void ThreadLoop(Worker* this_worker); | 115 void ThreadLoop(Worker* this_worker); |
(...skipping 238 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
343 SignalHasWork(); | 354 SignalHasWork(); |
344 | 355 |
345 return true; | 356 return true; |
346 } | 357 } |
347 | 358 |
348 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { | 359 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
349 AutoLock lock(lock_); | 360 AutoLock lock(lock_); |
350 return ContainsKey(threads_, PlatformThread::CurrentId()); | 361 return ContainsKey(threads_, PlatformThread::CurrentId()); |
351 } | 362 } |
352 | 363 |
| 364 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( |
| 365 SequenceToken sequence_token) const { |
| 366 AutoLock lock(lock_); |
| 367 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); |
| 368 if (found == threads_.end()) |
| 369 return false; |
| 370 return found->second->running_sequence().Equals(sequence_token); |
| 371 } |
| 372 |
353 void SequencedWorkerPool::Inner::FlushForTesting() { | 373 void SequencedWorkerPool::Inner::FlushForTesting() { |
354 AutoLock lock(lock_); | 374 AutoLock lock(lock_); |
355 while (!IsIdle()) | 375 while (!IsIdle()) |
356 is_idle_cv_.Wait(); | 376 is_idle_cv_.Wait(); |
357 } | 377 } |
358 | 378 |
359 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { | 379 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { |
360 SignalHasWork(); | 380 SignalHasWork(); |
361 } | 381 } |
362 | 382 |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
420 // worker thread. (Technically not required, since we | 440 // worker thread. (Technically not required, since we |
421 // already get a signal for each new task, but it doesn't | 441 // already get a signal for each new task, but it doesn't |
422 // hurt.) | 442 // hurt.) |
423 SignalHasWork(); | 443 SignalHasWork(); |
424 delete_these_outside_lock.clear(); | 444 delete_these_outside_lock.clear(); |
425 | 445 |
426 // Complete thread creation outside the lock if necessary. | 446 // Complete thread creation outside the lock if necessary. |
427 if (new_thread_id) | 447 if (new_thread_id) |
428 FinishStartingAdditionalThread(new_thread_id); | 448 FinishStartingAdditionalThread(new_thread_id); |
429 | 449 |
| 450 this_worker->set_running_sequence( |
| 451 SequenceToken(task.sequence_token_id)); |
| 452 |
430 task.task.Run(); | 453 task.task.Run(); |
431 | 454 |
| 455 this_worker->set_running_sequence(SequenceToken()); |
| 456 |
432 // Make sure our task is erased outside the lock for the same reason | 457 // Make sure our task is erased outside the lock for the same reason |
433 // we do this with delete_these_oustide_lock. | 458 // we do this with delete_these_oustide_lock. |
434 task.task = Closure(); | 459 task.task = Closure(); |
435 } | 460 } |
436 DidRunWorkerTask(task); // Must be done inside the lock. | 461 DidRunWorkerTask(task); // Must be done inside the lock. |
437 } else { | 462 } else { |
438 // When we're terminating and there's no more work, we can | 463 // When we're terminating and there's no more work, we can |
439 // shut down. You can't get more tasks posted once | 464 // shut down. You can't get more tasks posted once |
440 // shutdown_called_ is set. There may be some tasks stuck | 465 // shutdown_called_ is set. There may be some tasks stuck |
441 // behind running ones with the same sequence token, but | 466 // behind running ones with the same sequence token, but |
(...skipping 339 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
781 TimeDelta delay) { | 806 TimeDelta delay) { |
782 // TODO(akalin): Add support for non-zero delays. | 807 // TODO(akalin): Add support for non-zero delays. |
783 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); | 808 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); |
784 return PostWorkerTask(from_here, task); | 809 return PostWorkerTask(from_here, task); |
785 } | 810 } |
786 | 811 |
787 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 812 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
788 return inner_->RunsTasksOnCurrentThread(); | 813 return inner_->RunsTasksOnCurrentThread(); |
789 } | 814 } |
790 | 815 |
| 816 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( |
| 817 SequenceToken sequence_token) const { |
| 818 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); |
| 819 } |
| 820 |
791 void SequencedWorkerPool::FlushForTesting() { | 821 void SequencedWorkerPool::FlushForTesting() { |
792 inner_->FlushForTesting(); | 822 inner_->FlushForTesting(); |
793 } | 823 } |
794 | 824 |
795 void SequencedWorkerPool::SignalHasWorkForTesting() { | 825 void SequencedWorkerPool::SignalHasWorkForTesting() { |
796 inner_->SignalHasWorkForTesting(); | 826 inner_->SignalHasWorkForTesting(); |
797 } | 827 } |
798 | 828 |
799 void SequencedWorkerPool::Shutdown() { | 829 void SequencedWorkerPool::Shutdown() { |
800 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); | 830 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); |
801 inner_->Shutdown(); | 831 inner_->Shutdown(); |
802 } | 832 } |
803 | 833 |
804 } // namespace base | 834 } // namespace base |
OLD | NEW |