Index: runtime/vm/thread_pool.cc |
=================================================================== |
--- runtime/vm/thread_pool.cc (revision 0) |
+++ runtime/vm/thread_pool.cc (revision 0) |
@@ -0,0 +1,278 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+#include "vm/thread_pool.h" |
+ |
+namespace dart { |
+ |
+DEFINE_FLAG(int, worker_timeout_millis, 5000, |
+ "Free workers when they have been idle for this amount of time."); |
+ |
+ThreadPool::ThreadPool() |
+ : shutting_down_(false), |
+ all_workers_(NULL), |
+ idle_workers_(NULL), |
+ count_started_(0), |
+ count_stopped_(0), |
+ count_running_(0), |
+ count_idle_(0) { |
+} |
+ |
+ |
+ThreadPool::~ThreadPool() { |
+ Shutdown(); |
+} |
+ |
+ |
+void ThreadPool::Run(Task* task) { |
+ Worker* worker = NULL; |
+ { |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return; |
+ } |
+ worker = GetIdleWorker(); |
siva
2012/03/09 18:36:29
Why not treat the new thread creation case differe
turnidge
2012/03/09 21:40:38
Done.
|
+ } |
+ // Release ThreadPool::mutex_ before calling Worker functions. |
siva
2012/03/09 18:36:29
ASSERT(worker != NULL);
turnidge
2012/03/09 21:40:38
Done.
|
+ worker->Run(task); |
+} |
+ |
+ |
+ThreadPool::Worker* ThreadPool::GetIdleWorker() { |
+ // Lock is claimed in ThreadPool::Run. |
+ ASSERT(!shutting_down_); |
+ Worker* worker = NULL; |
+ if (idle_workers_ == NULL) { |
+ worker = new Worker(this); |
+ ASSERT(worker != NULL); |
+ count_started_++; |
+ |
+ // Add worker to the all_workers_ list. |
+ worker->all_next_ = all_workers_; |
+ all_workers_ = worker; |
+ |
+ } else { |
+ // Get the first worker from the idle worker list. |
+ worker = idle_workers_; |
+ idle_workers_ = worker->idle_next_; |
+ worker->idle_next_ = NULL; |
+ count_idle_--; |
+ } |
+ worker->state_ = kRunning; |
+ count_running_++; |
+ return worker; |
+} |
+ |
+ |
+void ThreadPool::Shutdown() { |
+ Worker* saved = NULL; |
+ { |
+ MutexLocker ml(&mutex_); |
+ shutting_down_ = true; |
+ saved = all_workers_; |
+ all_workers_ = NULL; |
+ idle_workers_ = NULL; |
+ |
+ Worker* current = saved; |
+ while (current != NULL) { |
+ Worker* next = current->all_next_; |
+ current->idle_next_ = NULL; |
+ current->state_ = kDone; |
+ current = next; |
+ } |
+ |
+ count_idle_ = 0; |
+ count_running_ = 0; |
+ } |
+ // Release ThreadPool::mutex_ before calling Worker functions. |
+ |
+ Worker* current = saved; |
+ while (current != NULL) { |
+ Worker* next = current->all_next_; |
+ current->all_next_ = NULL; |
+ current->Shutdown(); |
+ current = next; |
+ } |
+} |
+ |
+ |
+void ThreadPool::SetIdle(Worker* worker) { |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return; |
+ } |
+ ASSERT(worker->state_ == kRunning); |
+ worker->idle_next_ = idle_workers_; |
+ idle_workers_ = worker; |
+ worker->state_ = kIdle; |
+ count_idle_++; |
+ count_running_--; |
+} |
+ |
+ |
+bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
+ ASSERT(worker != NULL && all_workers_ != NULL); |
+ ASSERT(worker->state_ != kDone); |
+ |
+ // Special case head of list. |
+ if (all_workers_ == worker) { |
+ all_workers_ = worker->all_next_; |
+ worker->all_next_ = NULL; |
+ return true; |
+ } |
+ |
+ for (Worker* current = all_workers_; |
+ current->all_next_ != NULL; |
+ current = current->all_next_) { |
+ if (current->all_next_ == worker) { |
+ current->all_next_ = current->all_next_->all_next_; |
siva
2012/03/09 18:36:29
current->all_next_ = worker->all_next_;
maybe more
turnidge
2012/03/09 21:40:38
Done.
|
+ worker->all_next_ = NULL; |
+ return true; |
+ } |
+ } |
+} |
+ |
+ |
+bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { |
+ ASSERT(worker != NULL && idle_workers_ != NULL); |
+ ASSERT(worker->state_ == kIdle); |
+ |
+ // Special case head of list. |
+ if (idle_workers_ == worker) { |
+ idle_workers_ = worker->idle_next_; |
+ worker->idle_next_ = NULL; |
+ return true; |
+ } |
+ |
+ for (Worker* current = idle_workers_; |
+ current->idle_next_ != NULL; |
+ current = current->idle_next_) { |
+ if (current->idle_next_ == worker) { |
+ current->idle_next_ = current->idle_next_->idle_next_; |
siva
2012/03/09 18:36:29
current->idle_next_ = worker->idle_next_;
maybe mo
turnidge
2012/03/09 21:40:38
Done.
|
+ worker->idle_next_ = NULL; |
+ return true; |
+ } |
+ } |
+} |
+ |
+ |
+bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return false; |
+ } |
+ if (worker->state_ != kIdle) { |
+ return false; |
+ } |
+ |
+ // Remove from idle list. |
+ bool found = RemoveWorkerFromIdleList(worker); |
+ ASSERT(found); |
+ |
+ // Remove from all list. |
+ found = RemoveWorkerFromAllList(worker); |
+ ASSERT(found); |
+ |
+ worker->state_ = kDone; |
+ count_stopped_++; |
+ count_idle_--; |
+ return true; |
+} |
+ |
+ |
+ThreadPool::Task::Task() { |
+} |
+ |
+ |
+ThreadPool::Task::~Task() { |
+} |
+ |
+ |
+ThreadPool::Worker::Worker(ThreadPool* pool) |
+ : pool_(pool), |
+ task_(NULL), |
+ started_(false), |
+ done_(false), |
+ state_(kInvalid), |
+ all_next_(NULL), |
+ idle_next_(NULL) { |
+} |
+ |
+ |
+void ThreadPool::Worker::Run(Task* task) { |
+ MonitorLocker ml(&monitor_); |
+ ASSERT(task_ == NULL); |
+ if (!started_) { |
+ Thread::Start(&Worker::Main, reinterpret_cast<uword>(this)); |
+ started_ = true; |
+ } |
+ task_ = task; |
+ ml.Notify(); |
+} |
+ |
+ |
+void ThreadPool::Worker::Loop() { |
+ MonitorLocker ml(&monitor_); |
+ if (done_) { |
+ return; |
+ } |
+ |
+ ASSERT(started_ && task_ != NULL); |
+ int64_t idle_start = 0; |
+ while (true) { |
+ Task* task = task_; |
+ task_ = NULL; |
+ |
+ // Release monitor while handling the task. |
+ monitor_.Exit(); |
+ task->Run(); |
+ delete task; |
+ monitor_.Enter(); |
+ |
+ if (done_) { |
+ return; |
+ } |
+ ASSERT(task_ == NULL); |
+ |
+ pool_->SetIdle(this); |
+ idle_start = OS::GetCurrentTimeMillis(); |
+ do { |
+ Monitor::WaitResult result = ml.Wait(FLAG_worker_timeout_millis); |
+ if (done_) { |
+ return; |
+ } |
siva
2012/03/09 18:36:29
Threads which are in the idle list will not remove
turnidge
2012/03/09 21:40:38
Discussed offline -- already removed from idle lis
|
+ if (task_ == NULL && |
+ result == Monitor::kTimedOut && |
+ FLAG_worker_timeout_millis > 0) { |
+ bool timeout = |
+ (OS::GetCurrentTimeMillis() - idle_start) > |
+ FLAG_worker_timeout_millis; |
+ if (timeout && pool_->ReleaseIdleWorker(this)) { |
+ return; |
+ } |
+ } |
+ } while (task_ == NULL); |
+ } |
siva
2012/03/09 18:36:29
MonitorLocker ml(&monitor_);
ASSERT(task_ != NULL)
turnidge
2012/03/09 21:40:38
Discussed offilne.
On 2012/03/09 18:36:29, asiva
|
+ UNREACHABLE(); |
+} |
+ |
+ |
+void ThreadPool::Worker::Shutdown() { |
+ MonitorLocker ml(&monitor_); |
+ done_ = true; |
+ ml.Notify(); |
+} |
+ |
+ |
+// static |
+void ThreadPool::Worker::Main(uword args) { |
+ Worker* worker = reinterpret_cast<Worker*>(args); |
+ worker->Loop(); |
+ |
+ // It should be okay to access these unlocked here in this assert. |
+ ASSERT(worker->state_ == kDone && worker->idle_next_ == NULL); |
siva
2012/03/09 18:36:29
Also (worker->all_next_ == NULL)?
turnidge
2012/03/09 21:40:38
Done.
|
+ delete worker; |
+} |
+ |
+} // namespace dart |