OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/threading/sequenced_worker_pool.h" | 5 #include "base/threading/sequenced_worker_pool.h" |
6 | 6 |
7 #include <deque> | 7 #include <deque> |
8 #include <set> | 8 #include <set> |
9 | 9 |
10 #include "base/atomicops.h" | 10 #include "base/atomicops.h" |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
50 | 50 |
51 | 51 |
52 // Inner ---------------------------------------------------------------------- | 52 // Inner ---------------------------------------------------------------------- |
53 | 53 |
54 class SequencedWorkerPool::Inner | 54 class SequencedWorkerPool::Inner |
55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { | 55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { |
56 public: | 56 public: |
57 Inner(size_t max_threads, const std::string& thread_name_prefix); | 57 Inner(size_t max_threads, const std::string& thread_name_prefix); |
58 virtual ~Inner(); | 58 virtual ~Inner(); |
59 | 59 |
60 // Backends for SequenceWorkerPool. | |
61 SequenceToken GetSequenceToken(); | 60 SequenceToken GetSequenceToken(); |
| 61 |
62 SequenceToken GetNamedSequenceToken(const std::string& name); | 62 SequenceToken GetNamedSequenceToken(const std::string& name); |
63 bool PostTask(int sequence_token_id, | 63 |
| 64 // This function accepts a name and an ID. If the name is null, the |
| 65 // token ID is used. This allows us to implement the optional name lookup |
| 66 // from a single function without having to enter the lock a separate time. |
| 67 bool PostTask(const std::string* optional_token_name, |
| 68 int sequence_token_id, |
64 SequencedWorkerPool::WorkerShutdown shutdown_behavior, | 69 SequencedWorkerPool::WorkerShutdown shutdown_behavior, |
65 const tracked_objects::Location& from_here, | 70 const tracked_objects::Location& from_here, |
66 const base::Closure& task); | 71 const base::Closure& task); |
| 72 |
| 73 void Flush(); |
| 74 |
67 void Shutdown(); | 75 void Shutdown(); |
| 76 |
68 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); | 77 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); |
69 | 78 |
70 // Runs the worker loop on the background thread. | 79 // Runs the worker loop on the background thread. |
71 void ThreadLoop(Worker* this_worker); | 80 void ThreadLoop(Worker* this_worker); |
72 | 81 |
73 private: | 82 private: |
| 83 // Called from within the lock, this converts the given token name into a |
| 84 // token ID, creating a new one if necessary. |
| 85 int LockedGetNamedTokenID(const std::string& name); |
| 86 |
74 // The calling code should clear the given delete_these_oustide_lock | 87 // The calling code should clear the given delete_these_oustide_lock |
75 // vector the next time the lock is released. See the implementation for | 88 // vector the next time the lock is released. See the implementation for |
76 // a more detailed description. | 89 // a more detailed description. |
77 bool GetWork(SequencedTask* task, | 90 bool GetWork(SequencedTask* task, |
78 std::vector<base::Closure>* delete_these_outside_lock); | 91 std::vector<base::Closure>* delete_these_outside_lock); |
79 | 92 |
80 // Peforms init and cleanup around running the given task. WillRun... | 93 // Peforms init and cleanup around running the given task. WillRun... |
81 // returns the value from PrepareToStartAdditionalThreadIfNecessary. | 94 // returns the value from PrepareToStartAdditionalThreadIfNecessary. |
82 // The calling code should call FinishStartingAdditionalThread once the | 95 // The calling code should call FinishStartingAdditionalThread once the |
83 // lock is released if the return values is nonzero. | 96 // lock is released if the return values is nonzero. |
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
228 SequencedWorkerPool::Inner::GetSequenceToken() { | 241 SequencedWorkerPool::Inner::GetSequenceToken() { |
229 base::subtle::Atomic32 result = | 242 base::subtle::Atomic32 result = |
230 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); | 243 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); |
231 return SequenceToken(static_cast<int>(result)); | 244 return SequenceToken(static_cast<int>(result)); |
232 } | 245 } |
233 | 246 |
234 SequencedWorkerPool::SequenceToken | 247 SequencedWorkerPool::SequenceToken |
235 SequencedWorkerPool::Inner::GetNamedSequenceToken( | 248 SequencedWorkerPool::Inner::GetNamedSequenceToken( |
236 const std::string& name) { | 249 const std::string& name) { |
237 base::AutoLock lock(lock_); | 250 base::AutoLock lock(lock_); |
238 std::map<std::string, int>::const_iterator found = | 251 return SequenceToken(LockedGetNamedTokenID(name)); |
239 named_sequence_tokens_.find(name); | |
240 if (found != named_sequence_tokens_.end()) | |
241 return SequenceToken(found->second); // Got an existing one. | |
242 | |
243 // Create a new one for this name. | |
244 SequenceToken result = GetSequenceToken(); | |
245 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); | |
246 return result; | |
247 } | 252 } |
248 | 253 |
249 bool SequencedWorkerPool::Inner::PostTask( | 254 bool SequencedWorkerPool::Inner::PostTask( |
| 255 const std::string* optional_token_name, |
250 int sequence_token_id, | 256 int sequence_token_id, |
251 SequencedWorkerPool::WorkerShutdown shutdown_behavior, | 257 SequencedWorkerPool::WorkerShutdown shutdown_behavior, |
252 const tracked_objects::Location& from_here, | 258 const tracked_objects::Location& from_here, |
253 const base::Closure& task) { | 259 const base::Closure& task) { |
254 SequencedTask sequenced; | 260 SequencedTask sequenced; |
255 sequenced.sequence_token_id = sequence_token_id; | 261 sequenced.sequence_token_id = sequence_token_id; |
256 sequenced.shutdown_behavior = shutdown_behavior; | 262 sequenced.shutdown_behavior = shutdown_behavior; |
257 sequenced.location = from_here; | 263 sequenced.location = from_here; |
258 sequenced.task = task; | 264 sequenced.task = task; |
259 | 265 |
260 int create_thread_id = 0; | 266 int create_thread_id = 0; |
261 { | 267 { |
262 base::AutoLock lock(lock_); | 268 base::AutoLock lock(lock_); |
263 if (terminating_) | 269 if (terminating_) |
264 return false; | 270 return false; |
265 | 271 |
| 272 // Now that we have the lock, apply the named token rules. |
| 273 if (optional_token_name) |
| 274 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); |
| 275 |
266 pending_tasks_.push_back(sequenced); | 276 pending_tasks_.push_back(sequenced); |
267 pending_task_count_++; | 277 pending_task_count_++; |
268 if (shutdown_behavior == BLOCK_SHUTDOWN) | 278 if (shutdown_behavior == BLOCK_SHUTDOWN) |
269 blocking_shutdown_pending_task_count_++; | 279 blocking_shutdown_pending_task_count_++; |
270 | 280 |
271 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); | 281 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); |
272 } | 282 } |
273 | 283 |
274 // Actually start the additional thread or signal an existing one now that | 284 // Actually start the additional thread or signal an existing one now that |
275 // we're outside the lock. | 285 // we're outside the lock. |
276 if (create_thread_id) | 286 if (create_thread_id) |
277 FinishStartingAdditionalThread(create_thread_id); | 287 FinishStartingAdditionalThread(create_thread_id); |
278 else | 288 else |
279 cond_var_.Signal(); | 289 cond_var_.Signal(); |
280 | 290 |
281 return true; | 291 return true; |
282 } | 292 } |
283 | 293 |
| 294 void SequencedWorkerPool::Inner::Flush() { |
| 295 { |
| 296 base::AutoLock lock(lock_); |
| 297 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) |
| 298 cond_var_.Wait(); |
| 299 } |
| 300 cond_var_.Signal(); |
| 301 } |
| 302 |
284 void SequencedWorkerPool::Inner::Shutdown() { | 303 void SequencedWorkerPool::Inner::Shutdown() { |
285 if (shutdown_called_) | 304 if (shutdown_called_) |
286 return; | 305 return; |
287 shutdown_called_ = true; | 306 shutdown_called_ = true; |
288 | 307 |
289 // Mark us as terminated and go through and drop all tasks that aren't | 308 // Mark us as terminated and go through and drop all tasks that aren't |
290 // required to run on shutdown. Since no new tasks will get posted once the | 309 // required to run on shutdown. Since no new tasks will get posted once the |
291 // terminated flag is set, this ensures that all remaining tasks are required | 310 // terminated flag is set, this ensures that all remaining tasks are required |
292 // for shutdown whenever the termianted_ flag is set. | 311 // for shutdown whenever the termianted_ flag is set. |
293 { | 312 { |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
359 } | 378 } |
360 DidRunWorkerTask(task); // Must be done inside the lock. | 379 DidRunWorkerTask(task); // Must be done inside the lock. |
361 } else { | 380 } else { |
362 // When we're terminating and there's no more work, we can shut down. | 381 // When we're terminating and there's no more work, we can shut down. |
363 // You can't get more tasks posted once terminating_ is set. There may | 382 // You can't get more tasks posted once terminating_ is set. There may |
364 // be some tasks stuck behind running ones with the same sequence | 383 // be some tasks stuck behind running ones with the same sequence |
365 // token, but additional threads won't help this case. | 384 // token, but additional threads won't help this case. |
366 if (terminating_) | 385 if (terminating_) |
367 break; | 386 break; |
368 waiting_thread_count_++; | 387 waiting_thread_count_++; |
| 388 cond_var_.Signal(); // For Flush() that may be waiting on the |
| 389 // waiting thread count to go up. |
369 cond_var_.Wait(); | 390 cond_var_.Wait(); |
370 waiting_thread_count_--; | 391 waiting_thread_count_--; |
371 } | 392 } |
372 } | 393 } |
373 } | 394 } |
374 | 395 |
375 // We noticed we should exit. Wake up the next worker so it knows it should | 396 // We noticed we should exit. Wake up the next worker so it knows it should |
376 // exit as well (because the Shutdown() code only signals once). | 397 // exit as well (because the Shutdown() code only signals once). |
377 cond_var_.Signal(); | 398 cond_var_.Signal(); |
378 } | 399 } |
379 | 400 |
| 401 int SequencedWorkerPool::Inner::LockedGetNamedTokenID( |
| 402 const std::string& name) { |
| 403 lock_.AssertAcquired(); |
| 404 DCHECK(!name.empty()); |
| 405 |
| 406 std::map<std::string, int>::const_iterator found = |
| 407 named_sequence_tokens_.find(name); |
| 408 if (found != named_sequence_tokens_.end()) |
| 409 return found->second; // Got an existing one. |
| 410 |
| 411 // Create a new one for this name. |
| 412 SequenceToken result = GetSequenceToken(); |
| 413 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); |
| 414 return result.id_; |
| 415 } |
| 416 |
380 bool SequencedWorkerPool::Inner::GetWork( | 417 bool SequencedWorkerPool::Inner::GetWork( |
381 SequencedTask* task, | 418 SequencedTask* task, |
382 std::vector<base::Closure>* delete_these_outside_lock) { | 419 std::vector<base::Closure>* delete_these_outside_lock) { |
383 lock_.AssertAcquired(); | 420 lock_.AssertAcquired(); |
384 | 421 |
385 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); | 422 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); |
386 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", | 423 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", |
387 static_cast<int>(pending_task_count_)); | 424 static_cast<int>(pending_task_count_)); |
388 | 425 |
389 // Find the next task with a sequence token that's not currently in use. | 426 // Find the next task with a sequence token that's not currently in use. |
(...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
586 } | 623 } |
587 | 624 |
588 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( | 625 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( |
589 const std::string& name) { | 626 const std::string& name) { |
590 return inner_->GetNamedSequenceToken(name); | 627 return inner_->GetNamedSequenceToken(name); |
591 } | 628 } |
592 | 629 |
593 bool SequencedWorkerPool::PostWorkerTask( | 630 bool SequencedWorkerPool::PostWorkerTask( |
594 const tracked_objects::Location& from_here, | 631 const tracked_objects::Location& from_here, |
595 const base::Closure& task) { | 632 const base::Closure& task) { |
596 return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task); | 633 return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); |
597 } | 634 } |
598 | 635 |
599 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( | 636 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( |
600 const tracked_objects::Location& from_here, | 637 const tracked_objects::Location& from_here, |
601 const base::Closure& task, | 638 const base::Closure& task, |
602 WorkerShutdown shutdown_behavior) { | 639 WorkerShutdown shutdown_behavior) { |
603 return inner_->PostTask(0, shutdown_behavior, from_here, task); | 640 return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); |
604 } | 641 } |
605 | 642 |
606 bool SequencedWorkerPool::PostSequencedWorkerTask( | 643 bool SequencedWorkerPool::PostSequencedWorkerTask( |
607 SequenceToken sequence_token, | 644 SequenceToken sequence_token, |
608 const tracked_objects::Location& from_here, | 645 const tracked_objects::Location& from_here, |
609 const base::Closure& task) { | 646 const base::Closure& task) { |
610 return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN, | 647 return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, |
611 from_here, task); | 648 from_here, task); |
612 } | 649 } |
613 | 650 |
| 651 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( |
| 652 const std::string& token_name, |
| 653 const tracked_objects::Location& from_here, |
| 654 const base::Closure& task) { |
| 655 DCHECK(!token_name.empty()); |
| 656 return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); |
| 657 } |
| 658 |
614 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( | 659 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( |
615 SequenceToken sequence_token, | 660 SequenceToken sequence_token, |
616 const tracked_objects::Location& from_here, | 661 const tracked_objects::Location& from_here, |
617 const base::Closure& task, | 662 const base::Closure& task, |
618 WorkerShutdown shutdown_behavior) { | 663 WorkerShutdown shutdown_behavior) { |
619 return inner_->PostTask(sequence_token.id_, shutdown_behavior, | 664 return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, |
620 from_here, task); | 665 from_here, task); |
621 } | 666 } |
622 | 667 |
| 668 void SequencedWorkerPool::FlushForTesting() { |
| 669 inner_->Flush(); |
| 670 } |
| 671 |
623 void SequencedWorkerPool::Shutdown() { | 672 void SequencedWorkerPool::Shutdown() { |
624 inner_->Shutdown(); | 673 inner_->Shutdown(); |
625 } | 674 } |
626 | 675 |
627 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { | 676 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { |
628 inner_->SetTestingObserver(observer); | 677 inner_->SetTestingObserver(observer); |
629 } | 678 } |
630 | 679 |
631 } // namespace base | 680 } // namespace base |
OLD | NEW |