Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(970)

Unified Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/vm/thread_pool.cc
===================================================================
--- runtime/vm/thread_pool.cc (revision 0)
+++ runtime/vm/thread_pool.cc (revision 0)
@@ -0,0 +1,328 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+#include "vm/thread_pool.h"
+
+namespace dart {
+
+DEFINE_FLAG(int, worker_timeout_millis, 5000,
+ "Free workers when they have been idle for this amount of time.");
+
+Monitor* ThreadPool::exit_monitor_ = NULL;
+int* ThreadPool::exit_count_ = NULL;
+
+ThreadPool::ThreadPool()
+ : shutting_down_(false),
+ all_workers_(NULL),
+ idle_workers_(NULL),
+ count_started_(0),
+ count_stopped_(0),
+ count_running_(0),
+ count_idle_(0) {
+}
+
+
+ThreadPool::~ThreadPool() {
+ Shutdown();
+}
+
+
+void ThreadPool::Run(Task* task) {
+ Worker* worker = NULL;
+ bool new_worker = false;
+ {
+ // We need ThreadPool::mutex_ to access worker lists and other
+ // ThreadPool state.
+ MutexLocker ml(&mutex_);
+ if (shutting_down_) {
+ return;
+ }
+ if (idle_workers_ == NULL) {
+ worker = new Worker(this);
+ ASSERT(worker != NULL);
+ new_worker = true;
+ count_started_++;
+
+ // Add worker to the all_workers_ list.
+ worker->all_next_ = all_workers_;
+ all_workers_ = worker;
+ worker->owned_ = true;
+ } else {
+ // Get the first worker from the idle worker list.
+ worker = idle_workers_;
+ idle_workers_ = worker->idle_next_;
+ worker->idle_next_ = NULL;
+ count_idle_--;
+ }
+ count_running_++;
+ }
+ // Release ThreadPool::mutex_ before calling Worker functions.
+ ASSERT(worker != NULL);
+ worker->SetTask(task);
+ if (new_worker) {
+ // Call StartThread after we've assigned the first task.
+ worker->StartThread();
+ }
+}
+
+
+void ThreadPool::Shutdown() {
+ Worker* saved = NULL;
+ {
+ MutexLocker ml(&mutex_);
+ shutting_down_ = true;
+ saved = all_workers_;
+ all_workers_ = NULL;
+ idle_workers_ = NULL;
+
+ Worker* current = saved;
+ while (current != NULL) {
+ Worker* next = current->all_next_;
+ current->idle_next_ = NULL;
+ current->owned_ = false;
+ current = next;
+ count_stopped_++;
+ }
+
+ count_idle_ = 0;
+ count_running_ = 0;
+ ASSERT(count_started_ == count_stopped_);
+ }
+ // Release ThreadPool::mutex_ before calling Worker functions.
+
+ Worker* current = saved;
+ while (current != NULL) {
+ // We may access all_next_ without holding ThreadPool::mutex_ here
+ // because the worker is no longer owned by the ThreadPool.
+ Worker* next = current->all_next_;
+ current->all_next_ = NULL;
+ current->Shutdown();
+ current = next;
+ }
+}
+
+
+bool ThreadPool::IsIdle(Worker* worker) {
+ ASSERT(worker != NULL && worker->owned_);
+ for (Worker* current = idle_workers_;
+ current != NULL;
+ current = current->idle_next_) {
+ if (current == worker) {
+ return true;
+ }
+ }
+ return false;
+}
+
+
+bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
+ ASSERT(worker != NULL && worker->owned_);
+ if (idle_workers_ == NULL) {
+ return false;
+ }
+
+ // Special case head of list.
+ if (idle_workers_ == worker) {
+ idle_workers_ = worker->idle_next_;
+ worker->idle_next_ = NULL;
+ return true;
+ }
+
+ for (Worker* current = idle_workers_;
+ current->idle_next_ != NULL;
+ current = current->idle_next_) {
+ if (current->idle_next_ == worker) {
+ current->idle_next_ = worker->idle_next_;
+ worker->idle_next_ = NULL;
+ return true;
+ }
+ }
+ return false;
+}
+
+
+bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
+ ASSERT(worker != NULL && worker->owned_);
+ if (all_workers_ == NULL) {
+ return false;
+ }
+
+ // Special case head of list.
+ if (all_workers_ == worker) {
+ all_workers_ = worker->all_next_;
+ worker->all_next_ = NULL;
+ worker->owned_ = false;
+ worker->pool_ = NULL;
+ return true;
+ }
+
+ for (Worker* current = all_workers_;
+ current->all_next_ != NULL;
+ current = current->all_next_) {
+ if (current->all_next_ == worker) {
+ current->all_next_ = worker->all_next_;
+ worker->all_next_ = NULL;
+ worker->owned_ = false;
+ return true;
+ }
+ }
+ return false;
+}
+
+
+void ThreadPool::SetIdle(Worker* worker) {
+ MutexLocker ml(&mutex_);
+ if (shutting_down_) {
+ return;
+ }
+ ASSERT(worker->owned_ && !IsIdle(worker));
+ worker->idle_next_ = idle_workers_;
+ idle_workers_ = worker;
+ count_idle_++;
+ count_running_--;
+}
+
+
+bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
+ MutexLocker ml(&mutex_);
+ if (shutting_down_) {
+ return false;
+ }
+ // Remove from idle list.
+ if (!RemoveWorkerFromIdleList(worker)) {
+ return false;
+ }
+ // Remove from all list.
+ bool found = RemoveWorkerFromAllList(worker);
+ ASSERT(found);
+
+ count_stopped_++;
+ count_idle_--;
+ return true;
+}
+
+
+ThreadPool::Task::Task() {
+}
+
+
+ThreadPool::Task::~Task() {
+}
+
+
+ThreadPool::Worker::Worker(ThreadPool* pool)
+ : pool_(pool),
+ task_(NULL),
+ owned_(false),
+ all_next_(NULL),
+ idle_next_(NULL) {
+}
+
+
+void ThreadPool::Worker::StartThread() {
+#if defined(DEBUG)
+ // Must call SetTask before StartThread.
+ { // NOLINT
+ MonitorLocker ml(&monitor_);
+ ASSERT(task_ != NULL);
+ }
+#endif
+ Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
+}
+
+
+void ThreadPool::Worker::SetTask(Task* task) {
+ MonitorLocker ml(&monitor_);
+ ASSERT(task_ == NULL);
+ task_ = task;
+ ml.Notify();
+}
+
+
+static int64_t ComputeTimeout(int64_t idle_start) {
+ if (FLAG_worker_timeout_millis <= 0) {
+ // No timeout.
+ return 0;
+ } else {
+ int64_t waited = OS::GetCurrentTimeMillis() - idle_start;
+ if (waited >= FLAG_worker_timeout_millis) {
+ // We must have gotten a spurious wakeup just before we timed
+ // out. Give the worker one last desperate chance to live. We
+ // are merciful.
+ return 1;
+ } else {
+ return FLAG_worker_timeout_millis - waited;
+ }
+ }
+}
+
+
+void ThreadPool::Worker::Loop() {
+ MonitorLocker ml(&monitor_);
+ int64_t idle_start;
+ while (true) {
+ ASSERT(task_ != NULL);
+ Task* task = task_;
+ task_ = NULL;
+
+ // Release monitor while handling the task.
+ monitor_.Exit();
+ task->Run();
+ delete task;
+ monitor_.Enter();
+
+ ASSERT(task_ == NULL);
+ if (IsDone()) {
+ return;
+ }
+ ASSERT(pool_ != NULL);
+ pool_->SetIdle(this);
+ idle_start = OS::GetCurrentTimeMillis();
+ while (true) {
+ Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
+ if (task_ != NULL) {
+ // We've found a task. Process it, regardless of whether the
+ // worker is done_.
+ break;
+ }
+ if (IsDone()) {
+ return;
+ }
+ if (result == Monitor::kTimedOut &&
+ pool_->ReleaseIdleWorker(this)) {
+ return;
+ }
+ }
+ }
+ UNREACHABLE();
+}
+
+
+void ThreadPool::Worker::Shutdown() {
+ MonitorLocker ml(&monitor_);
+ pool_ = NULL; // Fail fast if someone tries to access pool_.
+ ml.Notify();
+}
+
+
+// static
+void ThreadPool::Worker::Main(uword args) {
+ Worker* worker = reinterpret_cast<Worker*>(args);
+ worker->Loop();
+
+ // It should be okay to access these unlocked here in this assert.
+ ASSERT(!worker->owned_ &&
+ worker->all_next_ == NULL &&
+ worker->idle_next_ == NULL);
+
+ // The exit monitor is only used during testing.
+ if (ThreadPool::exit_monitor_) {
+ MonitorLocker ml(ThreadPool::exit_monitor_);
+ (*ThreadPool::exit_count_)++;
+ ml.Notify();
+ }
+ delete worker;
+}
+
+} // namespace dart

Powered by Google App Engine
This is Rietveld 408576698