OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 #include "vm/thread_pool.h" |
| 6 |
| 7 namespace dart { |
| 8 |
| 9 DEFINE_FLAG(int, worker_timeout_millis, 5000, |
| 10 "Free workers when they have been idle for this amount of time."); |
| 11 |
| 12 Monitor* ThreadPool::exit_monitor_ = NULL; |
| 13 int* ThreadPool::exit_count_ = NULL; |
| 14 |
| 15 ThreadPool::ThreadPool() |
| 16 : shutting_down_(false), |
| 17 all_workers_(NULL), |
| 18 idle_workers_(NULL), |
| 19 count_started_(0), |
| 20 count_stopped_(0), |
| 21 count_running_(0), |
| 22 count_idle_(0) { |
| 23 } |
| 24 |
| 25 |
| 26 ThreadPool::~ThreadPool() { |
| 27 Shutdown(); |
| 28 } |
| 29 |
| 30 |
| 31 void ThreadPool::Run(Task* task) { |
| 32 Worker* worker = NULL; |
| 33 bool new_worker = false; |
| 34 { |
| 35 // We need ThreadPool::mutex_ to access worker lists and other |
| 36 // ThreadPool state. |
| 37 MutexLocker ml(&mutex_); |
| 38 if (shutting_down_) { |
| 39 return; |
| 40 } |
| 41 if (idle_workers_ == NULL) { |
| 42 worker = new Worker(this); |
| 43 ASSERT(worker != NULL); |
| 44 new_worker = true; |
| 45 count_started_++; |
| 46 |
| 47 // Add worker to the all_workers_ list. |
| 48 worker->all_next_ = all_workers_; |
| 49 all_workers_ = worker; |
| 50 worker->owned_ = true; |
| 51 } else { |
| 52 // Get the first worker from the idle worker list. |
| 53 worker = idle_workers_; |
| 54 idle_workers_ = worker->idle_next_; |
| 55 worker->idle_next_ = NULL; |
| 56 count_idle_--; |
| 57 } |
| 58 count_running_++; |
| 59 } |
| 60 // Release ThreadPool::mutex_ before calling Worker functions. |
| 61 ASSERT(worker != NULL); |
| 62 worker->SetTask(task); |
| 63 if (new_worker) { |
| 64 // Call StartThread after we've assigned the first task. |
| 65 worker->StartThread(); |
| 66 } |
| 67 } |
| 68 |
| 69 |
| 70 void ThreadPool::Shutdown() { |
| 71 Worker* saved = NULL; |
| 72 { |
| 73 MutexLocker ml(&mutex_); |
| 74 shutting_down_ = true; |
| 75 saved = all_workers_; |
| 76 all_workers_ = NULL; |
| 77 idle_workers_ = NULL; |
| 78 |
| 79 Worker* current = saved; |
| 80 while (current != NULL) { |
| 81 Worker* next = current->all_next_; |
| 82 current->idle_next_ = NULL; |
| 83 current->owned_ = false; |
| 84 current = next; |
| 85 count_stopped_++; |
| 86 } |
| 87 |
| 88 count_idle_ = 0; |
| 89 count_running_ = 0; |
| 90 ASSERT(count_started_ == count_stopped_); |
| 91 } |
| 92 // Release ThreadPool::mutex_ before calling Worker functions. |
| 93 |
| 94 Worker* current = saved; |
| 95 while (current != NULL) { |
| 96 // We may access all_next_ without holding ThreadPool::mutex_ here |
| 97 // because the worker is no longer owned by the ThreadPool. |
| 98 Worker* next = current->all_next_; |
| 99 current->all_next_ = NULL; |
| 100 current->Shutdown(); |
| 101 current = next; |
| 102 } |
| 103 } |
| 104 |
| 105 |
| 106 bool ThreadPool::IsIdle(Worker* worker) { |
| 107 ASSERT(worker != NULL && worker->owned_); |
| 108 for (Worker* current = idle_workers_; |
| 109 current != NULL; |
| 110 current = current->idle_next_) { |
| 111 if (current == worker) { |
| 112 return true; |
| 113 } |
| 114 } |
| 115 return false; |
| 116 } |
| 117 |
| 118 |
| 119 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { |
| 120 ASSERT(worker != NULL && worker->owned_); |
| 121 if (idle_workers_ == NULL) { |
| 122 return false; |
| 123 } |
| 124 |
| 125 // Special case head of list. |
| 126 if (idle_workers_ == worker) { |
| 127 idle_workers_ = worker->idle_next_; |
| 128 worker->idle_next_ = NULL; |
| 129 return true; |
| 130 } |
| 131 |
| 132 for (Worker* current = idle_workers_; |
| 133 current->idle_next_ != NULL; |
| 134 current = current->idle_next_) { |
| 135 if (current->idle_next_ == worker) { |
| 136 current->idle_next_ = worker->idle_next_; |
| 137 worker->idle_next_ = NULL; |
| 138 return true; |
| 139 } |
| 140 } |
| 141 return false; |
| 142 } |
| 143 |
| 144 |
| 145 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
| 146 ASSERT(worker != NULL && worker->owned_); |
| 147 if (all_workers_ == NULL) { |
| 148 return false; |
| 149 } |
| 150 |
| 151 // Special case head of list. |
| 152 if (all_workers_ == worker) { |
| 153 all_workers_ = worker->all_next_; |
| 154 worker->all_next_ = NULL; |
| 155 worker->owned_ = false; |
| 156 worker->pool_ = NULL; |
| 157 return true; |
| 158 } |
| 159 |
| 160 for (Worker* current = all_workers_; |
| 161 current->all_next_ != NULL; |
| 162 current = current->all_next_) { |
| 163 if (current->all_next_ == worker) { |
| 164 current->all_next_ = worker->all_next_; |
| 165 worker->all_next_ = NULL; |
| 166 worker->owned_ = false; |
| 167 return true; |
| 168 } |
| 169 } |
| 170 return false; |
| 171 } |
| 172 |
| 173 |
| 174 void ThreadPool::SetIdle(Worker* worker) { |
| 175 MutexLocker ml(&mutex_); |
| 176 if (shutting_down_) { |
| 177 return; |
| 178 } |
| 179 ASSERT(worker->owned_ && !IsIdle(worker)); |
| 180 worker->idle_next_ = idle_workers_; |
| 181 idle_workers_ = worker; |
| 182 count_idle_++; |
| 183 count_running_--; |
| 184 } |
| 185 |
| 186 |
| 187 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
| 188 MutexLocker ml(&mutex_); |
| 189 if (shutting_down_) { |
| 190 return false; |
| 191 } |
| 192 // Remove from idle list. |
| 193 if (!RemoveWorkerFromIdleList(worker)) { |
| 194 return false; |
| 195 } |
| 196 // Remove from all list. |
| 197 bool found = RemoveWorkerFromAllList(worker); |
| 198 ASSERT(found); |
| 199 |
| 200 count_stopped_++; |
| 201 count_idle_--; |
| 202 return true; |
| 203 } |
| 204 |
| 205 |
| 206 ThreadPool::Task::Task() { |
| 207 } |
| 208 |
| 209 |
| 210 ThreadPool::Task::~Task() { |
| 211 } |
| 212 |
| 213 |
| 214 ThreadPool::Worker::Worker(ThreadPool* pool) |
| 215 : pool_(pool), |
| 216 task_(NULL), |
| 217 owned_(false), |
| 218 all_next_(NULL), |
| 219 idle_next_(NULL) { |
| 220 } |
| 221 |
| 222 |
| 223 void ThreadPool::Worker::StartThread() { |
| 224 #if defined(DEBUG) |
| 225 // Must call SetTask before StartThread. |
| 226 { // NOLINT |
| 227 MonitorLocker ml(&monitor_); |
| 228 ASSERT(task_ != NULL); |
| 229 } |
| 230 #endif |
| 231 Thread::Start(&Worker::Main, reinterpret_cast<uword>(this)); |
| 232 } |
| 233 |
| 234 |
| 235 void ThreadPool::Worker::SetTask(Task* task) { |
| 236 MonitorLocker ml(&monitor_); |
| 237 ASSERT(task_ == NULL); |
| 238 task_ = task; |
| 239 ml.Notify(); |
| 240 } |
| 241 |
| 242 |
| 243 static int64_t ComputeTimeout(int64_t idle_start) { |
| 244 if (FLAG_worker_timeout_millis <= 0) { |
| 245 // No timeout. |
| 246 return 0; |
| 247 } else { |
| 248 int64_t waited = OS::GetCurrentTimeMillis() - idle_start; |
| 249 if (waited >= FLAG_worker_timeout_millis) { |
| 250 // We must have gotten a spurious wakeup just before we timed |
| 251 // out. Give the worker one last desperate chance to live. We |
| 252 // are merciful. |
| 253 return 1; |
| 254 } else { |
| 255 return FLAG_worker_timeout_millis - waited; |
| 256 } |
| 257 } |
| 258 } |
| 259 |
| 260 |
| 261 void ThreadPool::Worker::Loop() { |
| 262 MonitorLocker ml(&monitor_); |
| 263 int64_t idle_start; |
| 264 while (true) { |
| 265 ASSERT(task_ != NULL); |
| 266 Task* task = task_; |
| 267 task_ = NULL; |
| 268 |
| 269 // Release monitor while handling the task. |
| 270 monitor_.Exit(); |
| 271 task->Run(); |
| 272 delete task; |
| 273 monitor_.Enter(); |
| 274 |
| 275 ASSERT(task_ == NULL); |
| 276 if (IsDone()) { |
| 277 return; |
| 278 } |
| 279 ASSERT(pool_ != NULL); |
| 280 pool_->SetIdle(this); |
| 281 idle_start = OS::GetCurrentTimeMillis(); |
| 282 while (true) { |
| 283 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
| 284 if (task_ != NULL) { |
| 285 // We've found a task. Process it, regardless of whether the |
| 286 // worker is done_. |
| 287 break; |
| 288 } |
| 289 if (IsDone()) { |
| 290 return; |
| 291 } |
| 292 if (result == Monitor::kTimedOut && |
| 293 pool_->ReleaseIdleWorker(this)) { |
| 294 return; |
| 295 } |
| 296 } |
| 297 } |
| 298 UNREACHABLE(); |
| 299 } |
| 300 |
| 301 |
| 302 void ThreadPool::Worker::Shutdown() { |
| 303 MonitorLocker ml(&monitor_); |
| 304 pool_ = NULL; // Fail fast if someone tries to access pool_. |
| 305 ml.Notify(); |
| 306 } |
| 307 |
| 308 |
| 309 // static |
| 310 void ThreadPool::Worker::Main(uword args) { |
| 311 Worker* worker = reinterpret_cast<Worker*>(args); |
| 312 worker->Loop(); |
| 313 |
| 314 // It should be okay to access these unlocked here in this assert. |
| 315 ASSERT(!worker->owned_ && |
| 316 worker->all_next_ == NULL && |
| 317 worker->idle_next_ == NULL); |
| 318 |
| 319 // The exit monitor is only used during testing. |
| 320 if (ThreadPool::exit_monitor_) { |
| 321 MonitorLocker ml(ThreadPool::exit_monitor_); |
| 322 (*ThreadPool::exit_count_)++; |
| 323 ml.Notify(); |
| 324 } |
| 325 delete worker; |
| 326 } |
| 327 |
| 328 } // namespace dart |
OLD | NEW |