Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(145)

Side by Side Diff: base/threading/sequenced_worker_pool.cc

Issue 9347056: Fix up SequencedWorkerPool in preparation for making it a TaskRunner (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Sync to head Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/threading/sequenced_worker_pool.h" 5 #include "base/threading/sequenced_worker_pool.h"
6 6
7 #include <deque> 7 #include <list>
8 #include <map>
8 #include <set> 9 #include <set>
10 #include <vector>
9 11
10 #include "base/atomicops.h" 12 #include "base/atomicops.h"
11 #include "base/bind.h" 13 #include "base/callback.h"
12 #include "base/memory/scoped_ptr.h" 14 #include "base/compiler_specific.h"
15 #include "base/logging.h"
16 #include "base/memory/linked_ptr.h"
13 #include "base/metrics/histogram.h" 17 #include "base/metrics/histogram.h"
14 #include "base/stringprintf.h" 18 #include "base/stringprintf.h"
15 #include "base/synchronization/condition_variable.h" 19 #include "base/synchronization/condition_variable.h"
20 #include "base/synchronization/lock.h"
16 #include "base/threading/simple_thread.h" 21 #include "base/threading/simple_thread.h"
17 #include "base/threading/thread.h" 22 #include "base/time.h"
23 #include "base/tracked_objects.h"
18 24
19 namespace base { 25 namespace base {
20 26
21 namespace { 27 namespace {
22 28
23 struct SequencedTask { 29 struct SequencedTask {
30 SequencedTask()
31 : sequence_token_id(0),
32 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
33
34 ~SequencedTask() {}
35
24 int sequence_token_id; 36 int sequence_token_id;
25 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 37 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
26 tracked_objects::Location location; 38 tracked_objects::Location location;
27 base::Closure task; 39 Closure task;
28 }; 40 };
29 41
30 } // namespace 42 } // namespace
31 43
32 // Worker --------------------------------------------------------------------- 44 // Worker ---------------------------------------------------------------------
33 45
34 class SequencedWorkerPool::Worker : public base::SimpleThread { 46 class SequencedWorkerPool::Worker : public SimpleThread {
35 public: 47 public:
36 Worker(SequencedWorkerPool::Inner* inner, 48 // Hold a ref to |worker_pool|, since we want to keep it around even
49 // if it doesn't join our thread. Note that this (deliberately)
50 // leaks on shutdown.
51 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
37 int thread_number, 52 int thread_number,
38 const std::string& thread_name_prefix); 53 const std::string& thread_name_prefix);
39 ~Worker(); 54 virtual ~Worker();
40 55
41 // SimpleThread implementation. This actually runs the background thread. 56 // SimpleThread implementation. This actually runs the background thread.
42 virtual void Run(); 57 virtual void Run() OVERRIDE;
43 58
44 private: 59 private:
45 SequencedWorkerPool::Inner* inner_; 60 const scoped_refptr<SequencedWorkerPool> worker_pool_;
46 SequencedWorkerPool::WorkerShutdown current_shutdown_mode_;
47 61
48 DISALLOW_COPY_AND_ASSIGN(Worker); 62 DISALLOW_COPY_AND_ASSIGN(Worker);
49 }; 63 };
50 64
51
52 // Inner ---------------------------------------------------------------------- 65 // Inner ----------------------------------------------------------------------
53 66
54 class SequencedWorkerPool::Inner 67 class SequencedWorkerPool::Inner {
55 : public base::RefCountedThreadSafe<SequencedWorkerPool::Inner> {
56 public: 68 public:
57 Inner(size_t max_threads, const std::string& thread_name_prefix); 69 // Take a raw pointer to |worker| to avoid cycles (since we're owned
58 virtual ~Inner(); 70 // by it).
71 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
72 const std::string& thread_name_prefix);
73
74 ~Inner();
59 75
60 SequenceToken GetSequenceToken(); 76 SequenceToken GetSequenceToken();
61 77
62 SequenceToken GetNamedSequenceToken(const std::string& name); 78 SequenceToken GetNamedSequenceToken(const std::string& name);
63 79
64 // This function accepts a name and an ID. If the name is null, the 80 // This function accepts a name and an ID. If the name is null, the
65 // token ID is used. This allows us to implement the optional name lookup 81 // token ID is used. This allows us to implement the optional name lookup
66 // from a single function without having to enter the lock a separate time. 82 // from a single function without having to enter the lock a separate time.
67 bool PostTask(const std::string* optional_token_name, 83 bool PostTask(const std::string* optional_token_name,
68 int sequence_token_id, 84 SequenceToken sequence_token,
69 SequencedWorkerPool::WorkerShutdown shutdown_behavior, 85 WorkerShutdown shutdown_behavior,
70 const tracked_objects::Location& from_here, 86 const tracked_objects::Location& from_here,
71 const base::Closure& task); 87 const Closure& task);
72 88
73 void Flush(); 89 void FlushForTesting();
74 90
75 void Shutdown(); 91 void Shutdown();
76 92
77 void SetTestingObserver(SequencedWorkerPool::TestingObserver* observer); 93 void SetTestingObserver(TestingObserver* observer);
78 94
79 // Runs the worker loop on the background thread. 95 // Runs the worker loop on the background thread.
80 void ThreadLoop(Worker* this_worker); 96 void ThreadLoop(Worker* this_worker);
81 97
82 private: 98 private:
83 // Called from within the lock, this converts the given token name into a 99 // Called from within the lock, this converts the given token name into a
84 // token ID, creating a new one if necessary. 100 // token ID, creating a new one if necessary.
85 int LockedGetNamedTokenID(const std::string& name); 101 int LockedGetNamedTokenID(const std::string& name);
86 102
87 // The calling code should clear the given delete_these_oustide_lock 103 // The calling code should clear the given delete_these_oustide_lock
88 // vector the next time the lock is released. See the implementation for 104 // vector the next time the lock is released. See the implementation for
89 // a more detailed description. 105 // a more detailed description.
90 bool GetWork(SequencedTask* task, 106 bool GetWork(SequencedTask* task,
91 std::vector<base::Closure>* delete_these_outside_lock); 107 std::vector<Closure>* delete_these_outside_lock);
92 108
93 // Peforms init and cleanup around running the given task. WillRun... 109 // Peforms init and cleanup around running the given task. WillRun...
94 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 110 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
95 // The calling code should call FinishStartingAdditionalThread once the 111 // The calling code should call FinishStartingAdditionalThread once the
96 // lock is released if the return values is nonzero. 112 // lock is released if the return values is nonzero.
97 int WillRunWorkerTask(const SequencedTask& task); 113 int WillRunWorkerTask(const SequencedTask& task);
98 void DidRunWorkerTask(const SequencedTask& task); 114 void DidRunWorkerTask(const SequencedTask& task);
99 115
100 // Returns true if there are no threads currently running the given 116 // Returns true if there are no threads currently running the given
101 // sequence token. 117 // sequence token.
(...skipping 16 matching lines...) Expand all
118 // The second part of thread creation after 134 // The second part of thread creation after
119 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 135 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
120 // generated. This actually creates the thread and should be called outside 136 // generated. This actually creates the thread and should be called outside
121 // the lock to avoid blocking important work starting a thread in the lock. 137 // the lock to avoid blocking important work starting a thread in the lock.
122 void FinishStartingAdditionalThread(int thread_number); 138 void FinishStartingAdditionalThread(int thread_number);
123 139
124 // Checks whether there is work left that's blocking shutdown. Must be 140 // Checks whether there is work left that's blocking shutdown. Must be
125 // called inside the lock. 141 // called inside the lock.
126 bool CanShutdown() const; 142 bool CanShutdown() const;
127 143
144 SequencedWorkerPool* const worker_pool_;
145
128 // The last sequence number used. Managed by GetSequenceToken, since this 146 // The last sequence number used. Managed by GetSequenceToken, since this
129 // only does threadsafe increment operations, you do not need to hold the 147 // only does threadsafe increment operations, you do not need to hold the
130 // lock. 148 // lock.
131 volatile base::subtle::Atomic32 last_sequence_number_; 149 volatile subtle::Atomic32 last_sequence_number_;
132 150
133 // This lock protects |everything in this class|. Do not read or modify 151 // This lock protects |everything in this class|. Do not read or modify
134 // anything without holding this lock. Do not block while holding this 152 // anything without holding this lock. Do not block while holding this
135 // lock. 153 // lock.
136 base::Lock lock_; 154 Lock lock_;
137 155
138 // Condition variable used to wake up worker threads when a task is runnable. 156 // Condition variable used to wake up worker threads when a task is runnable.
139 base::ConditionVariable cond_var_; 157 ConditionVariable cond_var_;
140 158
141 // The maximum number of worker threads we'll create. 159 // The maximum number of worker threads we'll create.
142 size_t max_threads_; 160 const size_t max_threads_;
143 161
144 std::string thread_name_prefix_; 162 const std::string thread_name_prefix_;
145 163
146 // Associates all known sequence token names with their IDs. 164 // Associates all known sequence token names with their IDs.
147 std::map<std::string, int> named_sequence_tokens_; 165 std::map<std::string, int> named_sequence_tokens_;
148 166
149 // Owning pointers to all threads we've created so far. Since we lazily 167 // Owning pointers to all threads we've created so far. Since we lazily
150 // create threads, this may be less than max_threads_ and will be initially 168 // create threads, this may be less than max_threads_ and will be initially
151 // empty. 169 // empty.
152 std::vector<linked_ptr<Worker> > threads_; 170 std::vector<linked_ptr<Worker> > threads_;
153 171
154 // Set to true when we're in the process of creating another thread. 172 // Set to true when we're in the process of creating another thread.
(...skipping 15 matching lines...) Expand all
170 std::list<SequencedTask> pending_tasks_; 188 std::list<SequencedTask> pending_tasks_;
171 size_t pending_task_count_; 189 size_t pending_task_count_;
172 190
173 // Number of tasks in the pending_tasks_ list that are marked as blocking 191 // Number of tasks in the pending_tasks_ list that are marked as blocking
174 // shutdown. 192 // shutdown.
175 size_t blocking_shutdown_pending_task_count_; 193 size_t blocking_shutdown_pending_task_count_;
176 194
177 // Lists all sequence tokens currently executing. 195 // Lists all sequence tokens currently executing.
178 std::set<int> current_sequences_; 196 std::set<int> current_sequences_;
179 197
180 // Set when the app is terminating and no further tasks should be allowed, 198 // Set when Shutdown is called and no further tasks should be
181 // though we may still be running existing tasks. 199 // allowed, though we may still be running existing tasks.
182 bool terminating_;
183
184 // Set when Shutdown is called to do some assertions.
185 bool shutdown_called_; 200 bool shutdown_called_;
186 201
187 SequencedWorkerPool::TestingObserver* testing_observer_; 202 TestingObserver* testing_observer_;
203
204 DISALLOW_COPY_AND_ASSIGN(Inner);
188 }; 205 };
189 206
190 SequencedWorkerPool::Worker::Worker(SequencedWorkerPool::Inner* inner, 207 // Worker definitions ---------------------------------------------------------
191 int thread_number, 208
192 const std::string& prefix) 209 SequencedWorkerPool::Worker::Worker(
193 : base::SimpleThread( 210 const scoped_refptr<SequencedWorkerPool>& worker_pool,
211 int thread_number,
212 const std::string& prefix)
213 : SimpleThread(
194 prefix + StringPrintf("Worker%d", thread_number).c_str()), 214 prefix + StringPrintf("Worker%d", thread_number).c_str()),
195 inner_(inner), 215 worker_pool_(worker_pool) {
196 current_shutdown_mode_(SequencedWorkerPool::CONTINUE_ON_SHUTDOWN) {
197 Start(); 216 Start();
198 } 217 }
199 218
200 SequencedWorkerPool::Worker::~Worker() { 219 SequencedWorkerPool::Worker::~Worker() {
201 } 220 }
202 221
203 void SequencedWorkerPool::Worker::Run() { 222 void SequencedWorkerPool::Worker::Run() {
204 // Just jump back to the Inner object to run the thread, since it has all the 223 // Just jump back to the Inner object to run the thread, since it has all the
205 // tracking information and queues. It might be more natural to implement 224 // tracking information and queues. It might be more natural to implement
206 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 225 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
207 // having these worker objects at all, but that method lacks the ability to 226 // having these worker objects at all, but that method lacks the ability to
208 // send thread-specific information easily to the thread loop. 227 // send thread-specific information easily to the thread loop.
209 inner_->ThreadLoop(this); 228 worker_pool_->inner_->ThreadLoop(this);
210 } 229 }
211 230
212 SequencedWorkerPool::Inner::Inner(size_t max_threads, 231 // Inner definitions ---------------------------------------------------------
213 const std::string& thread_name_prefix) 232
214 : last_sequence_number_(0), 233 SequencedWorkerPool::Inner::Inner(
234 SequencedWorkerPool* worker_pool,
235 size_t max_threads,
236 const std::string& thread_name_prefix)
237 : worker_pool_(worker_pool),
238 last_sequence_number_(0),
215 lock_(), 239 lock_(),
216 cond_var_(&lock_), 240 cond_var_(&lock_),
217 max_threads_(max_threads), 241 max_threads_(max_threads),
218 thread_name_prefix_(thread_name_prefix), 242 thread_name_prefix_(thread_name_prefix),
219 thread_being_created_(false), 243 thread_being_created_(false),
220 waiting_thread_count_(0), 244 waiting_thread_count_(0),
221 blocking_shutdown_thread_count_(0), 245 blocking_shutdown_thread_count_(0),
222 pending_task_count_(0), 246 pending_task_count_(0),
223 blocking_shutdown_pending_task_count_(0), 247 blocking_shutdown_pending_task_count_(0),
224 terminating_(false),
225 shutdown_called_(false), 248 shutdown_called_(false),
226 testing_observer_(NULL) { 249 testing_observer_(NULL) {}
227 }
228 250
229 SequencedWorkerPool::Inner::~Inner() { 251 SequencedWorkerPool::Inner::~Inner() {
230 // You must call Shutdown() before destroying the pool. 252 // You must call Shutdown() before destroying the pool.
231 DCHECK(shutdown_called_); 253 DCHECK(shutdown_called_);
232 254
233 // Need to explicitly join with the threads before they're destroyed or else 255 // Need to explicitly join with the threads before they're destroyed or else
234 // they will be running when our object is half torn down. 256 // they will be running when our object is half torn down.
235 for (size_t i = 0; i < threads_.size(); i++) 257 for (size_t i = 0; i < threads_.size(); i++)
236 threads_[i]->Join(); 258 threads_[i]->Join();
237 threads_.clear(); 259 threads_.clear();
238 } 260 }
239 261
240 SequencedWorkerPool::SequenceToken 262 SequencedWorkerPool::SequenceToken
241 SequencedWorkerPool::Inner::GetSequenceToken() { 263 SequencedWorkerPool::Inner::GetSequenceToken() {
242 base::subtle::Atomic32 result = 264 subtle::Atomic32 result =
243 base::subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); 265 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
244 return SequenceToken(static_cast<int>(result)); 266 return SequenceToken(static_cast<int>(result));
245 } 267 }
246 268
247 SequencedWorkerPool::SequenceToken 269 SequencedWorkerPool::SequenceToken
248 SequencedWorkerPool::Inner::GetNamedSequenceToken( 270 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
249 const std::string& name) { 271 AutoLock lock(lock_);
250 base::AutoLock lock(lock_);
251 return SequenceToken(LockedGetNamedTokenID(name)); 272 return SequenceToken(LockedGetNamedTokenID(name));
252 } 273 }
253 274
254 bool SequencedWorkerPool::Inner::PostTask( 275 bool SequencedWorkerPool::Inner::PostTask(
255 const std::string* optional_token_name, 276 const std::string* optional_token_name,
256 int sequence_token_id, 277 SequenceToken sequence_token,
257 SequencedWorkerPool::WorkerShutdown shutdown_behavior, 278 WorkerShutdown shutdown_behavior,
258 const tracked_objects::Location& from_here, 279 const tracked_objects::Location& from_here,
259 const base::Closure& task) { 280 const Closure& task) {
260 SequencedTask sequenced; 281 SequencedTask sequenced;
261 sequenced.sequence_token_id = sequence_token_id; 282 sequenced.sequence_token_id = sequence_token.id_;
262 sequenced.shutdown_behavior = shutdown_behavior; 283 sequenced.shutdown_behavior = shutdown_behavior;
263 sequenced.location = from_here; 284 sequenced.location = from_here;
264 sequenced.task = task; 285 sequenced.task = task;
265 286
266 int create_thread_id = 0; 287 int create_thread_id = 0;
267 { 288 {
268 base::AutoLock lock(lock_); 289 AutoLock lock(lock_);
269 if (terminating_) 290 if (shutdown_called_)
270 return false; 291 return false;
271 292
272 // Now that we have the lock, apply the named token rules. 293 // Now that we have the lock, apply the named token rules.
273 if (optional_token_name) 294 if (optional_token_name)
274 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 295 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
275 296
276 pending_tasks_.push_back(sequenced); 297 pending_tasks_.push_back(sequenced);
277 pending_task_count_++; 298 pending_task_count_++;
278 if (shutdown_behavior == BLOCK_SHUTDOWN) 299 if (shutdown_behavior == BLOCK_SHUTDOWN)
279 blocking_shutdown_pending_task_count_++; 300 blocking_shutdown_pending_task_count_++;
280 301
281 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 302 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
282 } 303 }
283 304
284 // Actually start the additional thread or signal an existing one now that 305 // Actually start the additional thread or signal an existing one now that
285 // we're outside the lock. 306 // we're outside the lock.
286 if (create_thread_id) 307 if (create_thread_id)
287 FinishStartingAdditionalThread(create_thread_id); 308 FinishStartingAdditionalThread(create_thread_id);
288 else 309 else
289 cond_var_.Signal(); 310 cond_var_.Signal();
290 311
291 return true; 312 return true;
292 } 313 }
293 314
294 void SequencedWorkerPool::Inner::Flush() { 315 void SequencedWorkerPool::Inner::FlushForTesting() {
295 { 316 {
296 base::AutoLock lock(lock_); 317 AutoLock lock(lock_);
297 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size()) 318 while (pending_task_count_ > 0 || waiting_thread_count_ < threads_.size())
298 cond_var_.Wait(); 319 cond_var_.Wait();
299 } 320 }
300 cond_var_.Signal(); 321 cond_var_.Signal();
301 } 322 }
302 323
303 void SequencedWorkerPool::Inner::Shutdown() { 324 void SequencedWorkerPool::Inner::Shutdown() {
304 if (shutdown_called_)
305 return;
306 shutdown_called_ = true;
307
308 // Mark us as terminated and go through and drop all tasks that aren't 325 // Mark us as terminated and go through and drop all tasks that aren't
309 // required to run on shutdown. Since no new tasks will get posted once the 326 // required to run on shutdown. Since no new tasks will get posted once the
310 // terminated flag is set, this ensures that all remaining tasks are required 327 // terminated flag is set, this ensures that all remaining tasks are required
311 // for shutdown whenever the termianted_ flag is set. 328 // for shutdown whenever the termianted_ flag is set.
312 { 329 {
313 base::AutoLock lock(lock_); 330 AutoLock lock(lock_);
314 DCHECK(!terminating_); 331
315 terminating_ = true; 332 if (shutdown_called_)
333 return;
334 shutdown_called_ = true;
316 335
317 // Tickle the threads. This will wake up a waiting one so it will know that 336 // Tickle the threads. This will wake up a waiting one so it will know that
318 // it can exit, which in turn will wake up any other waiting ones. 337 // it can exit, which in turn will wake up any other waiting ones.
319 cond_var_.Signal(); 338 cond_var_.Signal();
320 339
321 // There are no pending or running tasks blocking shutdown, we're done. 340 // There are no pending or running tasks blocking shutdown, we're done.
322 if (CanShutdown()) 341 if (CanShutdown())
323 return; 342 return;
324 } 343 }
325 344
326 // If we get here, we know we're either waiting on a blocking task that's 345 // If we get here, we know we're either waiting on a blocking task that's
327 // currently running, waiting on a blocking task that hasn't been scheduled 346 // currently running, waiting on a blocking task that hasn't been scheduled
328 // yet, or both. Block on the "queue empty" event to know when all tasks are 347 // yet, or both. Block on the "queue empty" event to know when all tasks are
329 // complete. This must be done outside the lock. 348 // complete. This must be done outside the lock.
330 if (testing_observer_) 349 if (testing_observer_)
331 testing_observer_->WillWaitForShutdown(); 350 testing_observer_->WillWaitForShutdown();
332 351
333 base::TimeTicks shutdown_wait_begin = base::TimeTicks::Now(); 352 TimeTicks shutdown_wait_begin = TimeTicks::Now();
334 353
335 // Wait for no more tasks. 354 // Wait for no more tasks.
336 { 355 {
337 base::AutoLock lock(lock_); 356 AutoLock lock(lock_);
338 while (!CanShutdown()) 357 while (!CanShutdown())
339 cond_var_.Wait(); 358 cond_var_.Wait();
340 } 359 }
341 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 360 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
342 base::TimeTicks::Now() - shutdown_wait_begin); 361 TimeTicks::Now() - shutdown_wait_begin);
343 } 362 }
344 363
345 void SequencedWorkerPool::Inner::SetTestingObserver( 364 void SequencedWorkerPool::Inner::SetTestingObserver(
346 SequencedWorkerPool::TestingObserver* observer) { 365 TestingObserver* observer) {
347 base::AutoLock lock(lock_); 366 AutoLock lock(lock_);
348 testing_observer_ = observer; 367 testing_observer_ = observer;
349 } 368 }
350 369
351 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 370 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
352 { 371 {
353 base::AutoLock lock(lock_); 372 AutoLock lock(lock_);
354 DCHECK(thread_being_created_); 373 DCHECK(thread_being_created_);
355 thread_being_created_ = false; 374 thread_being_created_ = false;
356 threads_.push_back(linked_ptr<Worker>(this_worker)); 375 threads_.push_back(linked_ptr<Worker>(this_worker));
357 376
358 while (true) { 377 while (true) {
359 // See GetWork for what delete_these_outside_lock is doing. 378 // See GetWork for what delete_these_outside_lock is doing.
360 SequencedTask task; 379 SequencedTask task;
361 std::vector<base::Closure> delete_these_outside_lock; 380 std::vector<Closure> delete_these_outside_lock;
362 if (GetWork(&task, &delete_these_outside_lock)) { 381 if (GetWork(&task, &delete_these_outside_lock)) {
363 int new_thread_id = WillRunWorkerTask(task); 382 int new_thread_id = WillRunWorkerTask(task);
364 { 383 {
365 base::AutoUnlock unlock(lock_); 384 AutoUnlock unlock(lock_);
366 cond_var_.Signal(); 385 cond_var_.Signal();
367 delete_these_outside_lock.clear(); 386 delete_these_outside_lock.clear();
368 387
369 // Complete thread creation outside the lock if necessary. 388 // Complete thread creation outside the lock if necessary.
370 if (new_thread_id) 389 if (new_thread_id)
371 FinishStartingAdditionalThread(new_thread_id); 390 FinishStartingAdditionalThread(new_thread_id);
372 391
373 task.task.Run(); 392 task.task.Run();
374 393
375 // Make sure our task is erased outside the lock for the same reason 394 // Make sure our task is erased outside the lock for the same reason
376 // we do this with delete_these_oustide_lock. 395 // we do this with delete_these_oustide_lock.
377 task.task = base::Closure(); 396 task.task = Closure();
378 } 397 }
379 DidRunWorkerTask(task); // Must be done inside the lock. 398 DidRunWorkerTask(task); // Must be done inside the lock.
380 } else { 399 } else {
381 // When we're terminating and there's no more work, we can shut down. 400 // When we're terminating and there's no more work, we can
382 // You can't get more tasks posted once terminating_ is set. There may 401 // shut down. You can't get more tasks posted once
383 // be some tasks stuck behind running ones with the same sequence 402 // shutdown_called_ is set. There may be some tasks stuck
384 // token, but additional threads won't help this case. 403 // behind running ones with the same sequence token, but
385 if (terminating_) 404 // additional threads won't help this case.
405 if (shutdown_called_)
386 break; 406 break;
387 waiting_thread_count_++; 407 waiting_thread_count_++;
388 cond_var_.Signal(); // For Flush() that may be waiting on the 408 cond_var_.Signal(); // For Flush() that may be waiting on the
389 // waiting thread count to go up. 409 // waiting thread count to go up.
390 cond_var_.Wait(); 410 cond_var_.Wait();
391 waiting_thread_count_--; 411 waiting_thread_count_--;
392 } 412 }
393 } 413 }
394 } 414 }
395 415
(...skipping 13 matching lines...) Expand all
409 return found->second; // Got an existing one. 429 return found->second; // Got an existing one.
410 430
411 // Create a new one for this name. 431 // Create a new one for this name.
412 SequenceToken result = GetSequenceToken(); 432 SequenceToken result = GetSequenceToken();
413 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 433 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
414 return result.id_; 434 return result.id_;
415 } 435 }
416 436
417 bool SequencedWorkerPool::Inner::GetWork( 437 bool SequencedWorkerPool::Inner::GetWork(
418 SequencedTask* task, 438 SequencedTask* task,
419 std::vector<base::Closure>* delete_these_outside_lock) { 439 std::vector<Closure>* delete_these_outside_lock) {
420 lock_.AssertAcquired(); 440 lock_.AssertAcquired();
421 441
422 DCHECK_EQ(pending_tasks_.size(), pending_task_count_); 442 DCHECK_EQ(pending_tasks_.size(), pending_task_count_);
423 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 443 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
424 static_cast<int>(pending_task_count_)); 444 static_cast<int>(pending_task_count_));
425 445
426 // Find the next task with a sequence token that's not currently in use. 446 // Find the next task with a sequence token that's not currently in use.
427 // If the token is in use, that means another thread is running something 447 // If the token is in use, that means another thread is running something
428 // in that sequence, and we can't run it without going out-of-order. 448 // in that sequence, and we can't run it without going out-of-order.
429 // 449 //
(...skipping 18 matching lines...) Expand all
448 bool found_task = false; 468 bool found_task = false;
449 int unrunnable_tasks = 0; 469 int unrunnable_tasks = 0;
450 std::list<SequencedTask>::iterator i = pending_tasks_.begin(); 470 std::list<SequencedTask>::iterator i = pending_tasks_.begin();
451 while (i != pending_tasks_.end()) { 471 while (i != pending_tasks_.end()) {
452 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 472 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
453 unrunnable_tasks++; 473 unrunnable_tasks++;
454 ++i; 474 ++i;
455 continue; 475 continue;
456 } 476 }
457 477
458 if (terminating_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 478 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
459 // We're shutting down and the task we just found isn't blocking 479 // We're shutting down and the task we just found isn't blocking
460 // shutdown. Delete it and get more work. 480 // shutdown. Delete it and get more work.
461 // 481 //
462 // Note that we do not want to delete unrunnable tasks. Deleting a task 482 // Note that we do not want to delete unrunnable tasks. Deleting a task
463 // can have side effects (like freeing some objects) and deleting a 483 // can have side effects (like freeing some objects) and deleting a
464 // task that's supposed to run after one that's currently running could 484 // task that's supposed to run after one that's currently running could
465 // cause an obscure crash. 485 // cause an obscure crash.
466 // 486 //
467 // We really want to delete these tasks outside the lock in case the 487 // We really want to delete these tasks outside the lock in case the
468 // closures are holding refs to objects that want to post work from 488 // closures are holding refs to objects that want to post work from
(...skipping 26 matching lines...) Expand all
495 return found_task; 515 return found_task;
496 } 516 }
497 517
498 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 518 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
499 lock_.AssertAcquired(); 519 lock_.AssertAcquired();
500 520
501 // Mark the task's sequence number as in use. 521 // Mark the task's sequence number as in use.
502 if (task.sequence_token_id) 522 if (task.sequence_token_id)
503 current_sequences_.insert(task.sequence_token_id); 523 current_sequences_.insert(task.sequence_token_id);
504 524
505 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) 525 if (task.shutdown_behavior == BLOCK_SHUTDOWN)
506 blocking_shutdown_thread_count_++; 526 blocking_shutdown_thread_count_++;
507 527
508 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 528 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
509 // creates a new thread if there is no free one, there is a race when posting 529 // creates a new thread if there is no free one, there is a race when posting
510 // tasks that many tasks could have been posted before a thread started 530 // tasks that many tasks could have been posted before a thread started
511 // running them, so only one thread would have been created. So we also check 531 // running them, so only one thread would have been created. So we also check
512 // whether we should create more threads after removing our task from the 532 // whether we should create more threads after removing our task from the
513 // queue, which also has the nice side effect of creating the workers from 533 // queue, which also has the nice side effect of creating the workers from
514 // background threads rather than the main thread of the app. 534 // background threads rather than the main thread of the app.
515 // 535 //
516 // If another thread wasn't created, we want to wake up an existing thread 536 // If another thread wasn't created, we want to wake up an existing thread
517 // if there is one waiting to pick up the next task. 537 // if there is one waiting to pick up the next task.
518 // 538 //
519 // Note that we really need to do this *before* running the task, not 539 // Note that we really need to do this *before* running the task, not
520 // after. Otherwise, if more than one task is posted, the creation of the 540 // after. Otherwise, if more than one task is posted, the creation of the
521 // second thread (since we only create one at a time) will be blocked by 541 // second thread (since we only create one at a time) will be blocked by
522 // the execution of the first task, which could be arbitrarily long. 542 // the execution of the first task, which could be arbitrarily long.
523 return PrepareToStartAdditionalThreadIfHelpful(); 543 return PrepareToStartAdditionalThreadIfHelpful();
524 } 544 }
525 545
526 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 546 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
527 lock_.AssertAcquired(); 547 lock_.AssertAcquired();
528 548
529 if (task.shutdown_behavior == SequencedWorkerPool::BLOCK_SHUTDOWN) { 549 if (task.shutdown_behavior == BLOCK_SHUTDOWN) {
530 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 550 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
531 blocking_shutdown_thread_count_--; 551 blocking_shutdown_thread_count_--;
532 } 552 }
533 553
534 if (task.sequence_token_id) 554 if (task.sequence_token_id)
535 current_sequences_.erase(task.sequence_token_id); 555 current_sequences_.erase(task.sequence_token_id);
536 } 556 }
537 557
538 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 558 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
539 int sequence_token_id) const { 559 int sequence_token_id) const {
540 lock_.AssertAcquired(); 560 lock_.AssertAcquired();
541 return !sequence_token_id || 561 return !sequence_token_id ||
542 current_sequences_.find(sequence_token_id) == 562 current_sequences_.find(sequence_token_id) ==
543 current_sequences_.end(); 563 current_sequences_.end();
544 } 564 }
545 565
546 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 566 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
567 lock_.AssertAcquired();
547 // How thread creation works: 568 // How thread creation works:
548 // 569 //
549 // We'de like to avoid creating threads with the lock held. However, we 570 // We'de like to avoid creating threads with the lock held. However, we
550 // need to be sure that we have an accurate accounting of the threads for 571 // need to be sure that we have an accurate accounting of the threads for
551 // proper Joining and deltion on shutdown. 572 // proper Joining and deltion on shutdown.
552 // 573 //
553 // We need to figure out if we need another thread with the lock held, which 574 // We need to figure out if we need another thread with the lock held, which
554 // is what this function does. It then marks us as in the process of creating 575 // is what this function does. It then marks us as in the process of creating
555 // a thread. When we do shutdown, we wait until the thread_being_created_ 576 // a thread. When we do shutdown, we wait until the thread_being_created_
556 // flag is cleared, which ensures that the new thread is properly added to 577 // flag is cleared, which ensures that the new thread is properly added to
557 // all the data structures and we can't leak it. Once shutdown starts, we'll 578 // all the data structures and we can't leak it. Once shutdown starts, we'll
558 // refuse to create more threads or they would be leaked. 579 // refuse to create more threads or they would be leaked.
559 // 580 //
560 // Note that this creates a mostly benign race condition on shutdown that 581 // Note that this creates a mostly benign race condition on shutdown that
561 // will cause fewer workers to be created than one would expect. It isn't 582 // will cause fewer workers to be created than one would expect. It isn't
562 // much of an issue in real life, but affects some tests. Since we only spawn 583 // much of an issue in real life, but affects some tests. Since we only spawn
563 // one worker at a time, the following sequence of events can happen: 584 // one worker at a time, the following sequence of events can happen:
564 // 585 //
565 // 1. Main thread posts a bunch of unrelated tasks that would normally be 586 // 1. Main thread posts a bunch of unrelated tasks that would normally be
566 // run on separate threads. 587 // run on separate threads.
567 // 2. The first task post causes us to start a worker. Other tasks do not 588 // 2. The first task post causes us to start a worker. Other tasks do not
568 // cause a worker to start since one is pending. 589 // cause a worker to start since one is pending.
569 // 3. Main thread initiates shutdown. 590 // 3. Main thread initiates shutdown.
570 // 4. No more threads are created since the terminating_ flag is set. 591 // 4. No more threads are created since the shutdown_called_ flag is set.
571 // 592 //
572 // The result is that one may expect that max_threads_ workers to be created 593 // The result is that one may expect that max_threads_ workers to be created
573 // given the workload, but in reality fewer may be created because the 594 // given the workload, but in reality fewer may be created because the
574 // sequence of thread creation on the background threads is racing with the 595 // sequence of thread creation on the background threads is racing with the
575 // shutdown call. 596 // shutdown call.
576 if (!terminating_ && 597 if (!shutdown_called_ &&
577 !thread_being_created_ && 598 !thread_being_created_ &&
578 threads_.size() < max_threads_ && 599 threads_.size() < max_threads_ &&
579 waiting_thread_count_ == 0) { 600 waiting_thread_count_ == 0) {
580 // We could use an additional thread if there's work to be done. 601 // We could use an additional thread if there's work to be done.
581 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin(); 602 for (std::list<SequencedTask>::iterator i = pending_tasks_.begin();
582 i != pending_tasks_.end(); ++i) { 603 i != pending_tasks_.end(); ++i) {
583 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 604 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
584 // Found a runnable task, mark the thread as being started. 605 // Found a runnable task, mark the thread as being started.
585 thread_being_created_ = true; 606 thread_being_created_ = true;
586 return static_cast<int>(threads_.size() + 1); 607 return static_cast<int>(threads_.size() + 1);
587 } 608 }
588 } 609 }
589 } 610 }
590 return 0; 611 return 0;
591 } 612 }
592 613
593 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 614 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
594 int thread_number) { 615 int thread_number) {
595 // Called outside of the lock. 616 // Called outside of the lock.
596 DCHECK(thread_number > 0); 617 DCHECK(thread_number > 0);
597 618
598 // The worker is assigned to the list when the thread actually starts, which 619 // The worker is assigned to the list when the thread actually starts, which
599 // will manage the memory of the pointer. 620 // will manage the memory of the pointer.
600 new Worker(this, thread_number, thread_name_prefix_); 621 new Worker(worker_pool_, thread_number, thread_name_prefix_);
601 } 622 }
602 623
603 bool SequencedWorkerPool::Inner::CanShutdown() const { 624 bool SequencedWorkerPool::Inner::CanShutdown() const {
604 lock_.AssertAcquired(); 625 lock_.AssertAcquired();
605 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 626 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
606 return !thread_being_created_ && 627 return !thread_being_created_ &&
607 blocking_shutdown_thread_count_ == 0 && 628 blocking_shutdown_thread_count_ == 0 &&
608 blocking_shutdown_pending_task_count_ == 0; 629 blocking_shutdown_pending_task_count_ == 0;
609 } 630 }
610 631
611 // SequencedWorkerPool -------------------------------------------------------- 632 // SequencedWorkerPool --------------------------------------------------------
612 633
613 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads, 634 SequencedWorkerPool::SequencedWorkerPool(
614 const std::string& thread_name_prefix) 635 size_t max_threads,
615 : inner_(new Inner(max_threads, thread_name_prefix)) { 636 const std::string& thread_name_prefix)
616 } 637 : inner_(new Inner(ALLOW_THIS_IN_INITIALIZER_LIST(this),
638 max_threads, thread_name_prefix)) {}
617 639
618 SequencedWorkerPool::~SequencedWorkerPool() { 640 SequencedWorkerPool::~SequencedWorkerPool() {}
619 }
620 641
621 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 642 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
622 return inner_->GetSequenceToken(); 643 return inner_->GetSequenceToken();
623 } 644 }
624 645
625 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 646 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
626 const std::string& name) { 647 const std::string& name) {
627 return inner_->GetNamedSequenceToken(name); 648 return inner_->GetNamedSequenceToken(name);
628 } 649 }
629 650
630 bool SequencedWorkerPool::PostWorkerTask( 651 bool SequencedWorkerPool::PostWorkerTask(
631 const tracked_objects::Location& from_here, 652 const tracked_objects::Location& from_here,
632 const base::Closure& task) { 653 const Closure& task) {
633 return inner_->PostTask(NULL, 0, BLOCK_SHUTDOWN, from_here, task); 654 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
655 from_here, task);
634 } 656 }
635 657
636 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 658 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
637 const tracked_objects::Location& from_here, 659 const tracked_objects::Location& from_here,
638 const base::Closure& task, 660 const Closure& task,
639 WorkerShutdown shutdown_behavior) { 661 WorkerShutdown shutdown_behavior) {
640 return inner_->PostTask(NULL, 0, shutdown_behavior, from_here, task); 662 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
663 from_here, task);
641 } 664 }
642 665
643 bool SequencedWorkerPool::PostSequencedWorkerTask( 666 bool SequencedWorkerPool::PostSequencedWorkerTask(
644 SequenceToken sequence_token, 667 SequenceToken sequence_token,
645 const tracked_objects::Location& from_here, 668 const tracked_objects::Location& from_here,
646 const base::Closure& task) { 669 const Closure& task) {
647 return inner_->PostTask(NULL, sequence_token.id_, BLOCK_SHUTDOWN, 670 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
648 from_here, task); 671 from_here, task);
649 } 672 }
650 673
651 bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 674 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
652 const std::string& token_name, 675 const std::string& token_name,
653 const tracked_objects::Location& from_here, 676 const tracked_objects::Location& from_here,
654 const base::Closure& task) { 677 const Closure& task) {
655 DCHECK(!token_name.empty()); 678 DCHECK(!token_name.empty());
656 return inner_->PostTask(&token_name, 0, BLOCK_SHUTDOWN, from_here, task); 679 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
680 from_here, task);
657 } 681 }
658 682
659 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 683 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
660 SequenceToken sequence_token, 684 SequenceToken sequence_token,
661 const tracked_objects::Location& from_here, 685 const tracked_objects::Location& from_here,
662 const base::Closure& task, 686 const Closure& task,
663 WorkerShutdown shutdown_behavior) { 687 WorkerShutdown shutdown_behavior) {
664 return inner_->PostTask(NULL, sequence_token.id_, shutdown_behavior, 688 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
665 from_here, task); 689 from_here, task);
666 } 690 }
667 691
668 void SequencedWorkerPool::FlushForTesting() { 692 void SequencedWorkerPool::FlushForTesting() {
669 inner_->Flush(); 693 inner_->FlushForTesting();
670 } 694 }
671 695
672 void SequencedWorkerPool::Shutdown() { 696 void SequencedWorkerPool::Shutdown() {
673 inner_->Shutdown(); 697 inner_->Shutdown();
674 } 698 }
675 699
676 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) { 700 void SequencedWorkerPool::SetTestingObserver(TestingObserver* observer) {
677 inner_->SetTestingObserver(observer); 701 inner_->SetTestingObserver(observer);
678 } 702 }
679 703
680 } // namespace base 704 } // namespace base
OLDNEW
« no previous file with comments | « base/threading/sequenced_worker_pool.h ('k') | base/threading/sequenced_worker_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698