| Index: runtime/vm/thread_pool.cc
|
| diff --git a/runtime/vm/thread_pool.cc b/runtime/vm/thread_pool.cc
|
| index 5b3a713d2d09b2c32be7605f622e7671669e36c6..e4453d81b62f99ad95a1182038010c7ce0284f2a 100644
|
| --- a/runtime/vm/thread_pool.cc
|
| +++ b/runtime/vm/thread_pool.cc
|
| @@ -12,9 +12,6 @@ namespace dart {
|
| DEFINE_FLAG(int, worker_timeout_millis, 5000,
|
| "Free workers when they have been idle for this amount of time.");
|
|
|
| -Monitor* ThreadPool::exit_monitor_ = NULL;
|
| -int* ThreadPool::exit_count_ = NULL;
|
| -
|
| ThreadPool::ThreadPool()
|
| : shutting_down_(false),
|
| all_workers_(NULL),
|
| @@ -22,7 +19,9 @@ ThreadPool::ThreadPool()
|
| count_started_(0),
|
| count_stopped_(0),
|
| count_running_(0),
|
| - count_idle_(0) {
|
| + count_idle_(0),
|
| + shutting_down_workers_(NULL),
|
| + join_list_(NULL) {
|
| }
|
|
|
|
|
| @@ -31,7 +30,7 @@ ThreadPool::~ThreadPool() {
|
| }
|
|
|
|
|
| -void ThreadPool::Run(Task* task) {
|
| +bool ThreadPool::Run(Task* task) {
|
| Worker* worker = NULL;
|
| bool new_worker = false;
|
| {
|
| @@ -39,7 +38,7 @@ void ThreadPool::Run(Task* task) {
|
| // ThreadPool state.
|
| MutexLocker ml(&mutex_);
|
| if (shutting_down_) {
|
| - return;
|
| + return false;
|
| }
|
| if (idle_workers_ == NULL) {
|
| worker = new Worker(this);
|
| @@ -51,15 +50,17 @@ void ThreadPool::Run(Task* task) {
|
| worker->all_next_ = all_workers_;
|
| all_workers_ = worker;
|
| worker->owned_ = true;
|
| + count_running_++;
|
| } else {
|
| // Get the first worker from the idle worker list.
|
| worker = idle_workers_;
|
| idle_workers_ = worker->idle_next_;
|
| worker->idle_next_ = NULL;
|
| count_idle_--;
|
| + count_running_++;
|
| }
|
| - count_running_++;
|
| }
|
| +
|
| // Release ThreadPool::mutex_ before calling Worker functions.
|
| ASSERT(worker != NULL);
|
| worker->SetTask(task);
|
| @@ -67,6 +68,7 @@ void ThreadPool::Run(Task* task) {
|
| // Call StartThread after we've assigned the first task.
|
| worker->StartThread();
|
| }
|
| + return true;
|
| }
|
|
|
|
|
| @@ -94,15 +96,48 @@ void ThreadPool::Shutdown() {
|
| }
|
| // Release ThreadPool::mutex_ before calling Worker functions.
|
|
|
| - Worker* current = saved;
|
| - while (current != NULL) {
|
| - // We may access all_next_ without holding ThreadPool::mutex_ here
|
| - // because the worker is no longer owned by the ThreadPool.
|
| - Worker* next = current->all_next_;
|
| - current->all_next_ = NULL;
|
| - current->Shutdown();
|
| - current = next;
|
| + {
|
| + MonitorLocker eml(&exit_monitor_);
|
| +
|
| + // First tell all the workers to shut down.
|
| + Worker* current = saved;
|
| + ThreadId id = OSThread::GetCurrentThreadId();
|
| + while (current != NULL) {
|
| + Worker* next = current->all_next_;
|
| + ThreadId currentId = current->id();
|
| + if (currentId != id) {
|
| + AddWorkerToShutdownList(current);
|
| + }
|
| + current->Shutdown();
|
| + current = next;
|
| + }
|
| + saved = NULL;
|
| +
|
| + // Wait until all workers will exit.
|
| + while (shutting_down_workers_ != NULL) {
|
| + // Here, we are waiting for workers to exit. When a worker exits we will
|
| + // be notified.
|
| + eml.Wait();
|
| + }
|
| }
|
| +
|
| + // Extract the join list, and join on the threads.
|
| + JoinList* list = NULL;
|
| + {
|
| + MutexLocker ml(&mutex_);
|
| + list = join_list_;
|
| + join_list_ = NULL;
|
| + }
|
| +
|
| + // Join non-idle threads.
|
| + JoinList::Join(&list);
|
| +
|
| +#if defined(DEBUG)
|
| + {
|
| + MutexLocker ml(&mutex_);
|
| + ASSERT(join_list_ == NULL);
|
| + }
|
| +#endif
|
| }
|
|
|
|
|
| @@ -156,7 +191,7 @@ bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
|
| all_workers_ = worker->all_next_;
|
| worker->all_next_ = NULL;
|
| worker->owned_ = false;
|
| - worker->pool_ = NULL;
|
| + worker->done_ = true;
|
| return true;
|
| }
|
|
|
| @@ -174,16 +209,24 @@ bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
|
| }
|
|
|
|
|
| -void ThreadPool::SetIdle(Worker* worker) {
|
| - MutexLocker ml(&mutex_);
|
| - if (shutting_down_) {
|
| - return;
|
| +void ThreadPool::SetIdleAndReapExited(Worker* worker) {
|
| + JoinList* list = NULL;
|
| + {
|
| + MutexLocker ml(&mutex_);
|
| + if (shutting_down_) {
|
| + return;
|
| + }
|
| + ASSERT(worker->owned_ && !IsIdle(worker));
|
| + worker->idle_next_ = idle_workers_;
|
| + idle_workers_ = worker;
|
| + count_idle_++;
|
| + count_running_--;
|
| +
|
| + // While we have the lock, opportunistically grab and clear the join_list_.
|
| + list = join_list_;
|
| + join_list_ = NULL;
|
| }
|
| - ASSERT(worker->owned_ && !IsIdle(worker));
|
| - worker->idle_next_ = idle_workers_;
|
| - idle_workers_ = worker;
|
| - count_idle_++;
|
| - count_running_--;
|
| + JoinList::Join(&list);
|
| }
|
|
|
|
|
| @@ -200,12 +243,62 @@ bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
|
| bool found = RemoveWorkerFromAllList(worker);
|
| ASSERT(found);
|
|
|
| + // The thread for worker will exit. Add its ThreadId to the join_list_
|
| + // so that we can join on it at the next opportunity.
|
| + JoinList::AddLocked(OSThread::GetCurrentThreadJoinId(), &join_list_);
|
| count_stopped_++;
|
| count_idle_--;
|
| return true;
|
| }
|
|
|
|
|
| +// Only call while holding the exit_monitor_
|
| +void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
|
| + worker->shutdown_next_ = shutting_down_workers_;
|
| + shutting_down_workers_ = worker;
|
| +}
|
| +
|
| +
|
| +// Only call while holding the exit_monitor_
|
| +bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
|
| + ASSERT(worker != NULL);
|
| + ASSERT(shutting_down_workers_ != NULL);
|
| +
|
| + // Special case head of list.
|
| + if (shutting_down_workers_ == worker) {
|
| + shutting_down_workers_ = worker->shutdown_next_;
|
| + worker->shutdown_next_ = NULL;
|
| + return true;
|
| + }
|
| +
|
| + for (Worker* current = shutting_down_workers_;
|
| + current->shutdown_next_ != NULL;
|
| + current = current->shutdown_next_) {
|
| + if (current->shutdown_next_ == worker) {
|
| + current->shutdown_next_ = worker->shutdown_next_;
|
| + worker->shutdown_next_ = NULL;
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| +}
|
| +
|
| +
|
| +void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) {
|
| + *list = new JoinList(id, *list);
|
| +}
|
| +
|
| +
|
| +void ThreadPool::JoinList::Join(JoinList** list) {
|
| + while (*list != NULL) {
|
| + JoinList* current = *list;
|
| + *list = current->next();
|
| + OSThread::Join(current->id());
|
| + delete current;
|
| + }
|
| +}
|
| +
|
| +
|
| ThreadPool::Task::Task() {
|
| }
|
|
|
| @@ -217,9 +310,18 @@ ThreadPool::Task::~Task() {
|
| ThreadPool::Worker::Worker(ThreadPool* pool)
|
| : pool_(pool),
|
| task_(NULL),
|
| + id_(OSThread::kInvalidThreadId),
|
| + done_(false),
|
| owned_(false),
|
| all_next_(NULL),
|
| - idle_next_(NULL) {
|
| + idle_next_(NULL),
|
| + shutdown_next_(NULL) {
|
| +}
|
| +
|
| +
|
| +ThreadId ThreadPool::Worker::id() {
|
| + MonitorLocker ml(&monitor_);
|
| + return id_;
|
| }
|
|
|
|
|
| @@ -264,7 +366,7 @@ static int64_t ComputeTimeout(int64_t idle_start) {
|
| }
|
|
|
|
|
| -void ThreadPool::Worker::Loop() {
|
| +bool ThreadPool::Worker::Loop() {
|
| MonitorLocker ml(&monitor_);
|
| int64_t idle_start;
|
| while (true) {
|
| @@ -281,10 +383,10 @@ void ThreadPool::Worker::Loop() {
|
|
|
| ASSERT(task_ == NULL);
|
| if (IsDone()) {
|
| - return;
|
| + return false;
|
| }
|
| - ASSERT(pool_ != NULL);
|
| - pool_->SetIdle(this);
|
| + ASSERT(!done_);
|
| + pool_->SetIdleAndReapExited(this);
|
| idle_start = OS::GetCurrentTimeMillis();
|
| while (true) {
|
| Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
|
| @@ -294,21 +396,21 @@ void ThreadPool::Worker::Loop() {
|
| break;
|
| }
|
| if (IsDone()) {
|
| - return;
|
| + return false;
|
| }
|
| - if (result == Monitor::kTimedOut &&
|
| - pool_->ReleaseIdleWorker(this)) {
|
| - return;
|
| + if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
|
| + return true;
|
| }
|
| }
|
| }
|
| UNREACHABLE();
|
| + return false;
|
| }
|
|
|
|
|
| void ThreadPool::Worker::Shutdown() {
|
| MonitorLocker ml(&monitor_);
|
| - pool_ = NULL; // Fail fast if someone tries to access pool_.
|
| + done_ = true;
|
| ml.Notify();
|
| }
|
|
|
| @@ -317,20 +419,58 @@ void ThreadPool::Worker::Shutdown() {
|
| void ThreadPool::Worker::Main(uword args) {
|
| Thread::EnsureInit();
|
| Worker* worker = reinterpret_cast<Worker*>(args);
|
| - worker->Loop();
|
| + ThreadId id = OSThread::GetCurrentThreadId();
|
| + ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId();
|
| + ThreadPool* pool;
|
| +
|
| + {
|
| + MonitorLocker ml(&worker->monitor_);
|
| + ASSERT(worker->task_);
|
| + worker->id_ = id;
|
| + pool = worker->pool_;
|
| + }
|
| +
|
| + bool released = worker->Loop();
|
|
|
| // It should be okay to access these unlocked here in this assert.
|
| - ASSERT(!worker->owned_ &&
|
| - worker->all_next_ == NULL &&
|
| - worker->idle_next_ == NULL);
|
| -
|
| - // The exit monitor is only used during testing.
|
| - if (ThreadPool::exit_monitor_) {
|
| - MonitorLocker ml(ThreadPool::exit_monitor_);
|
| - (*ThreadPool::exit_count_)++;
|
| - ml.Notify();
|
| + // worker->all_next_ is retained by the pool for shutdown monitoring.
|
| + ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
|
| +
|
| + if (!released) {
|
| + // This worker is exiting because the thread pool is being shut down.
|
| + // Inform the thread pool that we are exiting. We remove this worker from
|
| + // shutting_down_workers_ list because there will be no need for the
|
| + // ThreadPool to take action for this worker.
|
| + {
|
| + MutexLocker ml(&pool->mutex_);
|
| + JoinList::AddLocked(join_id, &pool->join_list_);
|
| + }
|
| +
|
| + // worker->id_ should never be read again, so set to invalid in debug mode
|
| + // for asserts.
|
| +#if defined(DEBUG)
|
| + {
|
| + MonitorLocker ml(&worker->monitor_);
|
| + worker->id_ = OSThread::kInvalidThreadId;
|
| + }
|
| +#endif
|
| +
|
| + // Remove from the shutdown list, delete, and notify the thread pool.
|
| + {
|
| + MonitorLocker eml(&pool->exit_monitor_);
|
| + pool->RemoveWorkerFromShutdownList(worker);
|
| + delete worker;
|
| + eml.Notify();
|
| + }
|
| + } else {
|
| + // This worker is going down because it was idle for too long. This case
|
| + // is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
|
| + // The worker's id is added to the thread pool's join list by
|
| + // ReleaseIdleWorker, so in the case that the thread pool begins shutting
|
| + // down immediately after returning from worker->Loop() above, we still
|
| + // wait for the thread to exit by joining on it in Shutdown().
|
| + delete worker;
|
| }
|
| - delete worker;
|
| #if defined(TARGET_OS_WINDOWS)
|
| Thread::CleanUp();
|
| #endif
|
|
|