Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(142)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 1275353005: VM thread shutdown. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Merge Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/thread_pool.h" 5 #include "vm/thread_pool.h"
6 6
7 #include "vm/flags.h" 7 #include "vm/flags.h"
8 #include "vm/lockers.h" 8 #include "vm/lockers.h"
9 9
10 namespace dart { 10 namespace dart {
11 11
12 DEFINE_FLAG(int, worker_timeout_millis, 5000, 12 DEFINE_FLAG(int, worker_timeout_millis, 5000,
13 "Free workers when they have been idle for this amount of time."); 13 "Free workers when they have been idle for this amount of time.");
14 14
15 Monitor* ThreadPool::exit_monitor_ = NULL;
16 int* ThreadPool::exit_count_ = NULL;
17
18 ThreadPool::ThreadPool() 15 ThreadPool::ThreadPool()
19 : shutting_down_(false), 16 : shutting_down_(false),
20 all_workers_(NULL), 17 all_workers_(NULL),
21 idle_workers_(NULL), 18 idle_workers_(NULL),
22 count_started_(0), 19 count_started_(0),
23 count_stopped_(0), 20 count_stopped_(0),
24 count_running_(0), 21 count_running_(0),
25 count_idle_(0) { 22 count_idle_(0),
23 shutting_down_workers_(NULL),
24 join_list_(NULL) {
26 } 25 }
27 26
28 27
29 ThreadPool::~ThreadPool() { 28 ThreadPool::~ThreadPool() {
30 Shutdown(); 29 Shutdown();
31 } 30 }
32 31
33 32
34 void ThreadPool::Run(Task* task) { 33 bool ThreadPool::Run(Task* task) {
35 Worker* worker = NULL; 34 Worker* worker = NULL;
36 bool new_worker = false; 35 bool new_worker = false;
37 { 36 {
38 // We need ThreadPool::mutex_ to access worker lists and other 37 // We need ThreadPool::mutex_ to access worker lists and other
39 // ThreadPool state. 38 // ThreadPool state.
40 MutexLocker ml(&mutex_); 39 MutexLocker ml(&mutex_);
41 if (shutting_down_) { 40 if (shutting_down_) {
42 return; 41 return false;
43 } 42 }
44 if (idle_workers_ == NULL) { 43 if (idle_workers_ == NULL) {
45 worker = new Worker(this); 44 worker = new Worker(this);
46 ASSERT(worker != NULL); 45 ASSERT(worker != NULL);
47 new_worker = true; 46 new_worker = true;
48 count_started_++; 47 count_started_++;
49 48
50 // Add worker to the all_workers_ list. 49 // Add worker to the all_workers_ list.
51 worker->all_next_ = all_workers_; 50 worker->all_next_ = all_workers_;
52 all_workers_ = worker; 51 all_workers_ = worker;
53 worker->owned_ = true; 52 worker->owned_ = true;
53 count_running_++;
54 } else { 54 } else {
55 // Get the first worker from the idle worker list. 55 // Get the first worker from the idle worker list.
56 worker = idle_workers_; 56 worker = idle_workers_;
57 idle_workers_ = worker->idle_next_; 57 idle_workers_ = worker->idle_next_;
58 worker->idle_next_ = NULL; 58 worker->idle_next_ = NULL;
59 count_idle_--; 59 count_idle_--;
60 count_running_++;
60 } 61 }
61 count_running_++;
62 } 62 }
63
63 // Release ThreadPool::mutex_ before calling Worker functions. 64 // Release ThreadPool::mutex_ before calling Worker functions.
64 ASSERT(worker != NULL); 65 ASSERT(worker != NULL);
65 worker->SetTask(task); 66 worker->SetTask(task);
66 if (new_worker) { 67 if (new_worker) {
67 // Call StartThread after we've assigned the first task. 68 // Call StartThread after we've assigned the first task.
68 worker->StartThread(); 69 worker->StartThread();
69 } 70 }
71 return true;
70 } 72 }
71 73
72 74
73 void ThreadPool::Shutdown() { 75 void ThreadPool::Shutdown() {
74 Worker* saved = NULL; 76 Worker* saved = NULL;
75 { 77 {
76 MutexLocker ml(&mutex_); 78 MutexLocker ml(&mutex_);
77 shutting_down_ = true; 79 shutting_down_ = true;
78 saved = all_workers_; 80 saved = all_workers_;
79 all_workers_ = NULL; 81 all_workers_ = NULL;
80 idle_workers_ = NULL; 82 idle_workers_ = NULL;
81 83
82 Worker* current = saved; 84 Worker* current = saved;
83 while (current != NULL) { 85 while (current != NULL) {
84 Worker* next = current->all_next_; 86 Worker* next = current->all_next_;
85 current->idle_next_ = NULL; 87 current->idle_next_ = NULL;
86 current->owned_ = false; 88 current->owned_ = false;
87 current = next; 89 current = next;
88 count_stopped_++; 90 count_stopped_++;
89 } 91 }
90 92
91 count_idle_ = 0; 93 count_idle_ = 0;
92 count_running_ = 0; 94 count_running_ = 0;
93 ASSERT(count_started_ == count_stopped_); 95 ASSERT(count_started_ == count_stopped_);
94 } 96 }
95 // Release ThreadPool::mutex_ before calling Worker functions. 97 // Release ThreadPool::mutex_ before calling Worker functions.
96 98
97 Worker* current = saved; 99 {
98 while (current != NULL) { 100 MonitorLocker eml(&exit_monitor_);
99 // We may access all_next_ without holding ThreadPool::mutex_ here 101
100 // because the worker is no longer owned by the ThreadPool. 102 // First tell all the workers to shut down.
101 Worker* next = current->all_next_; 103 Worker* current = saved;
102 current->all_next_ = NULL; 104 ThreadId id = OSThread::GetCurrentThreadId();
103 current->Shutdown(); 105 while (current != NULL) {
104 current = next; 106 Worker* next = current->all_next_;
107 ThreadId currentId = current->id();
108 if (currentId != id) {
109 AddWorkerToShutdownList(current);
110 }
111 current->Shutdown();
112 current = next;
113 }
114 saved = NULL;
115
116 // Wait until all workers will exit.
117 while (shutting_down_workers_ != NULL) {
118 // Here, we are waiting for workers to exit. When a worker exits we will
119 // be notified.
120 eml.Wait();
121 }
105 } 122 }
123
124 // Extract the join list, and join on the threads.
125 JoinList* list = NULL;
126 {
127 MutexLocker ml(&mutex_);
128 list = join_list_;
129 join_list_ = NULL;
130 }
131
132 // Join non-idle threads.
133 JoinList::Join(&list);
134
135 #if defined(DEBUG)
136 {
137 MutexLocker ml(&mutex_);
138 ASSERT(join_list_ == NULL);
139 }
140 #endif
106 } 141 }
107 142
108 143
109 bool ThreadPool::IsIdle(Worker* worker) { 144 bool ThreadPool::IsIdle(Worker* worker) {
110 ASSERT(worker != NULL && worker->owned_); 145 ASSERT(worker != NULL && worker->owned_);
111 for (Worker* current = idle_workers_; 146 for (Worker* current = idle_workers_;
112 current != NULL; 147 current != NULL;
113 current = current->idle_next_) { 148 current = current->idle_next_) {
114 if (current == worker) { 149 if (current == worker) {
115 return true; 150 return true;
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 ASSERT(worker != NULL && worker->owned_); 184 ASSERT(worker != NULL && worker->owned_);
150 if (all_workers_ == NULL) { 185 if (all_workers_ == NULL) {
151 return false; 186 return false;
152 } 187 }
153 188
154 // Special case head of list. 189 // Special case head of list.
155 if (all_workers_ == worker) { 190 if (all_workers_ == worker) {
156 all_workers_ = worker->all_next_; 191 all_workers_ = worker->all_next_;
157 worker->all_next_ = NULL; 192 worker->all_next_ = NULL;
158 worker->owned_ = false; 193 worker->owned_ = false;
159 worker->pool_ = NULL; 194 worker->done_ = true;
160 return true; 195 return true;
161 } 196 }
162 197
163 for (Worker* current = all_workers_; 198 for (Worker* current = all_workers_;
164 current->all_next_ != NULL; 199 current->all_next_ != NULL;
165 current = current->all_next_) { 200 current = current->all_next_) {
166 if (current->all_next_ == worker) { 201 if (current->all_next_ == worker) {
167 current->all_next_ = worker->all_next_; 202 current->all_next_ = worker->all_next_;
168 worker->all_next_ = NULL; 203 worker->all_next_ = NULL;
169 worker->owned_ = false; 204 worker->owned_ = false;
170 return true; 205 return true;
171 } 206 }
172 } 207 }
173 return false; 208 return false;
174 } 209 }
175 210
176 211
177 void ThreadPool::SetIdle(Worker* worker) { 212 void ThreadPool::SetIdleAndReapExited(Worker* worker) {
178 MutexLocker ml(&mutex_); 213 JoinList* list = NULL;
179 if (shutting_down_) { 214 {
180 return; 215 MutexLocker ml(&mutex_);
216 if (shutting_down_) {
217 return;
218 }
219 ASSERT(worker->owned_ && !IsIdle(worker));
220 worker->idle_next_ = idle_workers_;
221 idle_workers_ = worker;
222 count_idle_++;
223 count_running_--;
224
225 // While we have the lock, opportunistically grab and clear the join_list_.
226 list = join_list_;
227 join_list_ = NULL;
181 } 228 }
182 ASSERT(worker->owned_ && !IsIdle(worker)); 229 JoinList::Join(&list);
183 worker->idle_next_ = idle_workers_;
184 idle_workers_ = worker;
185 count_idle_++;
186 count_running_--;
187 } 230 }
188 231
189 232
190 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { 233 bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
191 MutexLocker ml(&mutex_); 234 MutexLocker ml(&mutex_);
192 if (shutting_down_) { 235 if (shutting_down_) {
193 return false; 236 return false;
194 } 237 }
195 // Remove from idle list. 238 // Remove from idle list.
196 if (!RemoveWorkerFromIdleList(worker)) { 239 if (!RemoveWorkerFromIdleList(worker)) {
197 return false; 240 return false;
198 } 241 }
199 // Remove from all list. 242 // Remove from all list.
200 bool found = RemoveWorkerFromAllList(worker); 243 bool found = RemoveWorkerFromAllList(worker);
201 ASSERT(found); 244 ASSERT(found);
202 245
246 // The thread for worker will exit. Add its ThreadId to the join_list_
247 // so that we can join on it at the next opportunity.
248 JoinList::AddLocked(OSThread::GetCurrentThreadJoinId(), &join_list_);
203 count_stopped_++; 249 count_stopped_++;
204 count_idle_--; 250 count_idle_--;
205 return true; 251 return true;
206 } 252 }
207 253
208 254
255 // Only call while holding the exit_monitor_
256 void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
257 worker->shutdown_next_ = shutting_down_workers_;
258 shutting_down_workers_ = worker;
259 }
260
261
262 // Only call while holding the exit_monitor_
263 bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
264 ASSERT(worker != NULL);
265 ASSERT(shutting_down_workers_ != NULL);
266
267 // Special case head of list.
268 if (shutting_down_workers_ == worker) {
269 shutting_down_workers_ = worker->shutdown_next_;
270 worker->shutdown_next_ = NULL;
271 return true;
272 }
273
274 for (Worker* current = shutting_down_workers_;
275 current->shutdown_next_ != NULL;
276 current = current->shutdown_next_) {
277 if (current->shutdown_next_ == worker) {
278 current->shutdown_next_ = worker->shutdown_next_;
279 worker->shutdown_next_ = NULL;
280 return true;
281 }
282 }
283 return false;
284 }
285
286
287 void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) {
288 *list = new JoinList(id, *list);
289 }
290
291
292 void ThreadPool::JoinList::Join(JoinList** list) {
293 while (*list != NULL) {
294 JoinList* current = *list;
295 *list = current->next();
296 OSThread::Join(current->id());
297 delete current;
298 }
299 }
300
301
209 ThreadPool::Task::Task() { 302 ThreadPool::Task::Task() {
210 } 303 }
211 304
212 305
213 ThreadPool::Task::~Task() { 306 ThreadPool::Task::~Task() {
214 } 307 }
215 308
216 309
217 ThreadPool::Worker::Worker(ThreadPool* pool) 310 ThreadPool::Worker::Worker(ThreadPool* pool)
218 : pool_(pool), 311 : pool_(pool),
219 task_(NULL), 312 task_(NULL),
313 id_(OSThread::kInvalidThreadId),
314 done_(false),
220 owned_(false), 315 owned_(false),
221 all_next_(NULL), 316 all_next_(NULL),
222 idle_next_(NULL) { 317 idle_next_(NULL),
318 shutdown_next_(NULL) {
223 } 319 }
224 320
225 321
322 ThreadId ThreadPool::Worker::id() {
323 MonitorLocker ml(&monitor_);
324 return id_;
325 }
326
327
226 void ThreadPool::Worker::StartThread() { 328 void ThreadPool::Worker::StartThread() {
227 #if defined(DEBUG) 329 #if defined(DEBUG)
228 // Must call SetTask before StartThread. 330 // Must call SetTask before StartThread.
229 { // NOLINT 331 { // NOLINT
230 MonitorLocker ml(&monitor_); 332 MonitorLocker ml(&monitor_);
231 ASSERT(task_ != NULL); 333 ASSERT(task_ != NULL);
232 } 334 }
233 #endif 335 #endif
234 int result = OSThread::Start(&Worker::Main, reinterpret_cast<uword>(this)); 336 int result = OSThread::Start(&Worker::Main, reinterpret_cast<uword>(this));
235 if (result != 0) { 337 if (result != 0) {
(...skipping 21 matching lines...) Expand all
257 // out. Give the worker one last desperate chance to live. We 359 // out. Give the worker one last desperate chance to live. We
258 // are merciful. 360 // are merciful.
259 return 1; 361 return 1;
260 } else { 362 } else {
261 return FLAG_worker_timeout_millis - waited; 363 return FLAG_worker_timeout_millis - waited;
262 } 364 }
263 } 365 }
264 } 366 }
265 367
266 368
267 void ThreadPool::Worker::Loop() { 369 bool ThreadPool::Worker::Loop() {
268 MonitorLocker ml(&monitor_); 370 MonitorLocker ml(&monitor_);
269 int64_t idle_start; 371 int64_t idle_start;
270 while (true) { 372 while (true) {
271 ASSERT(task_ != NULL); 373 ASSERT(task_ != NULL);
272 Task* task = task_; 374 Task* task = task_;
273 task_ = NULL; 375 task_ = NULL;
274 376
275 // Release monitor while handling the task. 377 // Release monitor while handling the task.
276 monitor_.Exit(); 378 monitor_.Exit();
277 task->Run(); 379 task->Run();
278 ASSERT(Isolate::Current() == NULL); 380 ASSERT(Isolate::Current() == NULL);
279 delete task; 381 delete task;
280 monitor_.Enter(); 382 monitor_.Enter();
281 383
282 ASSERT(task_ == NULL); 384 ASSERT(task_ == NULL);
283 if (IsDone()) { 385 if (IsDone()) {
284 return; 386 return false;
285 } 387 }
286 ASSERT(pool_ != NULL); 388 ASSERT(!done_);
287 pool_->SetIdle(this); 389 pool_->SetIdleAndReapExited(this);
288 idle_start = OS::GetCurrentTimeMillis(); 390 idle_start = OS::GetCurrentTimeMillis();
289 while (true) { 391 while (true) {
290 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); 392 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
291 if (task_ != NULL) { 393 if (task_ != NULL) {
292 // We've found a task. Process it, regardless of whether the 394 // We've found a task. Process it, regardless of whether the
293 // worker is done_. 395 // worker is done_.
294 break; 396 break;
295 } 397 }
296 if (IsDone()) { 398 if (IsDone()) {
297 return; 399 return false;
298 } 400 }
299 if (result == Monitor::kTimedOut && 401 if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
300 pool_->ReleaseIdleWorker(this)) { 402 return true;
301 return;
302 } 403 }
303 } 404 }
304 } 405 }
305 UNREACHABLE(); 406 UNREACHABLE();
407 return false;
306 } 408 }
307 409
308 410
309 void ThreadPool::Worker::Shutdown() { 411 void ThreadPool::Worker::Shutdown() {
310 MonitorLocker ml(&monitor_); 412 MonitorLocker ml(&monitor_);
311 pool_ = NULL; // Fail fast if someone tries to access pool_. 413 done_ = true;
312 ml.Notify(); 414 ml.Notify();
313 } 415 }
314 416
315 417
316 // static 418 // static
317 void ThreadPool::Worker::Main(uword args) { 419 void ThreadPool::Worker::Main(uword args) {
318 Thread::EnsureInit(); 420 Thread::EnsureInit();
319 Worker* worker = reinterpret_cast<Worker*>(args); 421 Worker* worker = reinterpret_cast<Worker*>(args);
320 worker->Loop(); 422 ThreadId id = OSThread::GetCurrentThreadId();
423 ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId();
424 ThreadPool* pool;
425
426 {
427 MonitorLocker ml(&worker->monitor_);
428 ASSERT(worker->task_);
429 worker->id_ = id;
430 pool = worker->pool_;
431 }
432
433 bool released = worker->Loop();
321 434
322 // It should be okay to access these unlocked here in this assert. 435 // It should be okay to access these unlocked here in this assert.
323 ASSERT(!worker->owned_ && 436 // worker->all_next_ is retained by the pool for shutdown monitoring.
324 worker->all_next_ == NULL && 437 ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
325 worker->idle_next_ == NULL);
326 438
327 // The exit monitor is only used during testing. 439 if (!released) {
328 if (ThreadPool::exit_monitor_) { 440 // This worker is exiting because the thread pool is being shut down.
329 MonitorLocker ml(ThreadPool::exit_monitor_); 441 // Inform the thread pool that we are exiting. We remove this worker from
330 (*ThreadPool::exit_count_)++; 442 // shutting_down_workers_ list because there will be no need for the
331 ml.Notify(); 443 // ThreadPool to take action for this worker.
444 {
445 MutexLocker ml(&pool->mutex_);
446 JoinList::AddLocked(join_id, &pool->join_list_);
447 }
448
449 // worker->id_ should never be read again, so set to invalid in debug mode
450 // for asserts.
451 #if defined(DEBUG)
452 {
453 MonitorLocker ml(&worker->monitor_);
454 worker->id_ = OSThread::kInvalidThreadId;
455 }
456 #endif
457
458 // Remove from the shutdown list, delete, and notify the thread pool.
459 {
460 MonitorLocker eml(&pool->exit_monitor_);
461 pool->RemoveWorkerFromShutdownList(worker);
462 delete worker;
463 eml.Notify();
464 }
465 } else {
466 // This worker is going down because it was idle for too long. This case
467 // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
468 // The worker's id is added to the thread pool's join list by
469 // ReleaseIdleWorker, so in the case that the thread pool begins shutting
470 // down immediately after returning from worker->Loop() above, we still
471 // wait for the thread to exit by joining on it in Shutdown().
472 delete worker;
332 } 473 }
333 delete worker;
334 #if defined(TARGET_OS_WINDOWS) 474 #if defined(TARGET_OS_WINDOWS)
335 Thread::CleanUp(); 475 Thread::CleanUp();
336 #endif 476 #endif
337 } 477 }
338 478
339 } // namespace dart 479 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698