Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(643)

Unified Diff: runtime/bin/thread_pool.cc

Issue 9212043: Refactored thread pool (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r4180 Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/thread_pool.h ('k') | runtime/bin/thread_pool_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/thread_pool.cc
diff --git a/runtime/bin/thread_pool.cc b/runtime/bin/thread_pool.cc
index cf1ec3b9d4cc941322d0857bc81f98f987613489..19b6961604c814c2ae570b24c9c6c6cb93942d75 100644
--- a/runtime/bin/thread_pool.cc
+++ b/runtime/bin/thread_pool.cc
@@ -6,29 +6,32 @@
#include "bin/thread.h"
-void TaskQueue::Insert(TaskQueueEntry* entry) {
- MonitorLocker monitor(&monitor_);
- monitor_.Enter();
+
+bool ThreadPool::InsertTask(Task task) {
+ TaskQueueEntry* entry = new TaskQueueEntry(task);
+ MonitorLocker locker(&monitor_);
+ if (terminate_) return false;
if (head_ == NULL) {
head_ = entry;
tail_ = entry;
- monitor.Notify();
+ locker.Notify();
} else {
tail_->set_next(entry);
tail_ = entry;
}
+ return true;
}
-TaskQueueEntry* TaskQueue::Remove() {
- MonitorLocker monitor(&monitor_);
+ThreadPool::TaskQueueEntry* ThreadPool::WaitForTask() {
+ MonitorLocker locker(&monitor_);
+ if (terminate_ && (drain_flag_ == kDoNotDrain || head_ == NULL)) {
+ return NULL;
+ }
TaskQueueEntry* result = head_;
while (result == NULL) {
- if (terminate_) {
- return NULL;
- }
- monitor.Wait();
- if (terminate_) {
+ locker.Wait();
+ if (terminate_ && (drain_flag_ == kDoNotDrain || head_ == NULL)) {
return NULL;
}
result = head_;
@@ -39,56 +42,47 @@ TaskQueueEntry* TaskQueue::Remove() {
}
-void TaskQueue::Shutdown() {
- MonitorLocker monitor(&monitor_);
- terminate_ = true;
- monitor.NotifyAll();
-}
-
-
-void ThreadPool::InsertTask(Task task) {
- TaskQueueEntry* entry = new TaskQueueEntry(task);
- queue_.Insert(entry);
-}
-
-
-Task ThreadPool::WaitForTask() {
- TaskQueueEntry* entry = queue_.Remove();
- if (entry == NULL) {
- return NULL;
- }
- Task task = entry->task();
- delete entry;
- return task;
-}
-
-
void ThreadPool::Start() {
- MonitorLocker monitor(&monitor_);
- for (int i = 0; i < size_; i++) {
+ MonitorLocker locker(&monitor_);
+ terminate_ = false;
+ for (int i = 0; i < initial_number_of_threads_; i++) {
int result = dart::Thread::Start(&ThreadPool::Main,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start thread pool thread %d", result);
}
+ number_of_threads_++;
}
}
-void ThreadPool::Shutdown() {
+void ThreadPool::Shutdown(DrainFlag drain_flag) {
+ MonitorLocker locker(&monitor_);
terminate_ = true;
- queue_.Shutdown();
- MonitorLocker monitor(&monitor_);
- while (size_ > 0) {
- monitor.Wait();
+ drain_flag_ = drain_flag;
+ locker.NotifyAll();
+ int shutdown_count = 0;
+ while (number_of_threads_ > 0 && shutdown_count < 10) {
+ locker.Wait(1000);
+ shutdown_count++;
+ if (number_of_threads_ > 0) {
+ fprintf(stderr,
+ "Waiting for thread pool termination, %d running threads\n",
+ number_of_threads_);
+ }
+ }
+ if (number_of_threads_ > 0) {
+ fprintf(stderr,
+ "Failed thread pool termination, still %d running threads\n",
+ number_of_threads_);
}
}
void ThreadPool::ThreadTerminated() {
- MonitorLocker monitor(&monitor_);
- size_--;
- monitor.Notify();
+ MonitorLocker locker(&monitor_);
+ number_of_threads_--;
+ locker.Notify();
}
@@ -97,13 +91,13 @@ void ThreadPool::Main(uword args) {
printf("Thread pool thread started\n");
}
ThreadPool* pool = reinterpret_cast<ThreadPool*>(args);
- while (!pool->terminate_) {
+ while (true) {
if (Dart_IsVMFlagSet("trace_thread_pool")) {
printf("Waiting for task\n");
}
- Task task = pool->WaitForTask();
- if (pool->terminate_) break;
- (*(pool->task_handler_))(task);
+ TaskQueueEntry* task = pool->WaitForTask();
+ if (task == NULL) break;
+ (*(pool->task_handler_))(task->task());
}
pool->ThreadTerminated();
};
« no previous file with comments | « runtime/bin/thread_pool.h ('k') | runtime/bin/thread_pool_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698