Index: runtime/vm/thread_pool.cc |
=================================================================== |
--- runtime/vm/thread_pool.cc (revision 0) |
+++ runtime/vm/thread_pool.cc (revision 0) |
@@ -0,0 +1,244 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+#include "vm/thread_pool.h" |
+ |
+namespace dart { |
+ |
+DEFINE_FLAG(int, worker_timeout_millis, 5000, |
+ "Free workers when they have been idle for this amount of time."); |
+ |
+ThreadPool::ThreadPool() |
+ : mutex_(new Mutex()), |
+ done_(false), |
+ idle_workers_(NULL), |
+ running_workers_(NULL), |
+ workers_started_(0), |
+ workers_stopped_(0) { |
+} |
+ |
+ |
+ThreadPool::~ThreadPool() { |
+ Shutdown(); |
+ delete mutex_; |
+ mutex_ = NULL; |
+} |
+ |
+ |
+void ThreadPool::Run(Task* task) { |
+ Worker* worker = NULL; |
+ { |
+ MutexLocker ml(mutex_); |
+ if (done_) { |
+ return; |
+ } |
+ worker = GetIdleWorker(); |
+ } |
+ // Release ThreadPool::mutex_ before calling Worker functions. |
+ worker->Run(task); |
+} |
+ |
+ |
+void ThreadPool::Shutdown() { |
+ Worker* idle_workers = NULL; |
+ Worker* running_workers = NULL; |
+ { |
+ MutexLocker ml(mutex_); |
+ done_ = true; |
+ idle_workers = idle_workers_; |
+ idle_workers_ = NULL; |
+ running_workers = running_workers_; |
+ running_workers = NULL; |
+ } |
+ // Release ThreadPool::mutex_ before calling Worker functions. |
+ ShutdownWorkerList(idle_workers); |
+ ShutdownWorkerList(running_workers); |
+} |
+ |
+ |
+void ThreadPool::RemoveWorkerFromList(Worker* worker, Worker** head) { |
siva
2012/03/06 05:07:00
ASSERT(worker != NULL && head != NULL);
turnidge
2012/03/06 22:34:01
Function removed, but added assert in some similar
|
+ Worker* next = worker->next_; |
+ Worker* prev = worker->prev_; |
+ if (next) { |
+ next->prev_ = prev; |
+ } |
+ if (prev) { |
+ ASSERT(*head != worker); |
+ prev->next_ = next; |
+ } else { |
+ // This worker is at the head of the list. |
+ ASSERT(*head == worker); |
+ ASSERT(worker->prev_ == NULL); |
+ *head = next; |
+ } |
+ worker->next_ = NULL; |
+ worker->prev_ = NULL; |
+} |
+ |
+ |
+void ThreadPool::AddWorkerToList(Worker* worker, Worker** head) { |
siva
2012/03/06 05:07:00
ASSERT(worker != NULL && head != NULL);
turnidge
2012/03/06 22:34:01
Ditto above.
|
+ ASSERT(worker->next_ == NULL && worker->prev_ == NULL); |
+ worker->next_ = *head; |
+ worker->prev_ = NULL; |
+ if (*head) { |
+ (*head)->prev_ = worker; |
+ } |
+ *head = worker; |
+} |
+ |
+ |
+ThreadPool::Worker* ThreadPool::GetIdleWorker() { |
siva
2012/03/06 05:07:00
Missing MutexLocker ml(mutex_); ?
turnidge
2012/03/06 22:34:01
We are already holding the lock. The only caller
|
+ ASSERT(!done_); |
+ Worker* worker = NULL; |
+ if (idle_workers_ == NULL) { |
+ worker = new Worker(this); |
siva
2012/03/06 05:07:00
ASSERT(worker != NULL);
turnidge
2012/03/06 22:34:01
Should I be doing this on every "new" then?
|
+ Thread::Start(&ThreadPool::Main, reinterpret_cast<uword>(worker)); |
+ workers_started_++; |
+ } else { |
+ // Get the first worker from the idle worker list. |
+ worker = idle_workers_; |
+ RemoveWorkerFromList(worker, &idle_workers_); |
+ } |
+ // Move worker to the running list. |
+ AddWorkerToList(worker, &running_workers_); |
+ return worker; |
+} |
+ |
+ |
+void ThreadPool::SetIdle(Worker* worker) { |
+ MutexLocker ml(mutex_); |
+ ASSERT(!done_); |
siva
2012/03/06 05:07:00
Instead of asserting shouldn't you just check for
turnidge
2012/03/06 22:34:01
Fixed.
|
+ |
+ RemoveWorkerFromList(worker, &running_workers_); |
+ AddWorkerToList(worker, &idle_workers_); |
+} |
+ |
+ |
+bool ThreadPool::RemoveIdleWorker(Worker* worker) { |
+ MutexLocker ml(mutex_); |
+ ASSERT(!done_); |
siva
2012/03/06 05:07:00
Instead of asserting for !done_ here shouldn't you
turnidge
2012/03/06 22:34:01
Fixed.
|
+ |
+ // We only want to remove a worker if it is idle. |
+ for (Worker* current = idle_workers_; |
+ current != NULL; |
+ current = current->next_) { |
+ if (current == worker) { |
+ // We found this worker in the idle list. Remove it. |
+ RemoveWorkerFromList(worker, &idle_workers_); |
+ workers_stopped_++; |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+ |
+void ThreadPool::ShutdownWorkerList(Worker* head) { |
+ Worker* worker = head; |
+ while (worker != NULL) { |
+ Worker* next = worker->next_; |
+ worker->prev_ = NULL; |
+ worker->next_ = NULL; |
+ worker->Shutdown(); |
+ worker = next; |
+ } |
+} |
+ |
+ |
+void ThreadPool::Main(uword args) { |
+ Worker* worker = reinterpret_cast<Worker*>(args); |
+ worker->Loop(); |
+ ASSERT(worker->next_ == NULL && worker->prev_ == NULL); |
+ delete worker; |
+} |
+ |
+ |
+ThreadPool::Task::Task() { |
+} |
+ |
+ |
+ThreadPool::Task::~Task() { |
+} |
+ |
+ |
+ThreadPool::Worker::Worker(ThreadPool* pool) |
+ : monitor_(new Monitor()), |
+ pool_(pool), |
+ task_(NULL), |
+ done_(false), |
+ next_(NULL), |
+ prev_(NULL) { |
+} |
+ |
+ |
+ThreadPool::Worker::~Worker() { |
+ delete monitor_; |
+ monitor_ = NULL; |
+} |
+ |
+ |
+void ThreadPool::Worker::Run(Task* task) { |
+ MonitorLocker ml(monitor_); |
+ ASSERT(task_ == NULL); |
+ task_ = task; |
+ ml.Notify(); |
+} |
+ |
+ |
+void ThreadPool::Worker::Loop() { |
+ MonitorLocker ml(monitor_); |
+ |
+ // Wait for first task. |
+ while (task_ == NULL) { |
+ ml.Wait(); |
+ if (done_) { |
+ return; |
+ } |
+ } |
siva
2012/03/06 05:07:00
Do we need this initial while loop, there is an im
turnidge
2012/03/06 22:34:01
I moved the call to Thread::Start to be under lock
|
+ |
+ int64_t idle_start = 0; |
+ while (true) { |
+ Task* task = task_; |
+ task_ = NULL; |
+ |
+ // Release monitor while handling the task. |
+ monitor_->Exit(); |
+ task->Run(); |
+ delete task; |
+ monitor_->Enter(); |
+ |
+ if (done_) { |
+ return; |
+ } |
+ ASSERT(task_ == NULL); |
+ |
+ pool_->SetIdle(this); |
+ idle_start = OS::GetCurrentTimeMillis(); |
+ do { |
+ ml.Wait(FLAG_worker_timeout_millis); |
siva
2012/03/06 05:07:00
Monitor::WaitResult result = ml.Wait(...);
and yo
turnidge
2012/03/06 22:34:01
Done.
|
+ if (done_) { |
+ return; |
+ } |
+ if (task_ == NULL && FLAG_worker_timeout_millis > 0) { |
+ bool timeout = |
+ (OS::GetCurrentTimeMillis() - idle_start) > |
+ FLAG_worker_timeout_millis; |
+ if (timeout && pool_->RemoveIdleWorker(this)) { |
+ return; |
+ } |
+ } |
+ } while (task_ == NULL); |
+ } |
+ UNREACHABLE(); |
+} |
+ |
+ |
+void ThreadPool::Worker::Shutdown() { |
+ MonitorLocker ml(monitor_); |
+ done_ = true; |
+ ml.Notify(); |
+} |
+ |
+ |
+} // namespace dart |