Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(192)

Side by Side Diff: runtime/bin/thread_pool.cc

Issue 9212043: Refactored thread pool (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r4180 Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/thread_pool.h ('k') | runtime/bin/thread_pool_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "bin/thread_pool.h" 5 #include "bin/thread_pool.h"
6 6
7 #include "bin/thread.h" 7 #include "bin/thread.h"
8 8
9 void TaskQueue::Insert(TaskQueueEntry* entry) { 9
10 MonitorLocker monitor(&monitor_); 10 bool ThreadPool::InsertTask(Task task) {
11 monitor_.Enter(); 11 TaskQueueEntry* entry = new TaskQueueEntry(task);
12 MonitorLocker locker(&monitor_);
13 if (terminate_) return false;
12 if (head_ == NULL) { 14 if (head_ == NULL) {
13 head_ = entry; 15 head_ = entry;
14 tail_ = entry; 16 tail_ = entry;
15 monitor.Notify(); 17 locker.Notify();
16 } else { 18 } else {
17 tail_->set_next(entry); 19 tail_->set_next(entry);
18 tail_ = entry; 20 tail_ = entry;
19 } 21 }
22 return true;
20 } 23 }
21 24
22 25
23 TaskQueueEntry* TaskQueue::Remove() { 26 ThreadPool::TaskQueueEntry* ThreadPool::WaitForTask() {
24 MonitorLocker monitor(&monitor_); 27 MonitorLocker locker(&monitor_);
28 if (terminate_ && (drain_flag_ == kDoNotDrain || head_ == NULL)) {
29 return NULL;
30 }
25 TaskQueueEntry* result = head_; 31 TaskQueueEntry* result = head_;
26 while (result == NULL) { 32 while (result == NULL) {
27 if (terminate_) { 33 locker.Wait();
28 return NULL; 34 if (terminate_ && (drain_flag_ == kDoNotDrain || head_ == NULL)) {
29 }
30 monitor.Wait();
31 if (terminate_) {
32 return NULL; 35 return NULL;
33 } 36 }
34 result = head_; 37 result = head_;
35 } 38 }
36 head_ = result->next(); 39 head_ = result->next();
37 ASSERT(head_ != NULL || tail_ == result); 40 ASSERT(head_ != NULL || tail_ == result);
38 return result; 41 return result;
39 } 42 }
40 43
41 44
42 void TaskQueue::Shutdown() {
43 MonitorLocker monitor(&monitor_);
44 terminate_ = true;
45 monitor.NotifyAll();
46 }
47
48
49 void ThreadPool::InsertTask(Task task) {
50 TaskQueueEntry* entry = new TaskQueueEntry(task);
51 queue_.Insert(entry);
52 }
53
54
55 Task ThreadPool::WaitForTask() {
56 TaskQueueEntry* entry = queue_.Remove();
57 if (entry == NULL) {
58 return NULL;
59 }
60 Task task = entry->task();
61 delete entry;
62 return task;
63 }
64
65
66 void ThreadPool::Start() { 45 void ThreadPool::Start() {
67 MonitorLocker monitor(&monitor_); 46 MonitorLocker locker(&monitor_);
68 for (int i = 0; i < size_; i++) { 47 terminate_ = false;
48 for (int i = 0; i < initial_number_of_threads_; i++) {
69 int result = dart::Thread::Start(&ThreadPool::Main, 49 int result = dart::Thread::Start(&ThreadPool::Main,
70 reinterpret_cast<uword>(this)); 50 reinterpret_cast<uword>(this));
71 if (result != 0) { 51 if (result != 0) {
72 FATAL1("Failed to start thread pool thread %d", result); 52 FATAL1("Failed to start thread pool thread %d", result);
73 } 53 }
54 number_of_threads_++;
74 } 55 }
75 } 56 }
76 57
77 58
78 void ThreadPool::Shutdown() { 59 void ThreadPool::Shutdown(DrainFlag drain_flag) {
60 MonitorLocker locker(&monitor_);
79 terminate_ = true; 61 terminate_ = true;
80 queue_.Shutdown(); 62 drain_flag_ = drain_flag;
81 MonitorLocker monitor(&monitor_); 63 locker.NotifyAll();
82 while (size_ > 0) { 64 int shutdown_count = 0;
83 monitor.Wait(); 65 while (number_of_threads_ > 0 && shutdown_count < 10) {
66 locker.Wait(1000);
67 shutdown_count++;
68 if (number_of_threads_ > 0) {
69 fprintf(stderr,
70 "Waiting for thread pool termination, %d running threads\n",
71 number_of_threads_);
72 }
73 }
74 if (number_of_threads_ > 0) {
75 fprintf(stderr,
76 "Failed thread pool termination, still %d running threads\n",
77 number_of_threads_);
84 } 78 }
85 } 79 }
86 80
87 81
88 void ThreadPool::ThreadTerminated() { 82 void ThreadPool::ThreadTerminated() {
89 MonitorLocker monitor(&monitor_); 83 MonitorLocker locker(&monitor_);
90 size_--; 84 number_of_threads_--;
91 monitor.Notify(); 85 locker.Notify();
92 } 86 }
93 87
94 88
95 void ThreadPool::Main(uword args) { 89 void ThreadPool::Main(uword args) {
96 if (Dart_IsVMFlagSet("trace_thread_pool")) { 90 if (Dart_IsVMFlagSet("trace_thread_pool")) {
97 printf("Thread pool thread started\n"); 91 printf("Thread pool thread started\n");
98 } 92 }
99 ThreadPool* pool = reinterpret_cast<ThreadPool*>(args); 93 ThreadPool* pool = reinterpret_cast<ThreadPool*>(args);
100 while (!pool->terminate_) { 94 while (true) {
101 if (Dart_IsVMFlagSet("trace_thread_pool")) { 95 if (Dart_IsVMFlagSet("trace_thread_pool")) {
102 printf("Waiting for task\n"); 96 printf("Waiting for task\n");
103 } 97 }
104 Task task = pool->WaitForTask(); 98 TaskQueueEntry* task = pool->WaitForTask();
105 if (pool->terminate_) break; 99 if (task == NULL) break;
106 (*(pool->task_handler_))(task); 100 (*(pool->task_handler_))(task->task());
107 } 101 }
108 pool->ThreadTerminated(); 102 pool->ThreadTerminated();
109 }; 103 };
OLDNEW
« no previous file with comments | « runtime/bin/thread_pool.h ('k') | runtime/bin/thread_pool_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698