Index: runtime/vm/thread_pool.cc |
=================================================================== |
--- runtime/vm/thread_pool.cc (revision 0) |
+++ runtime/vm/thread_pool.cc (revision 0) |
@@ -0,0 +1,328 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+#include "vm/thread_pool.h" |
+ |
+namespace dart { |
+ |
+DEFINE_FLAG(int, worker_timeout_millis, 5000, |
+ "Free workers when they have been idle for this amount of time."); |
+ |
+Monitor* ThreadPool::exit_monitor_ = NULL; |
+int* ThreadPool::exit_count_ = NULL; |
+ |
+ThreadPool::ThreadPool() |
+ : shutting_down_(false), |
+ all_workers_(NULL), |
+ idle_workers_(NULL), |
+ count_started_(0), |
+ count_stopped_(0), |
+ count_running_(0), |
+ count_idle_(0) { |
+} |
+ |
+ |
+ThreadPool::~ThreadPool() { |
+ Shutdown(); |
+} |
+ |
+ |
+void ThreadPool::Run(Task* task) { |
+ Worker* worker = NULL; |
+ bool new_worker = false; |
+ { |
+ // We need ThreadPool::mutex_ to access worker lists and other |
+ // ThreadPool state. |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return; |
+ } |
+ if (idle_workers_ == NULL) { |
+ worker = new Worker(this); |
+ ASSERT(worker != NULL); |
+ new_worker = true; |
+ count_started_++; |
+ |
+ // Add worker to the all_workers_ list. |
+ worker->all_next_ = all_workers_; |
+ all_workers_ = worker; |
+ worker->owned_ = true; |
+ } else { |
+ // Get the first worker from the idle worker list. |
+ worker = idle_workers_; |
+ idle_workers_ = worker->idle_next_; |
+ worker->idle_next_ = NULL; |
+ count_idle_--; |
+ } |
+ count_running_++; |
+ } |
+ // Release ThreadPool::mutex_ before calling Worker functions. |
+ ASSERT(worker != NULL); |
+ worker->SetTask(task); |
+ if (new_worker) { |
+ // Call StartThread after we've assigned the first task. |
+ worker->StartThread(); |
+ } |
+} |
+ |
+ |
+void ThreadPool::Shutdown() { |
+ Worker* saved = NULL; |
+ { |
+ MutexLocker ml(&mutex_); |
+ shutting_down_ = true; |
+ saved = all_workers_; |
+ all_workers_ = NULL; |
+ idle_workers_ = NULL; |
+ |
+ Worker* current = saved; |
+ while (current != NULL) { |
+ Worker* next = current->all_next_; |
+ current->idle_next_ = NULL; |
+ current->owned_ = false; |
+ current = next; |
+ count_stopped_++; |
+ } |
+ |
+ count_idle_ = 0; |
+ count_running_ = 0; |
+ ASSERT(count_started_ == count_stopped_); |
+ } |
+ // Release ThreadPool::mutex_ before calling Worker functions. |
+ |
+ Worker* current = saved; |
+ while (current != NULL) { |
+ // We may access all_next_ without holding ThreadPool::mutex_ here |
+ // because the worker is no longer owned by the ThreadPool. |
+ Worker* next = current->all_next_; |
+ current->all_next_ = NULL; |
+ current->Shutdown(); |
+ current = next; |
+ } |
+} |
+ |
+ |
+bool ThreadPool::IsIdle(Worker* worker) { |
+ ASSERT(worker != NULL && worker->owned_); |
+ for (Worker* current = idle_workers_; |
+ current != NULL; |
+ current = current->idle_next_) { |
+ if (current == worker) { |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+ |
+bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { |
+ ASSERT(worker != NULL && worker->owned_); |
+ if (idle_workers_ == NULL) { |
+ return false; |
+ } |
+ |
+ // Special case head of list. |
+ if (idle_workers_ == worker) { |
+ idle_workers_ = worker->idle_next_; |
+ worker->idle_next_ = NULL; |
+ return true; |
+ } |
+ |
+ for (Worker* current = idle_workers_; |
+ current->idle_next_ != NULL; |
+ current = current->idle_next_) { |
+ if (current->idle_next_ == worker) { |
+ current->idle_next_ = worker->idle_next_; |
+ worker->idle_next_ = NULL; |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+ |
+bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
+ ASSERT(worker != NULL && worker->owned_); |
+ if (all_workers_ == NULL) { |
+ return false; |
+ } |
+ |
+ // Special case head of list. |
+ if (all_workers_ == worker) { |
+ all_workers_ = worker->all_next_; |
+ worker->all_next_ = NULL; |
+ worker->owned_ = false; |
+ worker->pool_ = NULL; |
+ return true; |
+ } |
+ |
+ for (Worker* current = all_workers_; |
+ current->all_next_ != NULL; |
+ current = current->all_next_) { |
+ if (current->all_next_ == worker) { |
+ current->all_next_ = worker->all_next_; |
+ worker->all_next_ = NULL; |
+ worker->owned_ = false; |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+ |
+void ThreadPool::SetIdle(Worker* worker) { |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return; |
+ } |
+ ASSERT(worker->owned_ && !IsIdle(worker)); |
+ worker->idle_next_ = idle_workers_; |
+ idle_workers_ = worker; |
+ count_idle_++; |
+ count_running_--; |
+} |
+ |
+ |
+bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return false; |
+ } |
+ // Remove from idle list. |
+ if (!RemoveWorkerFromIdleList(worker)) { |
+ return false; |
+ } |
+ // Remove from all list. |
+ bool found = RemoveWorkerFromAllList(worker); |
+ ASSERT(found); |
+ |
+ count_stopped_++; |
+ count_idle_--; |
+ return true; |
+} |
+ |
+ |
+ThreadPool::Task::Task() { |
+} |
+ |
+ |
+ThreadPool::Task::~Task() { |
+} |
+ |
+ |
+ThreadPool::Worker::Worker(ThreadPool* pool) |
+ : pool_(pool), |
+ task_(NULL), |
+ owned_(false), |
+ all_next_(NULL), |
+ idle_next_(NULL) { |
+} |
+ |
+ |
+void ThreadPool::Worker::StartThread() { |
+#if defined(DEBUG) |
+ // Must call SetTask before StartThread. |
+ { // NOLINT |
+ MonitorLocker ml(&monitor_); |
+ ASSERT(task_ != NULL); |
+ } |
+#endif |
+ Thread::Start(&Worker::Main, reinterpret_cast<uword>(this)); |
+} |
+ |
+ |
+void ThreadPool::Worker::SetTask(Task* task) { |
+ MonitorLocker ml(&monitor_); |
+ ASSERT(task_ == NULL); |
+ task_ = task; |
+ ml.Notify(); |
+} |
+ |
+ |
+static int64_t ComputeTimeout(int64_t idle_start) { |
+ if (FLAG_worker_timeout_millis <= 0) { |
+ // No timeout. |
+ return 0; |
+ } else { |
+ int64_t waited = OS::GetCurrentTimeMillis() - idle_start; |
+ if (waited >= FLAG_worker_timeout_millis) { |
+ // We must have gotten a spurious wakeup just before we timed |
+ // out. Give the worker one last desperate chance to live. We |
+ // are merciful. |
+ return 1; |
+ } else { |
+ return FLAG_worker_timeout_millis - waited; |
+ } |
+ } |
+} |
+ |
+ |
+void ThreadPool::Worker::Loop() { |
+ MonitorLocker ml(&monitor_); |
+ int64_t idle_start; |
+ while (true) { |
+ ASSERT(task_ != NULL); |
+ Task* task = task_; |
+ task_ = NULL; |
+ |
+ // Release monitor while handling the task. |
+ monitor_.Exit(); |
+ task->Run(); |
+ delete task; |
+ monitor_.Enter(); |
+ |
+ ASSERT(task_ == NULL); |
+ if (IsDone()) { |
+ return; |
+ } |
+ ASSERT(pool_ != NULL); |
+ pool_->SetIdle(this); |
+ idle_start = OS::GetCurrentTimeMillis(); |
+ while (true) { |
+ Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
+ if (task_ != NULL) { |
+ // We've found a task. Process it, regardless of whether the |
+ // worker is done_. |
+ break; |
+ } |
+ if (IsDone()) { |
+ return; |
+ } |
+ if (result == Monitor::kTimedOut && |
+ pool_->ReleaseIdleWorker(this)) { |
+ return; |
+ } |
+ } |
+ } |
+ UNREACHABLE(); |
+} |
+ |
+ |
+void ThreadPool::Worker::Shutdown() { |
+ MonitorLocker ml(&monitor_); |
+ pool_ = NULL; // Fail fast if someone tries to access pool_. |
+ ml.Notify(); |
+} |
+ |
+ |
+// static |
+void ThreadPool::Worker::Main(uword args) { |
+ Worker* worker = reinterpret_cast<Worker*>(args); |
+ worker->Loop(); |
+ |
+ // It should be okay to access these unlocked here in this assert. |
+ ASSERT(!worker->owned_ && |
+ worker->all_next_ == NULL && |
+ worker->idle_next_ == NULL); |
+ |
+ // The exit monitor is only used during testing. |
+ if (ThreadPool::exit_monitor_) { |
+ MonitorLocker ml(ThreadPool::exit_monitor_); |
+ (*ThreadPool::exit_count_)++; |
+ ml.Notify(); |
+ } |
+ delete worker; |
+} |
+ |
+} // namespace dart |