OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "bin/thread_pool.h" | 5 #include "bin/thread_pool.h" |
6 | 6 |
7 #include "bin/thread.h" | 7 #include "bin/thread.h" |
8 | 8 |
9 void TaskQueue::Insert(TaskQueueEntry* entry) { | 9 |
10 bool ThreadPool::InsertTask(Task task) { | |
11 TaskQueueEntry* entry = new TaskQueueEntry(task); | |
10 MonitorLocker monitor(&monitor_); | 12 MonitorLocker monitor(&monitor_); |
Mads Ager (google)
2012/01/24 12:29:39
MonitorLocker locker(&monitor_); ?
Similarly for
Søren Gjesse
2012/01/24 13:04:35
Done.
| |
11 monitor_.Enter(); | 13 if (terminate_) return false; |
12 if (head_ == NULL) { | 14 if (head_ == NULL) { |
13 head_ = entry; | 15 head_ = entry; |
14 tail_ = entry; | 16 tail_ = entry; |
15 monitor.Notify(); | 17 monitor.Notify(); |
16 } else { | 18 } else { |
17 tail_->set_next(entry); | 19 tail_->set_next(entry); |
18 tail_ = entry; | 20 tail_ = entry; |
19 } | 21 } |
22 return true; | |
20 } | 23 } |
21 | 24 |
22 | 25 |
23 TaskQueueEntry* TaskQueue::Remove() { | 26 ThreadPool::TaskQueueEntry* ThreadPool::WaitForTask() { |
24 MonitorLocker monitor(&monitor_); | 27 MonitorLocker monitor(&monitor_); |
28 if (terminate_ && !drain_) return NULL; | |
25 TaskQueueEntry* result = head_; | 29 TaskQueueEntry* result = head_; |
26 while (result == NULL) { | 30 while (result == NULL) { |
27 if (terminate_) { | 31 if (terminate_ && (!drain_ || result == NULL)) { |
Mads Ager (google)
2012/01/24 12:29:39
I don't understand this condition. Shouldn't this
Søren Gjesse
2012/01/24 13:04:35
Good catch! result would always be NULL here.
Mov
| |
28 return NULL; | 32 return NULL; |
29 } | 33 } |
30 monitor.Wait(); | 34 monitor.Wait(); |
31 if (terminate_) { | |
32 return NULL; | |
33 } | |
34 result = head_; | 35 result = head_; |
35 } | 36 } |
36 head_ = result->next(); | 37 head_ = result->next(); |
37 ASSERT(head_ != NULL || tail_ == result); | 38 ASSERT(head_ != NULL || tail_ == result); |
38 return result; | 39 return result; |
39 } | 40 } |
40 | 41 |
41 | 42 |
42 void TaskQueue::Shutdown() { | |
43 MonitorLocker monitor(&monitor_); | |
44 terminate_ = true; | |
45 monitor.NotifyAll(); | |
46 } | |
47 | |
48 | |
49 void ThreadPool::InsertTask(Task task) { | |
50 TaskQueueEntry* entry = new TaskQueueEntry(task); | |
51 queue_.Insert(entry); | |
52 } | |
53 | |
54 | |
55 Task ThreadPool::WaitForTask() { | |
56 TaskQueueEntry* entry = queue_.Remove(); | |
57 if (entry == NULL) { | |
58 return NULL; | |
59 } | |
60 Task task = entry->task(); | |
61 delete entry; | |
62 return task; | |
63 } | |
64 | |
65 | |
66 void ThreadPool::Start() { | 43 void ThreadPool::Start() { |
67 MonitorLocker monitor(&monitor_); | 44 MonitorLocker monitor(&monitor_); |
68 for (int i = 0; i < size_; i++) { | 45 terminate_ = false; |
46 drain_ = false; | |
47 for (int i = 0; i < initial_size_; i++) { | |
69 int result = dart::Thread::Start(&ThreadPool::Main, | 48 int result = dart::Thread::Start(&ThreadPool::Main, |
70 reinterpret_cast<uword>(this)); | 49 reinterpret_cast<uword>(this)); |
71 if (result != 0) { | 50 if (result != 0) { |
72 FATAL1("Failed to start thread pool thread %d", result); | 51 FATAL1("Failed to start thread pool thread %d", result); |
73 } | 52 } |
53 size_++; | |
74 } | 54 } |
75 } | 55 } |
76 | 56 |
77 | 57 |
78 void ThreadPool::Shutdown() { | 58 void ThreadPool::Shutdown(bool drain) { |
59 MonitorLocker monitor(&monitor_); | |
79 terminate_ = true; | 60 terminate_ = true; |
80 queue_.Shutdown(); | 61 drain_ = drain; |
81 MonitorLocker monitor(&monitor_); | 62 monitor.NotifyAll(); |
82 while (size_ > 0) { | 63 while (size_ > 0) { |
83 monitor.Wait(); | 64 monitor.Wait(); |
84 } | 65 } |
85 } | 66 } |
86 | 67 |
87 | 68 |
88 void ThreadPool::ThreadTerminated() { | 69 void ThreadPool::ThreadTerminated() { |
89 MonitorLocker monitor(&monitor_); | 70 MonitorLocker monitor(&monitor_); |
90 size_--; | 71 size_--; |
91 monitor.Notify(); | 72 monitor.Notify(); |
92 } | 73 } |
93 | 74 |
94 | 75 |
95 void ThreadPool::Main(uword args) { | 76 void ThreadPool::Main(uword args) { |
96 if (Dart_IsVMFlagSet("trace_thread_pool")) { | 77 if (Dart_IsVMFlagSet("trace_thread_pool")) { |
97 printf("Thread pool thread started\n"); | 78 printf("Thread pool thread started\n"); |
98 } | 79 } |
99 ThreadPool* pool = reinterpret_cast<ThreadPool*>(args); | 80 ThreadPool* pool = reinterpret_cast<ThreadPool*>(args); |
100 while (!pool->terminate_) { | 81 while (true) { |
101 if (Dart_IsVMFlagSet("trace_thread_pool")) { | 82 if (Dart_IsVMFlagSet("trace_thread_pool")) { |
102 printf("Waiting for task\n"); | 83 printf("Waiting for task\n"); |
103 } | 84 } |
104 Task task = pool->WaitForTask(); | 85 TaskQueueEntry* task = pool->WaitForTask(); |
105 if (pool->terminate_) break; | 86 if (task == NULL) break; |
106 (*(pool->task_handler_))(task); | 87 (*(pool->task_handler_))(task->task()); |
107 } | 88 } |
108 pool->ThreadTerminated(); | 89 pool->ThreadTerminated(); |
109 }; | 90 }; |
OLD | NEW |