Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(776)

Unified Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/vm/thread_pool.cc
===================================================================
--- runtime/vm/thread_pool.cc (revision 0)
+++ runtime/vm/thread_pool.cc (revision 0)
@@ -0,0 +1,244 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+#include "vm/thread_pool.h"
+
+namespace dart {
+
+DEFINE_FLAG(int, worker_timeout_millis, 5000,
+ "Free workers when they have been idle for this amount of time.");
+
+ThreadPool::ThreadPool()
+ : mutex_(new Mutex()),
+ done_(false),
+ idle_workers_(NULL),
+ running_workers_(NULL),
+ workers_started_(0),
+ workers_stopped_(0) {
+}
+
+
+ThreadPool::~ThreadPool() {
+ Shutdown();
+ delete mutex_;
+ mutex_ = NULL;
+}
+
+
+void ThreadPool::Run(Task* task) {
+ Worker* worker = NULL;
+ {
+ MutexLocker ml(mutex_);
+ if (done_) {
+ return;
+ }
+ worker = GetIdleWorker();
+ }
+ // Release ThreadPool::mutex_ before calling Worker functions.
+ worker->Run(task);
+}
+
+
+void ThreadPool::Shutdown() {
+ Worker* idle_workers = NULL;
+ Worker* running_workers = NULL;
+ {
+ MutexLocker ml(mutex_);
+ done_ = true;
+ idle_workers = idle_workers_;
+ idle_workers_ = NULL;
+ running_workers = running_workers_;
+ running_workers = NULL;
+ }
+ // Release ThreadPool::mutex_ before calling Worker functions.
+ ShutdownWorkerList(idle_workers);
+ ShutdownWorkerList(running_workers);
+}
+
+
+void ThreadPool::RemoveWorkerFromList(Worker* worker, Worker** head) {
siva 2012/03/06 05:07:00 ASSERT(worker != NULL && head != NULL);
turnidge 2012/03/06 22:34:01 Function removed, but added assert in some similar
+ Worker* next = worker->next_;
+ Worker* prev = worker->prev_;
+ if (next) {
+ next->prev_ = prev;
+ }
+ if (prev) {
+ ASSERT(*head != worker);
+ prev->next_ = next;
+ } else {
+ // This worker is at the head of the list.
+ ASSERT(*head == worker);
+ ASSERT(worker->prev_ == NULL);
+ *head = next;
+ }
+ worker->next_ = NULL;
+ worker->prev_ = NULL;
+}
+
+
+void ThreadPool::AddWorkerToList(Worker* worker, Worker** head) {
siva 2012/03/06 05:07:00 ASSERT(worker != NULL && head != NULL);
turnidge 2012/03/06 22:34:01 Ditto above.
+ ASSERT(worker->next_ == NULL && worker->prev_ == NULL);
+ worker->next_ = *head;
+ worker->prev_ = NULL;
+ if (*head) {
+ (*head)->prev_ = worker;
+ }
+ *head = worker;
+}
+
+
+ThreadPool::Worker* ThreadPool::GetIdleWorker() {
siva 2012/03/06 05:07:00 Missing MutexLocker ml(mutex_); ?
turnidge 2012/03/06 22:34:01 We are already holding the lock. The only caller
+ ASSERT(!done_);
+ Worker* worker = NULL;
+ if (idle_workers_ == NULL) {
+ worker = new Worker(this);
siva 2012/03/06 05:07:00 ASSERT(worker != NULL);
turnidge 2012/03/06 22:34:01 Should I be doing this on every "new" then?
+ Thread::Start(&ThreadPool::Main, reinterpret_cast<uword>(worker));
+ workers_started_++;
+ } else {
+ // Get the first worker from the idle worker list.
+ worker = idle_workers_;
+ RemoveWorkerFromList(worker, &idle_workers_);
+ }
+ // Move worker to the running list.
+ AddWorkerToList(worker, &running_workers_);
+ return worker;
+}
+
+
+void ThreadPool::SetIdle(Worker* worker) {
+ MutexLocker ml(mutex_);
+ ASSERT(!done_);
siva 2012/03/06 05:07:00 Instead of asserting shouldn't you just check for
turnidge 2012/03/06 22:34:01 Fixed.
+
+ RemoveWorkerFromList(worker, &running_workers_);
+ AddWorkerToList(worker, &idle_workers_);
+}
+
+
+bool ThreadPool::RemoveIdleWorker(Worker* worker) {
+ MutexLocker ml(mutex_);
+ ASSERT(!done_);
siva 2012/03/06 05:07:00 Instead of asserting for !done_ here shouldn't you
turnidge 2012/03/06 22:34:01 Fixed.
+
+ // We only want to remove a worker if it is idle.
+ for (Worker* current = idle_workers_;
+ current != NULL;
+ current = current->next_) {
+ if (current == worker) {
+ // We found this worker in the idle list. Remove it.
+ RemoveWorkerFromList(worker, &idle_workers_);
+ workers_stopped_++;
+ return true;
+ }
+ }
+ return false;
+}
+
+
+void ThreadPool::ShutdownWorkerList(Worker* head) {
+ Worker* worker = head;
+ while (worker != NULL) {
+ Worker* next = worker->next_;
+ worker->prev_ = NULL;
+ worker->next_ = NULL;
+ worker->Shutdown();
+ worker = next;
+ }
+}
+
+
+void ThreadPool::Main(uword args) {
+ Worker* worker = reinterpret_cast<Worker*>(args);
+ worker->Loop();
+ ASSERT(worker->next_ == NULL && worker->prev_ == NULL);
+ delete worker;
+}
+
+
+ThreadPool::Task::Task() {
+}
+
+
+ThreadPool::Task::~Task() {
+}
+
+
+ThreadPool::Worker::Worker(ThreadPool* pool)
+ : monitor_(new Monitor()),
+ pool_(pool),
+ task_(NULL),
+ done_(false),
+ next_(NULL),
+ prev_(NULL) {
+}
+
+
+ThreadPool::Worker::~Worker() {
+ delete monitor_;
+ monitor_ = NULL;
+}
+
+
+void ThreadPool::Worker::Run(Task* task) {
+ MonitorLocker ml(monitor_);
+ ASSERT(task_ == NULL);
+ task_ = task;
+ ml.Notify();
+}
+
+
+void ThreadPool::Worker::Loop() {
+ MonitorLocker ml(monitor_);
+
+ // Wait for first task.
+ while (task_ == NULL) {
+ ml.Wait();
+ if (done_) {
+ return;
+ }
+ }
siva 2012/03/06 05:07:00 Do we need this initial while loop, there is an im
turnidge 2012/03/06 22:34:01 I moved the call to Thread::Start to be under lock
+
+ int64_t idle_start = 0;
+ while (true) {
+ Task* task = task_;
+ task_ = NULL;
+
+ // Release monitor while handling the task.
+ monitor_->Exit();
+ task->Run();
+ delete task;
+ monitor_->Enter();
+
+ if (done_) {
+ return;
+ }
+ ASSERT(task_ == NULL);
+
+ pool_->SetIdle(this);
+ idle_start = OS::GetCurrentTimeMillis();
+ do {
+ ml.Wait(FLAG_worker_timeout_millis);
siva 2012/03/06 05:07:00 Monitor::WaitResult result = ml.Wait(...); and yo
turnidge 2012/03/06 22:34:01 Done.
+ if (done_) {
+ return;
+ }
+ if (task_ == NULL && FLAG_worker_timeout_millis > 0) {
+ bool timeout =
+ (OS::GetCurrentTimeMillis() - idle_start) >
+ FLAG_worker_timeout_millis;
+ if (timeout && pool_->RemoveIdleWorker(this)) {
+ return;
+ }
+ }
+ } while (task_ == NULL);
+ }
+ UNREACHABLE();
+}
+
+
+void ThreadPool::Worker::Shutdown() {
+ MonitorLocker ml(monitor_);
+ done_ = true;
+ ml.Notify();
+}
+
+
+} // namespace dart

Powered by Google App Engine
This is Rietveld 408576698