Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(511)

Unified Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/vm/thread_pool.cc
===================================================================
--- runtime/vm/thread_pool.cc (revision 0)
+++ runtime/vm/thread_pool.cc (revision 0)
@@ -0,0 +1,281 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+#include "vm/thread_pool.h"
+
+namespace dart {
+
+DEFINE_FLAG(int, worker_timeout_millis, 5000,
+ "Free workers when they have been idle for this amount of time.");
+
+ThreadPool::ThreadPool()
+ : shutting_down_(false),
+ all_workers_(NULL),
+ idle_workers_(NULL),
+ count_started_(0),
+ count_stopped_(0),
+ count_running_(0),
+ count_idle_(0) {
+}
+
+
+ThreadPool::~ThreadPool() {
+ Shutdown();
+}
+
+
+void ThreadPool::Run(Task* task) {
+ Worker* worker = NULL;
+ {
+ MutexLocker ml(&mutex_);
+ if (shutting_down_) {
+ return;
+ }
+ worker = GetIdleWorker();
+ }
+ // Release ThreadPool::mutex_ before calling Worker functions.
+ worker->Run(task);
+}
+
+
+ThreadPool::Worker* ThreadPool::GetIdleWorker() {
+ // Lock is claimed in ThreadPool::Run.
+ ASSERT(!shutting_down_);
+ Worker* worker = NULL;
+ if (idle_workers_ == NULL) {
+ worker = new Worker(this);
+ ASSERT(worker != NULL);
+ count_started_++;
+
+ // Add worker to the all_workers_ list.
+ worker->all_next_ = all_workers_;
+ all_workers_ = worker;
+
+ } else {
+ // Get the first worker from the idle worker list.
+ worker = idle_workers_;
+ idle_workers_ = worker->idle_next_;
+ worker->idle_next_ = NULL;
+ count_idle_--;
+ }
+ worker->state_ = kRunning;
+ count_running_++;
+ return worker;
+}
+
+
+void ThreadPool::Shutdown() {
+ Worker* current = NULL;
+ {
+ MutexLocker ml(&mutex_);
+ shutting_down_ = true;
+ current = all_workers_;
+ all_workers_ = NULL;
+ idle_workers_ = NULL;
+ count_idle_ = 0;
+ count_running_ = 0;
+
+ while (current != NULL) {
+ Worker* next = current->all_next_;
+ current->all_next_ = NULL;
+ current->idle_next_ = NULL;
+ current->state_ = kDone;
+ current = next;
+ }
+ }
+ // Release ThreadPool::mutex_ before calling Worker functions.
+
+ while (current != NULL) {
Ivan Posva 2012/03/07 01:06:05 Isn't current guaranteed to be NULL here?
turnidge 2012/03/07 18:02:20 Oops, that broke in my most recent changes. Fixed
+ Worker* next = current->all_next_;
+ current->Shutdown();
+ current = next;
+ }
+}
+
+
+void ThreadPool::SetIdle(Worker* worker) {
+ MutexLocker ml(&mutex_);
+ if (shutting_down_) {
+ return;
+ }
+ ASSERT(worker->state_ == kRunning);
+ worker->idle_next_ = idle_workers_;
+ idle_workers_ = worker;
+ worker->state_ = kIdle;
+ count_idle_++;
+ count_running_--;
+}
+
+
+bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
+ Worker* head = all_workers_;
+ ASSERT(worker != NULL && head != NULL);
+ ASSERT(worker->state_ != kDone);
+
+ // Special case head of list.
+ if (head == worker) {
+ head = worker->all_next_;
+ worker->all_next_ = NULL;
+ return true;
+ }
+
+ // We only want to remove a worker if it is idle.
Ivan Posva 2012/03/07 01:06:05 Comment out of place?
turnidge 2012/03/07 18:02:20 Comment removed, here and below.
+ for (Worker* current = head;
+ current->all_next_ != NULL;
+ current = current->all_next_) {
+ if (current->all_next_ == worker) {
+ // We found this worker in the idle list. Remove it.
Ivan Posva 2012/03/07 01:06:05 Comment seems out of place.
turnidge 2012/03/07 18:02:20 Comment removed.
+ current->all_next_ = current->all_next_->all_next_;
+ worker->all_next_ = NULL;
+ return true;
+ }
+ }
+}
+
+
+bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
+ Worker* head = idle_workers_;
+ ASSERT(worker != NULL && head != NULL);
+ ASSERT(worker->state_ == kIdle);
+
+ // Special case head of list.
+ if (head == worker) {
+ head = worker->idle_next_;
+ worker->idle_next_ = NULL;
+ return true;
+ }
+
+ // We only want to remove a worker if it is idle.
+ for (Worker* current = head;
+ current->idle_next_ != NULL;
+ current = current->idle_next_) {
+ if (current->idle_next_ == worker) {
+ // We found this worker in the idle list. Remove it.
+ current->idle_next_ = current->idle_next_->idle_next_;
+ worker->idle_next_ = NULL;
+ return true;
+ }
+ }
+}
+
+
+bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
+ MutexLocker ml(&mutex_);
+ if (shutting_down_) {
+ return false;
+ }
+ if (worker->state_ != kIdle) {
+ return false;
+ }
+
+ // Remove from idle list.
+ bool found = RemoveWorkerFromIdleList(worker);
+ ASSERT(found);
+
+ // Remove from all list.
+ found = RemoveWorkerFromAllList(worker);
+ ASSERT(found);
+
+ worker->state_ = kDone;
+ count_stopped_++;
+ count_idle_--;
+ return true;
+}
+
+
+ThreadPool::Task::Task() {
+}
+
+
+ThreadPool::Task::~Task() {
+}
+
+
+ThreadPool::Worker::Worker(ThreadPool* pool)
+ : pool_(pool),
+ task_(NULL),
+ started_(false),
+ done_(false),
+ state_(kInvalid),
+ all_next_(NULL),
+ idle_next_(NULL) {
+}
+
+
+void ThreadPool::Worker::Run(Task* task) {
+ MonitorLocker ml(&monitor_);
+ ASSERT(task_ == NULL);
+ if (!started_) {
+ Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
+ started_ = true;
+ }
+ task_ = task;
+ ml.Notify();
+}
+
+
+void ThreadPool::Worker::Loop() {
+ MonitorLocker ml(&monitor_);
+ if (done_) {
+ return;
+ }
+
+ ASSERT(started_ && task_ != NULL);
+ int64_t idle_start = 0;
+ while (true) {
+ Task* task = task_;
+ task_ = NULL;
+
+ // Release monitor while handling the task.
+ monitor_.Exit();
+ task->Run();
+ delete task;
+ monitor_.Enter();
+
+ if (done_) {
+ return;
+ }
+ ASSERT(task_ == NULL);
+
+ pool_->SetIdle(this);
+ idle_start = OS::GetCurrentTimeMillis();
+ do {
+ Monitor::WaitResult result = ml.Wait(FLAG_worker_timeout_millis);
+ if (done_) {
+ return;
+ }
+ if (task_ == NULL &&
+ result == Monitor::kTimedOut &&
+ FLAG_worker_timeout_millis > 0) {
+ bool timeout =
+ (OS::GetCurrentTimeMillis() - idle_start) >
+ FLAG_worker_timeout_millis;
+ if (timeout && pool_->ReleaseIdleWorker(this)) {
+ return;
+ }
+ }
+ } while (task_ == NULL);
+ }
+ UNREACHABLE();
+}
+
+
+void ThreadPool::Worker::Shutdown() {
+ MonitorLocker ml(&monitor_);
+ done_ = true;
+ ml.Notify();
+}
+
+
+// static
+void ThreadPool::Worker::Main(uword args) {
+ Worker* worker = reinterpret_cast<Worker*>(args);
+ worker->Loop();
+
+ // It should be okay to access these unlocked here in this assert.
+ ASSERT(worker->state_ == kDone && worker->idle_next_ == NULL);
+ delete worker;
+}
+
+} // namespace dart
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698