Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(99)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9124033: Hook up the SequencedWorkerPool to the browser thread. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: '' Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/crash_upload_list.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <deque> 7 #include <deque>
8 #include <set> 8 #include <set>
9 9
10 #include "base/atomicops.h" 10 #include "base/atomicops.h"
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 50
51 51
52 // Inner ---------------------------------------------------------------------- 52 // Inner ----------------------------------------------------------------------
53 53
54 class SequencedWorkerPool::Inner 54 class SequencedWorkerPool::Inner
55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> { 55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> {
56 public: 56 public:
57 Inner(size_t max_threads, const std::string& thread_name_prefix); 57 Inner(size_t max_threads, const std::string& thread_name_prefix);
58 virtual ~Inner(); 58 virtual ~Inner();
59 59
60 // Backends for SequenceWorkerPool.
61 SequenceToken GetSequenceToken(); 60 SequenceToken GetSequenceToken();
61
62 SequenceToken GetNamedSequenceToken(const std::string& name); 62 SequenceToken GetNamedSequenceToken(const std::string& name);
63 bool PostTask(int sequence_token_id, 63
64 // This function accepts a name and an ID. If the name is null, the
65 // token ID is used. This allows us to implement the optional name lookup
66 // from a single function without having to enter the lock a separate time.
67 bool PostTask(const std::string* optional_token_name,
68 int sequence_token_id,
64 SequencedWorkerPool::WorkerShutdown shutdown_behavior, 69 SequencedWorkerPool::WorkerShutdown shutdown_behavior,
65 const tracked_objects::Location& from_here, 70 const tracked_objects::Location& from_here,
66 const base::Closure& task); 71 const base::Closure& task);
72
73 void Flush();
74
67 void Shutdown(); 75 void Shutdown();
76
68 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); 77 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer);
69 78
70 // Runs the worker loop on the background thread. 79 // Runs the worker loop on the background thread.
71 void ThreadLoop(Worker* this_worker); 80 void ThreadLoop(Worker* this_worker);
72 81
73 private: 82 private:
83 // Called from within the lock, this converts the given token name into a
84 // token ID, creating a new one if necessary.
85 int LockedGetNamedTokenID(const std::string& name);
86
74 // The calling code should clear the given delete_these_oustide_lock 87 // The calling code should clear the given delete_these_oustide_lock
75 // vector the next time the lock is released. See the implementation for 88 // vector the next time the lock is released. See the implementation for
76 // a more detailed description. 89 // a more detailed description.
77 bool GetWork(SequencedTask* task, 90 bool GetWork(SequencedTask* task,
78 std::vector<base::Closure>* delete_these_outside_lock); 91 std::vector<base::Closure>* delete_these_outside_lock);
79 92
80 // Peforms init and cleanup around running the given task. WillRun... 93 // Peforms init and cleanup around running the given task. WillRun...
81 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 94 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
82 // The calling code should call FinishStartingAdditionalThread once the 95 // The calling code should call FinishStartingAdditionalThread once the
83 // lock is released if the return values is nonzero. 96 // lock is released if the return values is nonzero.
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
228 SequencedWorkerPool::Inner::GetSequenceToken() { 241 SequencedWorkerPool::Inner::GetSequenceToken() {
229 base::subtle::Atomic32 result = 242 base::subtle::Atomic32 result =
230 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); 243 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
231 return SequenceToken(static_cast<int>(result)); 244 return SequenceToken(static_cast<int>(result));
232 } 245 }
233 246
234 SequencedWorkerPool::SequenceToken 247 SequencedWorkerPool::SequenceToken
235 SequencedWorkerPool::Inner::GetNamedSequenceToken( 248 SequencedWorkerPool::Inner::GetNamedSequenceToken(
236 const std::string& name) { 249 const std::string& name) {
237 base::AutoLock lock(lock_); 250 base::AutoLock lock(lock_);
238 std::map<std::string, int>::const_iterator found = 251 return SequenceToken(LockedGetNamedTokenID(name));
239 named_sequence_tokens_.find(name);
240 if (found != named_sequence_tokens_.end())
241 return SequenceToken(found->second); // Got an existing one.
242
243 // Create a new one for this name.
244 SequenceToken result = GetSequenceToken();
245 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
246 return result;
247 } 252 }
248 253
249 bool SequencedWorkerPool::Inner::PostTask( 254 bool SequencedWorkerPool::Inner::PostTask(
255 const std::string* optional_token_name,
250 int sequence_token_id, 256 int sequence_token_id,
251 SequencedWorkerPool::WorkerShutdown shutdown_behavior, 257 SequencedWorkerPool::WorkerShutdown shutdown_behavior,
252 const tracked_objects::Location& from_here, 258 const tracked_objects::Location& from_here,
253 const base::Closure& task) { 259 const base::Closure& task) {
254 SequencedTask sequenced; 260 SequencedTask sequenced;
255 sequenced.sequence_token_id = sequence_token_id; 261 sequenced.sequence_token_id = sequence_token_id;
256 sequenced.shutdown_behavior = shutdown_behavior; 262 sequenced.shutdown_behavior = shutdown_behavior;
257 sequenced.location = from_here; 263 sequenced.location = from_here;
258 sequenced.task = task; 264 sequenced.task = task;
259 265
260 int create_thread_id = 0; 266 int create_thread_id = 0;
261 { 267 {
262 base::AutoLock lock(lock_); 268 base::AutoLock lock(lock_);
263 if (terminating_) 269 if (terminating_)
264 return false; 270 return false;
265 271
272 // Now that we have the lock, apply the named token rules.
273 if (optional_token_name)
274 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
275
266 pending_tasks_.push_back(sequenced); 276 pending_tasks_.push_back(sequenced);
267 pending_task_count_++; 277 pending_task_count_++;
268 if (shutdown_behavior == BLOCK_SHUTDOWN) 278 if (shutdown_behavior == BLOCK_SHUTDOWN)
269 blocking_shutdown_pending_task_count_++; 279 blocking_shutdown_pending_task_count_++;
270 280
271 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 281 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
272 } 282 }
273 283
274 // Actually start the additional thread or signal an existing one now that 284 // Actually start the additional thread or signal an existing one now that
275 // we're outside the lock. 285 // we're outside the lock.
276 if (create_thread_id) 286 if (create_thread_id)
277 FinishStartingAdditionalThread(create_thread_id); 287 FinishStartingAdditionalThread(create_thread_id);
278 else 288 else
279 cond_var_.Signal(); 289 cond_var_.Signal();
280 290
281 return true; 291 return true;
282 } 292 }
283 293
294 void SequencedWorkerPool::Inner::Flush() {
295 {
296 base::AutoLock lock(lock_);
297 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
298 cond_var_.Wait();
299 }
300 cond_var_.Signal();
301 }
302
284 void SequencedWorkerPool::Inner::Shutdown() { 303 void SequencedWorkerPool::Inner::Shutdown() {
285 if (shutdown_called_) 304 if (shutdown_called_)
286 return; 305 return;
287 shutdown_called_ = true; 306 shutdown_called_ = true;
288 307
289 // Mark us as terminated and go through and drop all tasks that aren't 308 // Mark us as terminated and go through and drop all tasks that aren't
290 // required to run on shutdown. Since no new tasks will get posted once the 309 // required to run on shutdown. Since no new tasks will get posted once the
291 // terminated flag is set, this ensures that all remaining tasks are required 310 // terminated flag is set, this ensures that all remaining tasks are required
292 // for shutdown whenever the termianted_ flag is set. 311 // for shutdown whenever the termianted_ flag is set.
293 { 312 {
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
359 } 378 }
360 DidRunWorkerTask(task); // Must be done inside the lock. 379 DidRunWorkerTask(task); // Must be done inside the lock.
361 } else { 380 } else {
362 // When we're terminating and there's no more work, we can shut down. 381 // When we're terminating and there's no more work, we can shut down.
363 // You can't get more tasks posted once terminating_ is set. There may 382 // You can't get more tasks posted once terminating_ is set. There may
364 // be some tasks stuck behind running ones with the same sequence 383 // be some tasks stuck behind running ones with the same sequence
365 // token, but additional threads won't help this case. 384 // token, but additional threads won't help this case.
366 if (terminating_) 385 if (terminating_)
367 break; 386 break;
368 waiting_thread_count_++; 387 waiting_thread_count_++;
388 cond_var_.Signal(); // For Flush() that may be waiting on the
389 // waiting thread count to go up.
369 cond_var_.Wait(); 390 cond_var_.Wait();
370 waiting_thread_count_--; 391 waiting_thread_count_--;
371 } 392 }
372 } 393 }
373 } 394 }
374 395
375 // We noticed we should exit. Wake up the next worker so it knows it should 396 // We noticed we should exit. Wake up the next worker so it knows it should
376 // exit as well (because the Shutdown() code only signals once). 397 // exit as well (because the Shutdown() code only signals once).
377 cond_var_.Signal(); 398 cond_var_.Signal();
378 } 399 }
379 400
401 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
402 const std::string& name) {
403 lock_.AssertAcquired();
404 DCHECK(!name.empty());
405
406 std::map<std::string, int>::const_iterator found =
407 named_sequence_tokens_.find(name);
408 if (found != named_sequence_tokens_.end())
409 return found->second; // Got an existing one.
410
411 // Create a new one for this name.
412 SequenceToken result = GetSequenceToken();
413 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
414 return result.id_;
415 }
416
380 bool SequencedWorkerPool::Inner::GetWork( 417 bool SequencedWorkerPool::Inner::GetWork(
381 SequencedTask* task, 418 SequencedTask* task,
382 std::vector<base::Closure>* delete_these_outside_lock) { 419 std::vector<base::Closure>* delete_these_outside_lock) {
383 lock_.AssertAcquired(); 420 lock_.AssertAcquired();
384 421
385 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); 422 DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
386 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 423 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
387 static_cast<int>(pending_task_count_)); 424 static_cast<int>(pending_task_count_));
388 425
389 // Find the next task with a sequence token that's not currently in use. 426 // Find the next task with a sequence token that's not currently in use.
(...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after
586 } 623 }
587 624
588 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 625 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
589 const std::string& name) { 626 const std::string& name) {
590 return inner_->GetNamedSequenceToken(name); 627 return inner_->GetNamedSequenceToken(name);
591 } 628 }
592 629
593 bool SequencedWorkerPool::PostWorkerTask( 630 bool SequencedWorkerPool::PostWorkerTask(
594 const tracked_objects::Location& from_here, 631 const tracked_objects::Location& from_here,
595 const base::Closure& task) { 632 const base::Closure& task) {
596 return inner_->PostTask(0, BLOCK_SHUTDOWN, from_here, task); 633 return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task);
597 } 634 }
598 635
599 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 636 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
600 const tracked_objects::Location& from_here, 637 const tracked_objects::Location& from_here,
601 const base::Closure& task, 638 const base::Closure& task,
602 WorkerShutdown shutdown_behavior) { 639 WorkerShutdown shutdown_behavior) {
603 return inner_->PostTask(0, shutdown_behavior, from_here, task); 640 return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task);
604 } 641 }
605 642
606 bool SequencedWorkerPool::PostSequencedWorkerTask( 643 bool SequencedWorkerPool::PostSequencedWorkerTask(
607 SequenceToken sequence_token, 644 SequenceToken sequence_token,
608 const tracked_objects::Location& from_here, 645 const tracked_objects::Location& from_here,
609 const base::Closure& task) { 646 const base::Closure& task) {
610 return inner_->PostTask(sequence_token.id_, BLOCK_SHUTDOWN, 647 return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN,
611 from_here, task); 648 from_here, task);
612 } 649 }
613 650
651 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
652 const std::string& token_name,
653 const tracked_objects::Location& from_here,
654 const base::Closure& task) {
655 DCHECK(!token_name.empty());
656 return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task);
657 }
658
614 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 659 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
615 SequenceToken sequence_token, 660 SequenceToken sequence_token,
616 const tracked_objects::Location& from_here, 661 const tracked_objects::Location& from_here,
617 const base::Closure& task, 662 const base::Closure& task,
618 WorkerShutdown shutdown_behavior) { 663 WorkerShutdown shutdown_behavior) {
619 return inner_->PostTask(sequence_token.id_, shutdown_behavior, 664 return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior,
620 from_here, task); 665 from_here, task);
621 } 666 }
622 667
668 void SequencedWorkerPool::FlushForTesting() {
669 inner_->Flush();
670 }
671
623 void SequencedWorkerPool::Shutdown() { 672 void SequencedWorkerPool::Shutdown() {
624 inner_->Shutdown(); 673 inner_->Shutdown();
625 } 674 }
626 675
627 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { 676 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
628 inner_->SetTestingObserver(observer); 677 inner_->SetTestingObserver(observer);
629 } 678 }
630 679
631 } // namespace base 680 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | chrome/browser/crash_upload_list.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698