| Index: runtime/vm/thread_pool.cc
|
| ===================================================================
|
| --- runtime/vm/thread_pool.cc (revision 0)
|
| +++ runtime/vm/thread_pool.cc (revision 0)
|
| @@ -0,0 +1,328 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +#include "vm/thread_pool.h"
|
| +
|
| +namespace dart {
|
| +
|
| +DEFINE_FLAG(int, worker_timeout_millis, 5000,
|
| + "Free workers when they have been idle for this amount of time.");
|
| +
|
| +Monitor* ThreadPool::exit_monitor_ = NULL;
|
| +int* ThreadPool::exit_count_ = NULL;
|
| +
|
| +ThreadPool::ThreadPool()
|
| + : shutting_down_(false),
|
| + all_workers_(NULL),
|
| + idle_workers_(NULL),
|
| + count_started_(0),
|
| + count_stopped_(0),
|
| + count_running_(0),
|
| + count_idle_(0) {
|
| +}
|
| +
|
| +
|
| +ThreadPool::~ThreadPool() {
|
| + Shutdown();
|
| +}
|
| +
|
| +
|
| +void ThreadPool::Run(Task* task) {
|
| + Worker* worker = NULL;
|
| + bool new_worker = false;
|
| + {
|
| + // We need ThreadPool::mutex_ to access worker lists and other
|
| + // ThreadPool state.
|
| + MutexLocker ml(&mutex_);
|
| + if (shutting_down_) {
|
| + return;
|
| + }
|
| + if (idle_workers_ == NULL) {
|
| + worker = new Worker(this);
|
| + ASSERT(worker != NULL);
|
| + new_worker = true;
|
| + count_started_++;
|
| +
|
| + // Add worker to the all_workers_ list.
|
| + worker->all_next_ = all_workers_;
|
| + all_workers_ = worker;
|
| + worker->owned_ = true;
|
| + } else {
|
| + // Get the first worker from the idle worker list.
|
| + worker = idle_workers_;
|
| + idle_workers_ = worker->idle_next_;
|
| + worker->idle_next_ = NULL;
|
| + count_idle_--;
|
| + }
|
| + count_running_++;
|
| + }
|
| + // Release ThreadPool::mutex_ before calling Worker functions.
|
| + ASSERT(worker != NULL);
|
| + worker->SetTask(task);
|
| + if (new_worker) {
|
| + // Call StartThread after we've assigned the first task.
|
| + worker->StartThread();
|
| + }
|
| +}
|
| +
|
| +
|
| +void ThreadPool::Shutdown() {
|
| + Worker* saved = NULL;
|
| + {
|
| + MutexLocker ml(&mutex_);
|
| + shutting_down_ = true;
|
| + saved = all_workers_;
|
| + all_workers_ = NULL;
|
| + idle_workers_ = NULL;
|
| +
|
| + Worker* current = saved;
|
| + while (current != NULL) {
|
| + Worker* next = current->all_next_;
|
| + current->idle_next_ = NULL;
|
| + current->owned_ = false;
|
| + current = next;
|
| + count_stopped_++;
|
| + }
|
| +
|
| + count_idle_ = 0;
|
| + count_running_ = 0;
|
| + ASSERT(count_started_ == count_stopped_);
|
| + }
|
| + // Release ThreadPool::mutex_ before calling Worker functions.
|
| +
|
| + Worker* current = saved;
|
| + while (current != NULL) {
|
| + // We may access all_next_ without holding ThreadPool::mutex_ here
|
| + // because the worker is no longer owned by the ThreadPool.
|
| + Worker* next = current->all_next_;
|
| + current->all_next_ = NULL;
|
| + current->Shutdown();
|
| + current = next;
|
| + }
|
| +}
|
| +
|
| +
|
| +bool ThreadPool::IsIdle(Worker* worker) {
|
| + ASSERT(worker != NULL && worker->owned_);
|
| + for (Worker* current = idle_workers_;
|
| + current != NULL;
|
| + current = current->idle_next_) {
|
| + if (current == worker) {
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +
|
| +bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
|
| + ASSERT(worker != NULL && worker->owned_);
|
| + if (idle_workers_ == NULL) {
|
| + return false;
|
| + }
|
| +
|
| + // Special case head of list.
|
| + if (idle_workers_ == worker) {
|
| + idle_workers_ = worker->idle_next_;
|
| + worker->idle_next_ = NULL;
|
| + return true;
|
| + }
|
| +
|
| + for (Worker* current = idle_workers_;
|
| + current->idle_next_ != NULL;
|
| + current = current->idle_next_) {
|
| + if (current->idle_next_ == worker) {
|
| + current->idle_next_ = worker->idle_next_;
|
| + worker->idle_next_ = NULL;
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +
|
| +bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
|
| + ASSERT(worker != NULL && worker->owned_);
|
| + if (all_workers_ == NULL) {
|
| + return false;
|
| + }
|
| +
|
| + // Special case head of list.
|
| + if (all_workers_ == worker) {
|
| + all_workers_ = worker->all_next_;
|
| + worker->all_next_ = NULL;
|
| + worker->owned_ = false;
|
| + worker->pool_ = NULL;
|
| + return true;
|
| + }
|
| +
|
| + for (Worker* current = all_workers_;
|
| + current->all_next_ != NULL;
|
| + current = current->all_next_) {
|
| + if (current->all_next_ == worker) {
|
| + current->all_next_ = worker->all_next_;
|
| + worker->all_next_ = NULL;
|
| + worker->owned_ = false;
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +
|
| +void ThreadPool::SetIdle(Worker* worker) {
|
| + MutexLocker ml(&mutex_);
|
| + if (shutting_down_) {
|
| + return;
|
| + }
|
| + ASSERT(worker->owned_ && !IsIdle(worker));
|
| + worker->idle_next_ = idle_workers_;
|
| + idle_workers_ = worker;
|
| + count_idle_++;
|
| + count_running_--;
|
| +}
|
| +
|
| +
|
| +bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
|
| + MutexLocker ml(&mutex_);
|
| + if (shutting_down_) {
|
| + return false;
|
| + }
|
| + // Remove from idle list.
|
| + if (!RemoveWorkerFromIdleList(worker)) {
|
| + return false;
|
| + }
|
| + // Remove from all list.
|
| + bool found = RemoveWorkerFromAllList(worker);
|
| + ASSERT(found);
|
| +
|
| + count_stopped_++;
|
| + count_idle_--;
|
| + return true;
|
| +}
|
| +
|
| +
|
| +ThreadPool::Task::Task() {
|
| +}
|
| +
|
| +
|
| +ThreadPool::Task::~Task() {
|
| +}
|
| +
|
| +
|
| +ThreadPool::Worker::Worker(ThreadPool* pool)
|
| + : pool_(pool),
|
| + task_(NULL),
|
| + owned_(false),
|
| + all_next_(NULL),
|
| + idle_next_(NULL) {
|
| +}
|
| +
|
| +
|
| +void ThreadPool::Worker::StartThread() {
|
| +#if defined(DEBUG)
|
| + // Must call SetTask before StartThread.
|
| + { // NOLINT
|
| + MonitorLocker ml(&monitor_);
|
| + ASSERT(task_ != NULL);
|
| + }
|
| +#endif
|
| + Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
|
| +}
|
| +
|
| +
|
| +void ThreadPool::Worker::SetTask(Task* task) {
|
| + MonitorLocker ml(&monitor_);
|
| + ASSERT(task_ == NULL);
|
| + task_ = task;
|
| + ml.Notify();
|
| +}
|
| +
|
| +
|
| +static int64_t ComputeTimeout(int64_t idle_start) {
|
| + if (FLAG_worker_timeout_millis <= 0) {
|
| + // No timeout.
|
| + return 0;
|
| + } else {
|
| + int64_t waited = OS::GetCurrentTimeMillis() - idle_start;
|
| + if (waited >= FLAG_worker_timeout_millis) {
|
| + // We must have gotten a spurious wakeup just before we timed
|
| + // out. Give the worker one last desperate chance to live. We
|
| + // are merciful.
|
| + return 1;
|
| + } else {
|
| + return FLAG_worker_timeout_millis - waited;
|
| + }
|
| + }
|
| +}
|
| +
|
| +
|
| +void ThreadPool::Worker::Loop() {
|
| + MonitorLocker ml(&monitor_);
|
| + int64_t idle_start;
|
| + while (true) {
|
| + ASSERT(task_ != NULL);
|
| + Task* task = task_;
|
| + task_ = NULL;
|
| +
|
| + // Release monitor while handling the task.
|
| + monitor_.Exit();
|
| + task->Run();
|
| + delete task;
|
| + monitor_.Enter();
|
| +
|
| + ASSERT(task_ == NULL);
|
| + if (IsDone()) {
|
| + return;
|
| + }
|
| + ASSERT(pool_ != NULL);
|
| + pool_->SetIdle(this);
|
| + idle_start = OS::GetCurrentTimeMillis();
|
| + while (true) {
|
| + Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
|
| + if (task_ != NULL) {
|
| + // We've found a task. Process it, regardless of whether the
|
| + // worker is done_.
|
| + break;
|
| + }
|
| + if (IsDone()) {
|
| + return;
|
| + }
|
| + if (result == Monitor::kTimedOut &&
|
| + pool_->ReleaseIdleWorker(this)) {
|
| + return;
|
| + }
|
| + }
|
| + }
|
| + UNREACHABLE();
|
| +}
|
| +
|
| +
|
| +void ThreadPool::Worker::Shutdown() {
|
| + MonitorLocker ml(&monitor_);
|
| + pool_ = NULL; // Fail fast if someone tries to access pool_.
|
| + ml.Notify();
|
| +}
|
| +
|
| +
|
| +// static
|
| +void ThreadPool::Worker::Main(uword args) {
|
| + Worker* worker = reinterpret_cast<Worker*>(args);
|
| + worker->Loop();
|
| +
|
| + // It should be okay to access these unlocked here in this assert.
|
| + ASSERT(!worker->owned_ &&
|
| + worker->all_next_ == NULL &&
|
| + worker->idle_next_ == NULL);
|
| +
|
| + // The exit monitor is only used during testing.
|
| + if (ThreadPool::exit_monitor_) {
|
| + MonitorLocker ml(ThreadPool::exit_monitor_);
|
| + (*ThreadPool::exit_count_)++;
|
| + ml.Notify();
|
| + }
|
| + delete worker;
|
| +}
|
| +
|
| +} // namespace dart
|
|
|