OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <list> | 7 #include <list> |
8 #include <map> | 8 #include <map> |
9 #include <set> | 9 #include <set> |
| 10 #include <utility> |
10 #include <vector> | 11 #include <vector> |
11 | 12 |
12 #include "base/atomicops.h" | 13 #include "base/atomicops.h" |
13 #include "base/callback.h" | 14 #include "base/callback.h" |
14 #include "base/compiler_specific.h" | 15 #include "base/compiler_specific.h" |
15 #include "base/logging.h" | 16 #include "base/logging.h" |
16 #include "base/memory/linked_ptr.h" | 17 #include "base/memory/linked_ptr.h" |
17 #include "base/message_loop_proxy.h" | 18 #include "base/message_loop_proxy.h" |
18 #include "base/metrics/histogram.h" | 19 #include "base/metrics/histogram.h" |
| 20 #include "base/stl_util.h" |
19 #include "base/stringprintf.h" | 21 #include "base/stringprintf.h" |
20 #include "base/synchronization/condition_variable.h" | 22 #include "base/synchronization/condition_variable.h" |
21 #include "base/synchronization/lock.h" | 23 #include "base/synchronization/lock.h" |
| 24 #include "base/threading/platform_thread.h" |
22 #include "base/threading/simple_thread.h" | 25 #include "base/threading/simple_thread.h" |
23 #include "base/time.h" | 26 #include "base/time.h" |
24 #include "base/tracked_objects.h" | 27 #include "base/tracked_objects.h" |
25 | 28 |
26 namespace base { | 29 namespace base { |
27 | 30 |
28 namespace { | 31 namespace { |
29 | 32 |
30 struct SequencedTask { | 33 struct SequencedTask { |
31 SequencedTask() | 34 SequencedTask() |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
79 | 82 |
80 // This function accepts a name and an ID. If the name is null, the | 83 // This function accepts a name and an ID. If the name is null, the |
81 // token ID is used. This allows us to implement the optional name lookup | 84 // token ID is used. This allows us to implement the optional name lookup |
82 // from a single function without having to enter the lock a separate time. | 85 // from a single function without having to enter the lock a separate time. |
83 bool PostTask(const std::string* optional_token_name, | 86 bool PostTask(const std::string* optional_token_name, |
84 SequenceToken sequence_token, | 87 SequenceToken sequence_token, |
85 WorkerShutdown shutdown_behavior, | 88 WorkerShutdown shutdown_behavior, |
86 const tracked_objects::Location& from_here, | 89 const tracked_objects::Location& from_here, |
87 const Closure& task); | 90 const Closure& task); |
88 | 91 |
| 92 bool RunsTasksOnCurrentThread() const; |
| 93 |
89 void FlushForTesting(); | 94 void FlushForTesting(); |
90 | 95 |
91 void Shutdown(); | 96 void Shutdown(); |
92 | 97 |
93 void SetTestingObserver(TestingObserver* observer); | 98 void SetTestingObserver(TestingObserver* observer); |
94 | 99 |
95 // Runs the worker loop on the background thread. | 100 // Runs the worker loop on the background thread. |
96 void ThreadLoop(Worker* this_worker); | 101 void ThreadLoop(Worker* this_worker); |
97 | 102 |
98 private: | 103 private: |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
144 SequencedWorkerPool* const worker_pool_; | 149 SequencedWorkerPool* const worker_pool_; |
145 | 150 |
146 // The last sequence number used. Managed by GetSequenceToken, since this | 151 // The last sequence number used. Managed by GetSequenceToken, since this |
147 // only does threadsafe increment operations, you do not need to hold the | 152 // only does threadsafe increment operations, you do not need to hold the |
148 // lock. | 153 // lock. |
149 volatile subtle::Atomic32 last_sequence_number_; | 154 volatile subtle::Atomic32 last_sequence_number_; |
150 | 155 |
151 // This lock protects |everything in this class|. Do not read or modify | 156 // This lock protects |everything in this class|. Do not read or modify |
152 // anything without holding this lock. Do not block while holding this | 157 // anything without holding this lock. Do not block while holding this |
153 // lock. | 158 // lock. |
154 Lock lock_; | 159 mutable Lock lock_; |
155 | 160 |
156 // Condition variable used to wake up worker threads when a task is runnable. | 161 // Condition variable used to wake up worker threads when a task is runnable. |
157 ConditionVariable cond_var_; | 162 ConditionVariable cond_var_; |
158 | 163 |
159 // The maximum number of worker threads we'll create. | 164 // The maximum number of worker threads we'll create. |
160 const size_t max_threads_; | 165 const size_t max_threads_; |
161 | 166 |
162 const std::string thread_name_prefix_; | 167 const std::string thread_name_prefix_; |
163 | 168 |
164 // Associates all known sequence token names with their IDs. | 169 // Associates all known sequence token names with their IDs. |
165 std::map<std::string, int> named_sequence_tokens_; | 170 std::map<std::string, int> named_sequence_tokens_; |
166 | 171 |
167 // Owning pointers to all threads we've created so far. Since we lazily | 172 // Owning pointers to all threads we've created so far, indexed by |
168 // create threads, this may be less than max_threads_ and will be initially | 173 // ID. Since we lazily create threads, this may be less than |
169 // empty. | 174 // max_threads_ and will be initially empty. |
170 std::vector<linked_ptr<Worker> > threads_; | 175 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; |
| 176 ThreadMap threads_; |
171 | 177 |
172 // Set to true when we're in the process of creating another thread. | 178 // Set to true when we're in the process of creating another thread. |
173 // See PrepareToStartAdditionalThreadIfHelpful for more. | 179 // See PrepareToStartAdditionalThreadIfHelpful for more. |
174 bool thread_being_created_; | 180 bool thread_being_created_; |
175 | 181 |
176 // Number of threads currently waiting for work. | 182 // Number of threads currently waiting for work. |
177 size_t waiting_thread_count_; | 183 size_t waiting_thread_count_; |
178 | 184 |
179 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN | 185 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN |
180 // flag set. | 186 // flag set. |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
249 blocking_shutdown_pending_task_count_(0), | 255 blocking_shutdown_pending_task_count_(0), |
250 shutdown_called_(false), | 256 shutdown_called_(false), |
251 testing_observer_(NULL) {} | 257 testing_observer_(NULL) {} |
252 | 258 |
253 SequencedWorkerPool::Inner::~Inner() { | 259 SequencedWorkerPool::Inner::~Inner() { |
254 // You must call Shutdown() before destroying the pool. | 260 // You must call Shutdown() before destroying the pool. |
255 DCHECK(shutdown_called_); | 261 DCHECK(shutdown_called_); |
256 | 262 |
257 // Need to explicitly join with the threads before they're destroyed or else | 263 // Need to explicitly join with the threads before they're destroyed or else |
258 // they will be running when our object is half torn down. | 264 // they will be running when our object is half torn down. |
259 for (size_t i = 0; i < threads_.size(); i++) | 265 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) |
260 threads_[i]->Join(); | 266 it->second->Join(); |
261 threads_.clear(); | 267 threads_.clear(); |
262 | 268 |
263 if (testing_observer_) | 269 if (testing_observer_) |
264 testing_observer_->OnDestruct(); | 270 testing_observer_->OnDestruct(); |
265 } | 271 } |
266 | 272 |
267 SequencedWorkerPool::SequenceToken | 273 SequencedWorkerPool::SequenceToken |
268 SequencedWorkerPool::Inner::GetSequenceToken() { | 274 SequencedWorkerPool::Inner::GetSequenceToken() { |
269 subtle::Atomic32 result = | 275 subtle::Atomic32 result = |
270 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); | 276 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
310 // Actually start the additional thread or signal an existing one now that | 316 // Actually start the additional thread or signal an existing one now that |
311 // we're outside the lock. | 317 // we're outside the lock. |
312 if (create_thread_id) | 318 if (create_thread_id) |
313 FinishStartingAdditionalThread(create_thread_id); | 319 FinishStartingAdditionalThread(create_thread_id); |
314 else | 320 else |
315 cond_var_.Signal(); | 321 cond_var_.Signal(); |
316 | 322 |
317 return true; | 323 return true; |
318 } | 324 } |
319 | 325 |
| 326 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { |
| 327 AutoLock lock(lock_); |
| 328 return ContainsKey(threads_, PlatformThread::CurrentId()); |
| 329 } |
| 330 |
320 void SequencedWorkerPool::Inner::FlushForTesting() { | 331 void SequencedWorkerPool::Inner::FlushForTesting() { |
321 { | 332 { |
322 AutoLock lock(lock_); | 333 AutoLock lock(lock_); |
323 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) | 334 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
324 cond_var_.Wait(); | 335 cond_var_.Wait(); |
325 } | 336 } |
326 cond_var_.Signal(); | 337 cond_var_.Signal(); |
327 } | 338 } |
328 | 339 |
329 void SequencedWorkerPool::Inner::Shutdown() { | 340 void SequencedWorkerPool::Inner::Shutdown() { |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
370 TestingObserver* observer) { | 381 TestingObserver* observer) { |
371 AutoLock lock(lock_); | 382 AutoLock lock(lock_); |
372 testing_observer_ = observer; | 383 testing_observer_ = observer; |
373 } | 384 } |
374 | 385 |
375 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { | 386 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { |
376 { | 387 { |
377 AutoLock lock(lock_); | 388 AutoLock lock(lock_); |
378 DCHECK(thread_being_created_); | 389 DCHECK(thread_being_created_); |
379 thread_being_created_ = false; | 390 thread_being_created_ = false; |
380 threads_.push_back(linked_ptr<Worker>(this_worker)); | 391 std::pair<ThreadMap::iterator, bool> result = |
| 392 threads_.insert( |
| 393 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); |
| 394 DCHECK(result.second); |
381 | 395 |
382 while (true) { | 396 while (true) { |
383 // See GetWork for what delete_these_outside_lock is doing. | 397 // See GetWork for what delete_these_outside_lock is doing. |
384 SequencedTask task; | 398 SequencedTask task; |
385 std::vector<Closure> delete_these_outside_lock; | 399 std::vector<Closure> delete_these_outside_lock; |
386 if (GetWork(&task, &delete_these_outside_lock)) { | 400 if (GetWork(&task, &delete_these_outside_lock)) { |
387 int new_thread_id = WillRunWorkerTask(task); | 401 int new_thread_id = WillRunWorkerTask(task); |
388 { | 402 { |
389 AutoUnlock unlock(lock_); | 403 AutoUnlock unlock(lock_); |
390 cond_var_.Signal(); | 404 cond_var_.Signal(); |
(...skipping 331 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
722 bool SequencedWorkerPool::PostDelayedTask( | 736 bool SequencedWorkerPool::PostDelayedTask( |
723 const tracked_objects::Location& from_here, | 737 const tracked_objects::Location& from_here, |
724 const Closure& task, | 738 const Closure& task, |
725 TimeDelta delay) { | 739 TimeDelta delay) { |
726 // TODO(akalin): Add support for non-zero delays. | 740 // TODO(akalin): Add support for non-zero delays. |
727 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); | 741 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); |
728 return PostWorkerTask(from_here, task); | 742 return PostWorkerTask(from_here, task); |
729 } | 743 } |
730 | 744 |
731 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { | 745 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { |
732 // TODO(akalin): Keep track of the thread IDs of our worker threads | 746 return inner_->RunsTasksOnCurrentThread(); |
733 // and use those to implement this function. | |
734 NOTREACHED(); | |
735 return true; | |
736 } | 747 } |
737 | 748 |
738 void SequencedWorkerPool::FlushForTesting() { | 749 void SequencedWorkerPool::FlushForTesting() { |
739 inner_->FlushForTesting(); | 750 inner_->FlushForTesting(); |
740 } | 751 } |
741 | 752 |
742 void SequencedWorkerPool::Shutdown() { | 753 void SequencedWorkerPool::Shutdown() { |
743 inner_->Shutdown(); | 754 inner_->Shutdown(); |
744 } | 755 } |
745 | 756 |
746 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { | 757 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { |
747 inner_->SetTestingObserver(observer); | 758 inner_->SetTestingObserver(observer); |
748 } | 759 } |
749 | 760 |
750 } // namespace base | 761 } // namespace base |
OLD | NEW |