Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(467)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9592037: Implement SequencedWorkerPool::RunsTasksOnCurrentThread() (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « base/task_runner_test_template.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <list> 7 #include <list>
8 #include <map> 8 #include <map>
9 #include <set> 9 #include <set>
10 #include <utility>
10 #include <vector> 11 #include <vector>
11 12
12 #include "base/atomicops.h" 13 #include "base/atomicops.h"
13 #include "base/callback.h" 14 #include "base/callback.h"
14 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
15 #include "base/logging.h" 16 #include "base/logging.h"
16 #include "base/memory/linked_ptr.h" 17 #include "base/memory/linked_ptr.h"
17 #include "base/message_loop_proxy.h" 18 #include "base/message_loop_proxy.h"
18 #include "base/metrics/histogram.h" 19 #include "base/metrics/histogram.h"
20 #include "base/stl_util.h"
19 #include "base/stringprintf.h" 21 #include "base/stringprintf.h"
20 #include "base/synchronization/condition_variable.h" 22 #include "base/synchronization/condition_variable.h"
21 #include "base/synchronization/lock.h" 23 #include "base/synchronization/lock.h"
24 #include "base/threading/platform_thread.h"
22 #include "base/threading/simple_thread.h" 25 #include "base/threading/simple_thread.h"
23 #include "base/time.h" 26 #include "base/time.h"
24 #include "base/tracked_objects.h" 27 #include "base/tracked_objects.h"
25 28
26 namespace base { 29 namespace base {
27 30
28 namespace { 31 namespace {
29 32
30 struct SequencedTask { 33 struct SequencedTask {
31 SequencedTask() 34 SequencedTask()
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 82
80 // This function accepts a name and an ID. If the name is null, the 83 // This function accepts a name and an ID. If the name is null, the
81 // token ID is used. This allows us to implement the optional name lookup 84 // token ID is used. This allows us to implement the optional name lookup
82 // from a single function without having to enter the lock a separate time. 85 // from a single function without having to enter the lock a separate time.
83 bool PostTask(const std::string* optional_token_name, 86 bool PostTask(const std::string* optional_token_name,
84 SequenceToken sequence_token, 87 SequenceToken sequence_token,
85 WorkerShutdown shutdown_behavior, 88 WorkerShutdown shutdown_behavior,
86 const tracked_objects::Location& from_here, 89 const tracked_objects::Location& from_here,
87 const Closure& task); 90 const Closure& task);
88 91
92 bool RunsTasksOnCurrentThread() const;
93
89 void FlushForTesting(); 94 void FlushForTesting();
90 95
91 void Shutdown(); 96 void Shutdown();
92 97
93 void SetTestingObserver(TestingObserver* observer); 98 void SetTestingObserver(TestingObserver* observer);
94 99
95 // Runs the worker loop on the background thread. 100 // Runs the worker loop on the background thread.
96 void ThreadLoop(Worker* this_worker); 101 void ThreadLoop(Worker* this_worker);
97 102
98 private: 103 private:
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
144 SequencedWorkerPool* const worker_pool_; 149 SequencedWorkerPool* const worker_pool_;
145 150
146 // The last sequence number used. Managed by GetSequenceToken, since this 151 // The last sequence number used. Managed by GetSequenceToken, since this
147 // only does threadsafe increment operations, you do not need to hold the 152 // only does threadsafe increment operations, you do not need to hold the
148 // lock. 153 // lock.
149 volatile subtle::Atomic32 last_sequence_number_; 154 volatile subtle::Atomic32 last_sequence_number_;
150 155
151 // This lock protects |everything in this class|. Do not read or modify 156 // This lock protects |everything in this class|. Do not read or modify
152 // anything without holding this lock. Do not block while holding this 157 // anything without holding this lock. Do not block while holding this
153 // lock. 158 // lock.
154 Lock lock_; 159 mutable Lock lock_;
155 160
156 // Condition variable used to wake up worker threads when a task is runnable. 161 // Condition variable used to wake up worker threads when a task is runnable.
157 ConditionVariable cond_var_; 162 ConditionVariable cond_var_;
158 163
159 // The maximum number of worker threads we'll create. 164 // The maximum number of worker threads we'll create.
160 const size_t max_threads_; 165 const size_t max_threads_;
161 166
162 const std::string thread_name_prefix_; 167 const std::string thread_name_prefix_;
163 168
164 // Associates all known sequence token names with their IDs. 169 // Associates all known sequence token names with their IDs.
165 std::map<std::string, int> named_sequence_tokens_; 170 std::map<std::string, int> named_sequence_tokens_;
166 171
167 // Owning pointers to all threads we've created so far. Since we lazily 172 // Owning pointers to all threads we've created so far, indexed by
168 // create threads, this may be less than max_threads_ and will be initially 173 // ID. Since we lazily create threads, this may be less than
169 // empty. 174 // max_threads_ and will be initially empty.
170 std::vector<linked_ptr<Worker> > threads_; 175 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
176 ThreadMap threads_;
171 177
172 // Set to true when we're in the process of creating another thread. 178 // Set to true when we're in the process of creating another thread.
173 // See PrepareToStartAdditionalThreadIfHelpful for more. 179 // See PrepareToStartAdditionalThreadIfHelpful for more.
174 bool thread_being_created_; 180 bool thread_being_created_;
175 181
176 // Number of threads currently waiting for work. 182 // Number of threads currently waiting for work.
177 size_t waiting_thread_count_; 183 size_t waiting_thread_count_;
178 184
179 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 185 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
180 // flag set. 186 // flag set.
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
249 blocking_shutdown_pending_task_count_(0), 255 blocking_shutdown_pending_task_count_(0),
250 shutdown_called_(false), 256 shutdown_called_(false),
251 testing_observer_(NULL) {} 257 testing_observer_(NULL) {}
252 258
253 SequencedWorkerPool::Inner::~Inner() { 259 SequencedWorkerPool::Inner::~Inner() {
254 // You must call Shutdown() before destroying the pool. 260 // You must call Shutdown() before destroying the pool.
255 DCHECK(shutdown_called_); 261 DCHECK(shutdown_called_);
256 262
257 // Need to explicitly join with the threads before they're destroyed or else 263 // Need to explicitly join with the threads before they're destroyed or else
258 // they will be running when our object is half torn down. 264 // they will be running when our object is half torn down.
259 for (size_t i = 0; i < threads_.size(); i++) 265 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
260 threads_[i]->Join(); 266 it->second->Join();
261 threads_.clear(); 267 threads_.clear();
262 268
263 if (testing_observer_) 269 if (testing_observer_)
264 testing_observer_->OnDestruct(); 270 testing_observer_->OnDestruct();
265 } 271 }
266 272
267 SequencedWorkerPool::SequenceToken 273 SequencedWorkerPool::SequenceToken
268 SequencedWorkerPool::Inner::GetSequenceToken() { 274 SequencedWorkerPool::Inner::GetSequenceToken() {
269 subtle::Atomic32 result = 275 subtle::Atomic32 result =
270 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); 276 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
310 // Actually start the additional thread or signal an existing one now that 316 // Actually start the additional thread or signal an existing one now that
311 // we're outside the lock. 317 // we're outside the lock.
312 if (create_thread_id) 318 if (create_thread_id)
313 FinishStartingAdditionalThread(create_thread_id); 319 FinishStartingAdditionalThread(create_thread_id);
314 else 320 else
315 cond_var_.Signal(); 321 cond_var_.Signal();
316 322
317 return true; 323 return true;
318 } 324 }
319 325
326 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
327 AutoLock lock(lock_);
328 return ContainsKey(threads_, PlatformThread::CurrentId());
329 }
330
320 void SequencedWorkerPool::Inner::FlushForTesting() { 331 void SequencedWorkerPool::Inner::FlushForTesting() {
321 { 332 {
322 AutoLock lock(lock_); 333 AutoLock lock(lock_);
323 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) 334 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
324 cond_var_.Wait(); 335 cond_var_.Wait();
325 } 336 }
326 cond_var_.Signal(); 337 cond_var_.Signal();
327 } 338 }
328 339
329 void SequencedWorkerPool::Inner::Shutdown() { 340 void SequencedWorkerPool::Inner::Shutdown() {
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
370 TestingObserver* observer) { 381 TestingObserver* observer) {
371 AutoLock lock(lock_); 382 AutoLock lock(lock_);
372 testing_observer_ = observer; 383 testing_observer_ = observer;
373 } 384 }
374 385
375 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 386 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
376 { 387 {
377 AutoLock lock(lock_); 388 AutoLock lock(lock_);
378 DCHECK(thread_being_created_); 389 DCHECK(thread_being_created_);
379 thread_being_created_ = false; 390 thread_being_created_ = false;
380 threads_.push_back(linked_ptr<Worker>(this_worker)); 391 std::pair<ThreadMap::iterator, bool> result =
392 threads_.insert(
393 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
394 DCHECK(result.second);
381 395
382 while (true) { 396 while (true) {
383 // See GetWork for what delete_these_outside_lock is doing. 397 // See GetWork for what delete_these_outside_lock is doing.
384 SequencedTask task; 398 SequencedTask task;
385 std::vector<Closure> delete_these_outside_lock; 399 std::vector<Closure> delete_these_outside_lock;
386 if (GetWork(&task, &delete_these_outside_lock)) { 400 if (GetWork(&task, &delete_these_outside_lock)) {
387 int new_thread_id = WillRunWorkerTask(task); 401 int new_thread_id = WillRunWorkerTask(task);
388 { 402 {
389 AutoUnlock unlock(lock_); 403 AutoUnlock unlock(lock_);
390 cond_var_.Signal(); 404 cond_var_.Signal();
(...skipping 331 matching lines...) Expand 10 before | Expand all | Expand 10 after
722 bool SequencedWorkerPool::PostDelayedTask( 736 bool SequencedWorkerPool::PostDelayedTask(
723 const tracked_objects::Location& from_here, 737 const tracked_objects::Location& from_here,
724 const Closure& task, 738 const Closure& task,
725 TimeDelta delay) { 739 TimeDelta delay) {
726 // TODO(akalin): Add support for non-zero delays. 740 // TODO(akalin): Add support for non-zero delays.
727 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0); 741 DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0);
728 return PostWorkerTask(from_here, task); 742 return PostWorkerTask(from_here, task);
729 } 743 }
730 744
731 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 745 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
732 // TODO(akalin): Keep track of the thread IDs of our worker threads 746 return inner_->RunsTasksOnCurrentThread();
733 // and use those to implement this function.
734 NOTREACHED();
735 return true;
736 } 747 }
737 748
738 void SequencedWorkerPool::FlushForTesting() { 749 void SequencedWorkerPool::FlushForTesting() {
739 inner_->FlushForTesting(); 750 inner_->FlushForTesting();
740 } 751 }
741 752
742 void SequencedWorkerPool::Shutdown() { 753 void SequencedWorkerPool::Shutdown() {
743 inner_->Shutdown(); 754 inner_->Shutdown();
744 } 755 }
745 756
746 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { 757 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
747 inner_->SetTestingObserver(observer); 758 inner_->SetTestingObserver(observer);
748 } 759 }
749 760
750 } // namespace base 761 } // namespace base
OLDNEW
« no previous file with comments | « base/task_runner_test_template.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698