Index: runtime/vm/thread_pool.cc |
diff --git a/runtime/vm/thread_pool.cc b/runtime/vm/thread_pool.cc |
index 5b3a713d2d09b2c32be7605f622e7671669e36c6..e4453d81b62f99ad95a1182038010c7ce0284f2a 100644 |
--- a/runtime/vm/thread_pool.cc |
+++ b/runtime/vm/thread_pool.cc |
@@ -12,9 +12,6 @@ namespace dart { |
DEFINE_FLAG(int, worker_timeout_millis, 5000, |
"Free workers when they have been idle for this amount of time."); |
-Monitor* ThreadPool::exit_monitor_ = NULL; |
-int* ThreadPool::exit_count_ = NULL; |
- |
ThreadPool::ThreadPool() |
: shutting_down_(false), |
all_workers_(NULL), |
@@ -22,7 +19,9 @@ ThreadPool::ThreadPool() |
count_started_(0), |
count_stopped_(0), |
count_running_(0), |
- count_idle_(0) { |
+ count_idle_(0), |
+ shutting_down_workers_(NULL), |
+ join_list_(NULL) { |
} |
@@ -31,7 +30,7 @@ ThreadPool::~ThreadPool() { |
} |
-void ThreadPool::Run(Task* task) { |
+bool ThreadPool::Run(Task* task) { |
Worker* worker = NULL; |
bool new_worker = false; |
{ |
@@ -39,7 +38,7 @@ void ThreadPool::Run(Task* task) { |
// ThreadPool state. |
MutexLocker ml(&mutex_); |
if (shutting_down_) { |
- return; |
+ return false; |
} |
if (idle_workers_ == NULL) { |
worker = new Worker(this); |
@@ -51,15 +50,17 @@ void ThreadPool::Run(Task* task) { |
worker->all_next_ = all_workers_; |
all_workers_ = worker; |
worker->owned_ = true; |
+ count_running_++; |
} else { |
// Get the first worker from the idle worker list. |
worker = idle_workers_; |
idle_workers_ = worker->idle_next_; |
worker->idle_next_ = NULL; |
count_idle_--; |
+ count_running_++; |
} |
- count_running_++; |
} |
+ |
// Release ThreadPool::mutex_ before calling Worker functions. |
ASSERT(worker != NULL); |
worker->SetTask(task); |
@@ -67,6 +68,7 @@ void ThreadPool::Run(Task* task) { |
// Call StartThread after we've assigned the first task. |
worker->StartThread(); |
} |
+ return true; |
} |
@@ -94,15 +96,48 @@ void ThreadPool::Shutdown() { |
} |
// Release ThreadPool::mutex_ before calling Worker functions. |
- Worker* current = saved; |
- while (current != NULL) { |
- // We may access all_next_ without holding ThreadPool::mutex_ here |
- // because the worker is no longer owned by the ThreadPool. |
- Worker* next = current->all_next_; |
- current->all_next_ = NULL; |
- current->Shutdown(); |
- current = next; |
+ { |
+ MonitorLocker eml(&exit_monitor_); |
+ |
+ // First tell all the workers to shut down. |
+ Worker* current = saved; |
+ ThreadId id = OSThread::GetCurrentThreadId(); |
+ while (current != NULL) { |
+ Worker* next = current->all_next_; |
+ ThreadId currentId = current->id(); |
+ if (currentId != id) { |
+ AddWorkerToShutdownList(current); |
+ } |
+ current->Shutdown(); |
+ current = next; |
+ } |
+ saved = NULL; |
+ |
+ // Wait until all workers will exit. |
+ while (shutting_down_workers_ != NULL) { |
+ // Here, we are waiting for workers to exit. When a worker exits we will |
+ // be notified. |
+ eml.Wait(); |
+ } |
} |
+ |
+ // Extract the join list, and join on the threads. |
+ JoinList* list = NULL; |
+ { |
+ MutexLocker ml(&mutex_); |
+ list = join_list_; |
+ join_list_ = NULL; |
+ } |
+ |
+ // Join non-idle threads. |
+ JoinList::Join(&list); |
+ |
+#if defined(DEBUG) |
+ { |
+ MutexLocker ml(&mutex_); |
+ ASSERT(join_list_ == NULL); |
+ } |
+#endif |
} |
@@ -156,7 +191,7 @@ bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
all_workers_ = worker->all_next_; |
worker->all_next_ = NULL; |
worker->owned_ = false; |
- worker->pool_ = NULL; |
+ worker->done_ = true; |
return true; |
} |
@@ -174,16 +209,24 @@ bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { |
} |
-void ThreadPool::SetIdle(Worker* worker) { |
- MutexLocker ml(&mutex_); |
- if (shutting_down_) { |
- return; |
+void ThreadPool::SetIdleAndReapExited(Worker* worker) { |
+ JoinList* list = NULL; |
+ { |
+ MutexLocker ml(&mutex_); |
+ if (shutting_down_) { |
+ return; |
+ } |
+ ASSERT(worker->owned_ && !IsIdle(worker)); |
+ worker->idle_next_ = idle_workers_; |
+ idle_workers_ = worker; |
+ count_idle_++; |
+ count_running_--; |
+ |
+ // While we have the lock, opportunistically grab and clear the join_list_. |
+ list = join_list_; |
+ join_list_ = NULL; |
} |
- ASSERT(worker->owned_ && !IsIdle(worker)); |
- worker->idle_next_ = idle_workers_; |
- idle_workers_ = worker; |
- count_idle_++; |
- count_running_--; |
+ JoinList::Join(&list); |
} |
@@ -200,12 +243,62 @@ bool ThreadPool::ReleaseIdleWorker(Worker* worker) { |
bool found = RemoveWorkerFromAllList(worker); |
ASSERT(found); |
+ // The thread for worker will exit. Add its ThreadId to the join_list_ |
+ // so that we can join on it at the next opportunity. |
+ JoinList::AddLocked(OSThread::GetCurrentThreadJoinId(), &join_list_); |
count_stopped_++; |
count_idle_--; |
return true; |
} |
+// Only call while holding the exit_monitor_ |
+void ThreadPool::AddWorkerToShutdownList(Worker* worker) { |
+ worker->shutdown_next_ = shutting_down_workers_; |
+ shutting_down_workers_ = worker; |
+} |
+ |
+ |
+// Only call while holding the exit_monitor_ |
+bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) { |
+ ASSERT(worker != NULL); |
+ ASSERT(shutting_down_workers_ != NULL); |
+ |
+ // Special case head of list. |
+ if (shutting_down_workers_ == worker) { |
+ shutting_down_workers_ = worker->shutdown_next_; |
+ worker->shutdown_next_ = NULL; |
+ return true; |
+ } |
+ |
+ for (Worker* current = shutting_down_workers_; |
+ current->shutdown_next_ != NULL; |
+ current = current->shutdown_next_) { |
+ if (current->shutdown_next_ == worker) { |
+ current->shutdown_next_ = worker->shutdown_next_; |
+ worker->shutdown_next_ = NULL; |
+ return true; |
+ } |
+ } |
+ return false; |
+} |
+ |
+ |
+void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) { |
+ *list = new JoinList(id, *list); |
+} |
+ |
+ |
+void ThreadPool::JoinList::Join(JoinList** list) { |
+ while (*list != NULL) { |
+ JoinList* current = *list; |
+ *list = current->next(); |
+ OSThread::Join(current->id()); |
+ delete current; |
+ } |
+} |
+ |
+ |
ThreadPool::Task::Task() { |
} |
@@ -217,9 +310,18 @@ ThreadPool::Task::~Task() { |
ThreadPool::Worker::Worker(ThreadPool* pool) |
: pool_(pool), |
task_(NULL), |
+ id_(OSThread::kInvalidThreadId), |
+ done_(false), |
owned_(false), |
all_next_(NULL), |
- idle_next_(NULL) { |
+ idle_next_(NULL), |
+ shutdown_next_(NULL) { |
+} |
+ |
+ |
+ThreadId ThreadPool::Worker::id() { |
+ MonitorLocker ml(&monitor_); |
+ return id_; |
} |
@@ -264,7 +366,7 @@ static int64_t ComputeTimeout(int64_t idle_start) { |
} |
-void ThreadPool::Worker::Loop() { |
+bool ThreadPool::Worker::Loop() { |
MonitorLocker ml(&monitor_); |
int64_t idle_start; |
while (true) { |
@@ -281,10 +383,10 @@ void ThreadPool::Worker::Loop() { |
ASSERT(task_ == NULL); |
if (IsDone()) { |
- return; |
+ return false; |
} |
- ASSERT(pool_ != NULL); |
- pool_->SetIdle(this); |
+ ASSERT(!done_); |
+ pool_->SetIdleAndReapExited(this); |
idle_start = OS::GetCurrentTimeMillis(); |
while (true) { |
Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); |
@@ -294,21 +396,21 @@ void ThreadPool::Worker::Loop() { |
break; |
} |
if (IsDone()) { |
- return; |
+ return false; |
} |
- if (result == Monitor::kTimedOut && |
- pool_->ReleaseIdleWorker(this)) { |
- return; |
+ if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) { |
+ return true; |
} |
} |
} |
UNREACHABLE(); |
+ return false; |
} |
void ThreadPool::Worker::Shutdown() { |
MonitorLocker ml(&monitor_); |
- pool_ = NULL; // Fail fast if someone tries to access pool_. |
+ done_ = true; |
ml.Notify(); |
} |
@@ -317,20 +419,58 @@ void ThreadPool::Worker::Shutdown() { |
void ThreadPool::Worker::Main(uword args) { |
Thread::EnsureInit(); |
Worker* worker = reinterpret_cast<Worker*>(args); |
- worker->Loop(); |
+ ThreadId id = OSThread::GetCurrentThreadId(); |
+ ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId(); |
+ ThreadPool* pool; |
+ |
+ { |
+ MonitorLocker ml(&worker->monitor_); |
+ ASSERT(worker->task_); |
+ worker->id_ = id; |
+ pool = worker->pool_; |
+ } |
+ |
+ bool released = worker->Loop(); |
// It should be okay to access these unlocked here in this assert. |
- ASSERT(!worker->owned_ && |
- worker->all_next_ == NULL && |
- worker->idle_next_ == NULL); |
- |
- // The exit monitor is only used during testing. |
- if (ThreadPool::exit_monitor_) { |
- MonitorLocker ml(ThreadPool::exit_monitor_); |
- (*ThreadPool::exit_count_)++; |
- ml.Notify(); |
+ // worker->all_next_ is retained by the pool for shutdown monitoring. |
+ ASSERT(!worker->owned_ && (worker->idle_next_ == NULL)); |
+ |
+ if (!released) { |
+ // This worker is exiting because the thread pool is being shut down. |
+ // Inform the thread pool that we are exiting. We remove this worker from |
+ // shutting_down_workers_ list because there will be no need for the |
+ // ThreadPool to take action for this worker. |
+ { |
+ MutexLocker ml(&pool->mutex_); |
+ JoinList::AddLocked(join_id, &pool->join_list_); |
+ } |
+ |
+ // worker->id_ should never be read again, so set to invalid in debug mode |
+ // for asserts. |
+#if defined(DEBUG) |
+ { |
+ MonitorLocker ml(&worker->monitor_); |
+ worker->id_ = OSThread::kInvalidThreadId; |
+ } |
+#endif |
+ |
+ // Remove from the shutdown list, delete, and notify the thread pool. |
+ { |
+ MonitorLocker eml(&pool->exit_monitor_); |
+ pool->RemoveWorkerFromShutdownList(worker); |
+ delete worker; |
+ eml.Notify(); |
+ } |
+ } else { |
+ // This worker is going down because it was idle for too long. This case |
+ // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker. |
+ // The worker's id is added to the thread pool's join list by |
+ // ReleaseIdleWorker, so in the case that the thread pool begins shutting |
+ // down immediately after returning from worker->Loop() above, we still |
+ // wait for the thread to exit by joining on it in Shutdown(). |
+ delete worker; |
} |
- delete worker; |
#if defined(TARGET_OS_WINDOWS) |
Thread::CleanUp(); |
#endif |