OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/thread_pool.h" | 5 #include "vm/thread_pool.h" |
6 | 6 |
7 #include "vm/flags.h" | 7 #include "vm/flags.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 | 9 |
10 namespace dart { | 10 namespace dart { |
11 | 11 |
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, | 12 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
13 "Free workers when they have been idle for this amount of time."); | 13 "Free workers when they have been idle for this amount of time."); |
14 | 14 |
15 Monitor* ThreadPool::exit_monitor_ = NULL; | |
16 int* ThreadPool::exit_count_ = NULL; | |
17 | |
18 ThreadPool::ThreadPool() | 15 ThreadPool::ThreadPool() |
19 : shutting_down_(false), | 16 : shutting_down_(false), |
20 all_workers_(NULL), | 17 all_workers_(NULL), |
21 idle_workers_(NULL), | 18 idle_workers_(NULL), |
22 count_started_(0), | 19 count_started_(0), |
23 count_stopped_(0), | 20 count_stopped_(0), |
24 count_running_(0), | 21 count_running_(0), |
25 count_idle_(0) { | 22 count_idle_(0), |
| 23 shutting_down_workers_(NULL), |
| 24 join_list_(NULL) { |
26 } | 25 } |
27 | 26 |
28 | 27 |
29 ThreadPool::~ThreadPool() { | 28 ThreadPool::~ThreadPool() { |
30 Shutdown(); | 29 Shutdown(); |
31 } | 30 } |
32 | 31 |
33 | 32 |
34 void ThreadPool::Run(Task* task) { | 33 bool ThreadPool::Run(Task* task) { |
35 Worker* worker = NULL; | 34 Worker* worker = NULL; |
36 bool new_worker = false; | 35 bool new_worker = false; |
37 { | 36 { |
38 // We need ThreadPool::mutex_ to access worker lists and other | 37 // We need ThreadPool::mutex_ to access worker lists and other |
39 // ThreadPool state. | 38 // ThreadPool state. |
40 MutexLocker ml(&mutex_); | 39 MutexLocker ml(&mutex_); |
41 if (shutting_down_) { | 40 if (shutting_down_) { |
42 return; | 41 return false; |
43 } | 42 } |
44 if (idle_workers_ == NULL) { | 43 if (idle_workers_ == NULL) { |
45 worker = new Worker(this); | 44 worker = new Worker(this); |
46 ASSERT(worker != NULL); | 45 ASSERT(worker != NULL); |
47 new_worker = true; | 46 new_worker = true; |
48 count_started_++; | 47 count_started_++; |
49 | 48 |
50 // Add worker to the all_workers_ list. | 49 // Add worker to the all_workers_ list. |
51 worker->all_next_ = all_workers_; | 50 worker->all_next_ = all_workers_; |
52 all_workers_ = worker; | 51 all_workers_ = worker; |
53 worker->owned_ = true; | 52 worker->owned_ = true; |
| 53 count_running_++; |
54 } else { | 54 } else { |
55 // Get the first worker from the idle worker list. | 55 // Get the first worker from the idle worker list. |
56 worker = idle_workers_; | 56 worker = idle_workers_; |
57 idle_workers_ = worker->idle_next_; | 57 idle_workers_ = worker->idle_next_; |
58 worker->idle_next_ = NULL; | 58 worker->idle_next_ = NULL; |
59 count_idle_--; | 59 count_idle_--; |
| 60 count_running_++; |
60 } | 61 } |
61 count_running_++; | |
62 } | 62 } |
| 63 |
63 // Release ThreadPool::mutex_ before calling Worker functions. | 64 // Release ThreadPool::mutex_ before calling Worker functions. |
64 ASSERT(worker != NULL); | 65 ASSERT(worker != NULL); |
65 worker->SetTask(task); | 66 worker->SetTask(task); |
66 if (new_worker) { | 67 if (new_worker) { |
67 // Call StartThread after we've assigned the first task. | 68 // Call StartThread after we've assigned the first task. |
68 worker->StartThread(); | 69 worker->StartThread(); |
69 } | 70 } |
| 71 return true; |
70 } | 72 } |
71 | 73 |
72 | 74 |
73 void ThreadPool::Shutdown() { | 75 void ThreadPool::Shutdown() { |
74 Worker* saved = NULL; | 76 Worker* saved = NULL; |
75 { | 77 { |
76 MutexLocker ml(&mutex_); | 78 MutexLocker ml(&mutex_); |
77 shutting_down_ = true; | 79 shutting_down_ = true; |
78 saved = all_workers_; | 80 saved = all_workers_; |
79 all_workers_ = NULL; | 81 all_workers_ = NULL; |
80 idle_workers_ = NULL; | 82 idle_workers_ = NULL; |
81 | 83 |
82 Worker* current = saved; | 84 Worker* current = saved; |
83 while (current != NULL) { | 85 while (current != NULL) { |
84 Worker* next = current->all_next_; | 86 Worker* next = current->all_next_; |
85 current->idle_next_ = NULL; | 87 current->idle_next_ = NULL; |
86 current->owned_ = false; | 88 current->owned_ = false; |
87 current = next; | 89 current = next; |
88 count_stopped_++; | 90 count_stopped_++; |
89 } | 91 } |
90 | 92 |
91 count_idle_ = 0; | 93 count_idle_ = 0; |
92 count_running_ = 0; | 94 count_running_ = 0; |
93 ASSERT(count_started_ == count_stopped_); | 95 ASSERT(count_started_ == count_stopped_); |
94 } | 96 } |
95 // Release ThreadPool::mutex_ before calling Worker functions. | 97 // Release ThreadPool::mutex_ before calling Worker functions. |
96 | 98 |
97 Worker* current = saved; | 99 { |
98 while (current != NULL) { | 100 MonitorLocker eml(&exit_monitor_); |
99 // We may access all_next_ without holding ThreadPool::mutex_ here | 101 |
100 // because the worker is no longer owned by the ThreadPool. | 102 // First tell all the workers to shut down. |
101 Worker* next = current->all_next_; | 103 Worker* current = saved; |
102 current->all_next_ = NULL; | 104 ThreadId id = OSThread::GetCurrentThreadId(); |
103 current->Shutdown(); | 105 while (current != NULL) { |
104 current = next; | 106 Worker* next = current->all_next_; |
| 107 ThreadId currentId = current->id(); |
| 108 if (currentId != id) { |
| 109 AddWorkerToShutdownList(current); |
| 110 } |
| 111 current->Shutdown(); |
| 112 current = next; |
| 113 } |
| 114 saved = NULL; |
| 115 |
| 116 // Wait until all workers will exit. |
| 117 while (shutting_down_workers_ != NULL) { |
| 118 // Here, we are waiting for workers to exit. When a worker exits we will |
| 119 // be notified. |
| 120 eml.Wait(); |
| 121 } |
105 } | 122 } |
| 123 |
| 124 // Extract the join list, and join on the threads. |
| 125 JoinList* list = NULL; |
| 126 { |
| 127 MutexLocker ml(&mutex_); |
| 128 list = join_list_; |
| 129 join_list_ = NULL; |
| 130 } |
| 131 |
| 132 // Join non-idle threads. |
| 133 JoinList::Join(&list); |
| 134 |
| 135 #if defined(DEBUG) |
| 136 { |
| 137 MutexLocker ml(&mutex_); |
| 138 ASSERT(join_list_ == NULL); |
| 139 } |
| 140 #endif |
106 } | 141 } |
107 | 142 |
108 | 143 |
109 bool ThreadPool::IsIdle(Worker* worker) { | 144 bool ThreadPool::IsIdle(Worker* worker) { |
110 ASSERT(worker != NULL && worker->owned_); | 145 ASSERT(worker != NULL && worker->owned_); |
111 for (Worker* current = idle_workers_; | 146 for (Worker* current = idle_workers_; |
112 current != NULL; | 147 current != NULL; |
113 current = current->idle_next_) { | 148 current = current->idle_next_) { |
114 if (current == worker) { | 149 if (current == worker) { |
115 return true; | 150 return true; |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
149 ASSERT(worker != NULL && worker->owned_); | 184 ASSERT(worker != NULL && worker->owned_); |
150 if (all_workers_ == NULL) { | 185 if (all_workers_ == NULL) { |
151 return false; | 186 return false; |
152 } | 187 } |
153 | 188 |
154 // Special case head of list. | 189 // Special case head of list. |
155 if (all_workers_ == worker) { | 190 if (all_workers_ == worker) { |
156 all_workers_ = worker->all_next_; | 191 all_workers_ = worker->all_next_; |
157 worker->all_next_ = NULL; | 192 worker->all_next_ = NULL; |
158 worker->owned_ = false; | 193 worker->owned_ = false; |
159 worker->pool_ = NULL; | 194 worker->done_ = true; |
160 return true; | 195 return true; |
161 } | 196 } |
162 | 197 |
163 for (Worker* current = all_workers_; | 198 for (Worker* current = all_workers_; |
164 current->all_next_ != NULL; | 199 current->all_next_ != NULL; |
165 current = current->all_next_) { | 200 current = current->all_next_) { |
166 if (current->all_next_ == worker) { | 201 if (current->all_next_ == worker) { |
167 current->all_next_ = worker->all_next_; | 202 current->all_next_ = worker->all_next_; |
168 worker->all_next_ = NULL; | 203 worker->all_next_ = NULL; |
169 worker->owned_ = false; | 204 worker->owned_ = false; |
170 return true; | 205 return true; |
171 } | 206 } |
172 } | 207 } |
173 return false; | 208 return false; |
174 } | 209 } |
175 | 210 |
176 | 211 |
177 void ThreadPool::SetIdle(Worker* worker) { | 212 void ThreadPool::SetIdleAndReapExited(Worker* worker) { |
178 MutexLocker ml(&mutex_); | 213 JoinList* list = NULL; |
179 if (shutting_down_) { | 214 { |
180 return; | 215 MutexLocker ml(&mutex_); |
| 216 if (shutting_down_) { |
| 217 return; |
| 218 } |
| 219 ASSERT(worker->owned_ && !IsIdle(worker)); |
| 220 worker->idle_next_ = idle_workers_; |
| 221 idle_workers_ = worker; |
| 222 count_idle_++; |
| 223 count_running_--; |
| 224 |
| 225 // While we have the lock, opportunistically grab and clear the join_list_. |
| 226 list = join_list_; |
| 227 join_list_ = NULL; |
181 } | 228 } |
182 ASSERT(worker->owned_ && !IsIdle(worker)); | 229 JoinList::Join(&list); |
183 worker->idle_next_ = idle_workers_; | |
184 idle_workers_ = worker; | |
185 count_idle_++; | |
186 count_running_--; | |
187 } | 230 } |
188 | 231 |
189 | 232 |
190 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { | 233 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
191 MutexLocker ml(&mutex_); | 234 MutexLocker ml(&mutex_); |
192 if (shutting_down_) { | 235 if (shutting_down_) { |
193 return false; | 236 return false; |
194 } | 237 } |
195 // Remove from idle list. | 238 // Remove from idle list. |
196 if (!RemoveWorkerFromIdleList(worker)) { | 239 if (!RemoveWorkerFromIdleList(worker)) { |
197 return false; | 240 return false; |
198 } | 241 } |
199 // Remove from all list. | 242 // Remove from all list. |
200 bool found = RemoveWorkerFromAllList(worker); | 243 bool found = RemoveWorkerFromAllList(worker); |
201 ASSERT(found); | 244 ASSERT(found); |
202 | 245 |
| 246 // The thread for worker will exit. Add its ThreadId to the join_list_ |
| 247 // so that we can join on it at the next opportunity. |
| 248 JoinList::AddLocked(OSThread::GetCurrentThreadJoinId(), &join_list_); |
203 count_stopped_++; | 249 count_stopped_++; |
204 count_idle_--; | 250 count_idle_--; |
205 return true; | 251 return true; |
206 } | 252 } |
207 | 253 |
208 | 254 |
| 255 // Only call while holding the exit_monitor_ |
| 256 void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
| 257 worker->shutdown_next_ = shutting_down_workers_; |
| 258 shutting_down_workers_ = worker; |
| 259 } |
| 260 |
| 261 |
| 262 // Only call while holding the exit_monitor_ |
| 263 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
| 264 ASSERT(worker != NULL); |
| 265 ASSERT(shutting_down_workers_ != NULL); |
| 266 |
| 267 // Special case head of list. |
| 268 if (shutting_down_workers_ == worker) { |
| 269 shutting_down_workers_ = worker->shutdown_next_; |
| 270 worker->shutdown_next_ = NULL; |
| 271 return true; |
| 272 } |
| 273 |
| 274 for (Worker* current = shutting_down_workers_; |
| 275 current->shutdown_next_ != NULL; |
| 276 current = current->shutdown_next_) { |
| 277 if (current->shutdown_next_ == worker) { |
| 278 current->shutdown_next_ = worker->shutdown_next_; |
| 279 worker->shutdown_next_ = NULL; |
| 280 return true; |
| 281 } |
| 282 } |
| 283 return false; |
| 284 } |
| 285 |
| 286 |
| 287 void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) { |
| 288 *list = new JoinList(id, *list); |
| 289 } |
| 290 |
| 291 |
| 292 void ThreadPool::JoinList::Join(JoinList** list) { |
| 293 while (*list != NULL) { |
| 294 JoinList* current = *list; |
| 295 *list = current->next(); |
| 296 OSThread::Join(current->id()); |
| 297 delete current; |
| 298 } |
| 299 } |
| 300 |
| 301 |
209 ThreadPool::Task::Task() { | 302 ThreadPool::Task::Task() { |
210 } | 303 } |
211 | 304 |
212 | 305 |
213 ThreadPool::Task::~Task() { | 306 ThreadPool::Task::~Task() { |
214 } | 307 } |
215 | 308 |
216 | 309 |
217 ThreadPool::Worker::Worker(ThreadPool* pool) | 310 ThreadPool::Worker::Worker(ThreadPool* pool) |
218 : pool_(pool), | 311 : pool_(pool), |
219 task_(NULL), | 312 task_(NULL), |
| 313 id_(OSThread::kInvalidThreadId), |
| 314 done_(false), |
220 owned_(false), | 315 owned_(false), |
221 all_next_(NULL), | 316 all_next_(NULL), |
222 idle_next_(NULL) { | 317 idle_next_(NULL), |
| 318 shutdown_next_(NULL) { |
223 } | 319 } |
224 | 320 |
225 | 321 |
| 322 ThreadId ThreadPool::Worker::id() { |
| 323 MonitorLocker ml(&monitor_); |
| 324 return id_; |
| 325 } |
| 326 |
| 327 |
226 void ThreadPool::Worker::StartThread() { | 328 void ThreadPool::Worker::StartThread() { |
227 #if defined(DEBUG) | 329 #if defined(DEBUG) |
228 // Must call SetTask before StartThread. | 330 // Must call SetTask before StartThread. |
229 { // NOLINT | 331 { // NOLINT |
230 MonitorLocker ml(&monitor_); | 332 MonitorLocker ml(&monitor_); |
231 ASSERT(task_ != NULL); | 333 ASSERT(task_ != NULL); |
232 } | 334 } |
233 #endif | 335 #endif |
234 int result = OSThread::Start(&Worker::Main, reinterpret_cast<uword>(this)); | 336 int result = OSThread::Start(&Worker::Main, reinterpret_cast<uword>(this)); |
235 if (result != 0) { | 337 if (result != 0) { |
(...skipping 21 matching lines...) Expand all Loading... |
257 // out. Give the worker one last desperate chance to live. We | 359 // out. Give the worker one last desperate chance to live. We |
258 // are merciful. | 360 // are merciful. |
259 return 1; | 361 return 1; |
260 } else { | 362 } else { |
261 return FLAG_worker_timeout_millis - waited; | 363 return FLAG_worker_timeout_millis - waited; |
262 } | 364 } |
263 } | 365 } |
264 } | 366 } |
265 | 367 |
266 | 368 |
267 void ThreadPool::Worker::Loop() { | 369 bool ThreadPool::Worker::Loop() { |
268 MonitorLocker ml(&monitor_); | 370 MonitorLocker ml(&monitor_); |
269 int64_t idle_start; | 371 int64_t idle_start; |
270 while (true) { | 372 while (true) { |
271 ASSERT(task_ != NULL); | 373 ASSERT(task_ != NULL); |
272 Task* task = task_; | 374 Task* task = task_; |
273 task_ = NULL; | 375 task_ = NULL; |
274 | 376 |
275 // Release monitor while handling the task. | 377 // Release monitor while handling the task. |
276 monitor_.Exit(); | 378 monitor_.Exit(); |
277 task->Run(); | 379 task->Run(); |
278 ASSERT(Isolate::Current() == NULL); | 380 ASSERT(Isolate::Current() == NULL); |
279 delete task; | 381 delete task; |
280 monitor_.Enter(); | 382 monitor_.Enter(); |
281 | 383 |
282 ASSERT(task_ == NULL); | 384 ASSERT(task_ == NULL); |
283 if (IsDone()) { | 385 if (IsDone()) { |
284 return; | 386 return false; |
285 } | 387 } |
286 ASSERT(pool_ != NULL); | 388 ASSERT(!done_); |
287 pool_->SetIdle(this); | 389 pool_->SetIdleAndReapExited(this); |
288 idle_start = OS::GetCurrentTimeMillis(); | 390 idle_start = OS::GetCurrentTimeMillis(); |
289 while (true) { | 391 while (true) { |
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | 392 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
291 if (task_ != NULL) { | 393 if (task_ != NULL) { |
292 // We've found a task. Process it, regardless of whether the | 394 // We've found a task. Process it, regardless of whether the |
293 // worker is done_. | 395 // worker is done_. |
294 break; | 396 break; |
295 } | 397 } |
296 if (IsDone()) { | 398 if (IsDone()) { |
297 return; | 399 return false; |
298 } | 400 } |
299 if (result == Monitor::kTimedOut && | 401 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
300 pool_->ReleaseIdleWorker(this)) { | 402 return true; |
301 return; | |
302 } | 403 } |
303 } | 404 } |
304 } | 405 } |
305 UNREACHABLE(); | 406 UNREACHABLE(); |
| 407 return false; |
306 } | 408 } |
307 | 409 |
308 | 410 |
309 void ThreadPool::Worker::Shutdown() { | 411 void ThreadPool::Worker::Shutdown() { |
310 MonitorLocker ml(&monitor_); | 412 MonitorLocker ml(&monitor_); |
311 pool_ = NULL; // Fail fast if someone tries to access pool_. | 413 done_ = true; |
312 ml.Notify(); | 414 ml.Notify(); |
313 } | 415 } |
314 | 416 |
315 | 417 |
316 // static | 418 // static |
317 void ThreadPool::Worker::Main(uword args) { | 419 void ThreadPool::Worker::Main(uword args) { |
318 Thread::EnsureInit(); | 420 Thread::EnsureInit(); |
319 Worker* worker = reinterpret_cast<Worker*>(args); | 421 Worker* worker = reinterpret_cast<Worker*>(args); |
320 worker->Loop(); | 422 ThreadId id = OSThread::GetCurrentThreadId(); |
| 423 ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId(); |
| 424 ThreadPool* pool; |
| 425 |
| 426 { |
| 427 MonitorLocker ml(&worker->monitor_); |
| 428 ASSERT(worker->task_); |
| 429 worker->id_ = id; |
| 430 pool = worker->pool_; |
| 431 } |
| 432 |
| 433 bool released = worker->Loop(); |
321 | 434 |
322 // It should be okay to access these unlocked here in this assert. | 435 // It should be okay to access these unlocked here in this assert. |
323 ASSERT(!worker->owned_ && | 436 // worker->all_next_ is retained by the pool for shutdown monitoring. |
324 worker->all_next_ == NULL && | 437 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
325 worker->idle_next_ == NULL); | |
326 | 438 |
327 // The exit monitor is only used during testing. | 439 if (!released) { |
328 if (ThreadPool::exit_monitor_) { | 440 // This worker is exiting because the thread pool is being shut down. |
329 MonitorLocker ml(ThreadPool::exit_monitor_); | 441 // Inform the thread pool that we are exiting. We remove this worker from |
330 (*ThreadPool::exit_count_)++; | 442 // shutting_down_workers_ list because there will be no need for the |
331 ml.Notify(); | 443 // ThreadPool to take action for this worker. |
| 444 { |
| 445 MutexLocker ml(&pool->mutex_); |
| 446 JoinList::AddLocked(join_id, &pool->join_list_); |
| 447 } |
| 448 |
| 449 // worker->id_ should never be read again, so set to invalid in debug mode |
| 450 // for asserts. |
| 451 #if defined(DEBUG) |
| 452 { |
| 453 MonitorLocker ml(&worker->monitor_); |
| 454 worker->id_ = OSThread::kInvalidThreadId; |
| 455 } |
| 456 #endif |
| 457 |
| 458 // Remove from the shutdown list, delete, and notify the thread pool. |
| 459 { |
| 460 MonitorLocker eml(&pool->exit_monitor_); |
| 461 pool->RemoveWorkerFromShutdownList(worker); |
| 462 delete worker; |
| 463 eml.Notify(); |
| 464 } |
| 465 } else { |
| 466 // This worker is going down because it was idle for too long. This case |
| 467 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. |
| 468 // The worker's id is added to the thread pool's join list by |
| 469 // ReleaseIdleWorker, so in the case that the thread pool begins shutting |
| 470 // down immediately after returning from worker->Loop() above, we still |
| 471 // wait for the thread to exit by joining on it in Shutdown(). |
| 472 delete worker; |
332 } | 473 } |
333 delete worker; | |
334 #if defined(TARGET_OS_WINDOWS) | 474 #if defined(TARGET_OS_WINDOWS) |
335 Thread::CleanUp(); | 475 Thread::CleanUp(); |
336 #endif | 476 #endif |
337 } | 477 } |
338 | 478 |
339 } // namespace dart | 479 } // namespace dart |
OLD | NEW |