Index: runtime/bin/thread_pool.cc |
diff --git a/runtime/bin/thread_pool.cc b/runtime/bin/thread_pool.cc |
index cf1ec3b9d4cc941322d0857bc81f98f987613489..574e3f98f0cfa7eefdd8a538dfbc6cdb405ac9e5 100644 |
--- a/runtime/bin/thread_pool.cc |
+++ b/runtime/bin/thread_pool.cc |
@@ -6,29 +6,32 @@ |
#include "bin/thread.h" |
-void TaskQueue::Insert(TaskQueueEntry* entry) { |
- MonitorLocker monitor(&monitor_); |
- monitor_.Enter(); |
+ |
+bool ThreadPool::InsertTask(Task task) { |
+ TaskQueueEntry* entry = new TaskQueueEntry(task); |
+ MonitorLocker locker(&monitor_); |
+ if (terminate_) return false; |
if (head_ == NULL) { |
head_ = entry; |
tail_ = entry; |
- monitor.Notify(); |
+ locker.Notify(); |
} else { |
tail_->set_next(entry); |
tail_ = entry; |
} |
+ return true; |
} |
-TaskQueueEntry* TaskQueue::Remove() { |
- MonitorLocker monitor(&monitor_); |
+ThreadPool::TaskQueueEntry* ThreadPool::WaitForTask() { |
+ MonitorLocker locker(&monitor_); |
+ if (terminate_ && (drain_flag_ == kDoNotDrain || head_ == NULL)) { |
+ return NULL; |
+ } |
TaskQueueEntry* result = head_; |
while (result == NULL) { |
- if (terminate_) { |
- return NULL; |
- } |
- monitor.Wait(); |
- if (terminate_) { |
+ locker.Wait(); |
+ if (terminate_ && (drain_flag_ == kDoNotDrain || head_ == NULL)) { |
return NULL; |
} |
result = head_; |
@@ -39,56 +42,35 @@ TaskQueueEntry* TaskQueue::Remove() { |
} |
-void TaskQueue::Shutdown() { |
- MonitorLocker monitor(&monitor_); |
- terminate_ = true; |
- monitor.NotifyAll(); |
-} |
- |
- |
-void ThreadPool::InsertTask(Task task) { |
- TaskQueueEntry* entry = new TaskQueueEntry(task); |
- queue_.Insert(entry); |
-} |
- |
- |
-Task ThreadPool::WaitForTask() { |
- TaskQueueEntry* entry = queue_.Remove(); |
- if (entry == NULL) { |
- return NULL; |
- } |
- Task task = entry->task(); |
- delete entry; |
- return task; |
-} |
- |
- |
void ThreadPool::Start() { |
- MonitorLocker monitor(&monitor_); |
- for (int i = 0; i < size_; i++) { |
+ MonitorLocker locker(&monitor_); |
+ terminate_ = false; |
+ for (int i = 0; i < initial_size_; i++) { |
int result = dart::Thread::Start(&ThreadPool::Main, |
reinterpret_cast<uword>(this)); |
if (result != 0) { |
FATAL1("Failed to start thread pool thread %d", result); |
} |
+ size_++; |
} |
} |
-void ThreadPool::Shutdown() { |
+void ThreadPool::Shutdown(DrainFlag drain_flag) { |
+ MonitorLocker locker(&monitor_); |
terminate_ = true; |
- queue_.Shutdown(); |
- MonitorLocker monitor(&monitor_); |
+ drain_flag_ = drain_flag; |
+ locker.NotifyAll(); |
while (size_ > 0) { |
- monitor.Wait(); |
+ locker.Wait(); |
} |
} |
void ThreadPool::ThreadTerminated() { |
- MonitorLocker monitor(&monitor_); |
+ MonitorLocker locker(&monitor_); |
size_--; |
- monitor.Notify(); |
+ locker.Notify(); |
} |
@@ -97,13 +79,13 @@ void ThreadPool::Main(uword args) { |
printf("Thread pool thread started\n"); |
} |
ThreadPool* pool = reinterpret_cast<ThreadPool*>(args); |
- while (!pool->terminate_) { |
+ while (true) { |
if (Dart_IsVMFlagSet("trace_thread_pool")) { |
printf("Waiting for task\n"); |
} |
- Task task = pool->WaitForTask(); |
- if (pool->terminate_) break; |
- (*(pool->task_handler_))(task); |
+ TaskQueueEntry* task = pool->WaitForTask(); |
+ if (task == NULL) break; |
+ (*(pool->task_handler_))(task->task()); |
} |
pool->ThreadTerminated(); |
}; |