OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 #include "vm/thread_pool.h" | |
6 | |
7 namespace dart { | |
8 | |
9 DEFINE_FLAG(int, worker_timeout_millis, 5000, | |
10 "Free workers when they have been idle for this amount of time."); | |
11 | |
12 ThreadPool::ThreadPool() | |
13 : mutex_(new Mutex()), | |
14 done_(false), | |
15 idle_workers_(NULL), | |
16 running_workers_(NULL), | |
17 workers_started_(0), | |
18 workers_stopped_(0) { | |
19 } | |
20 | |
21 | |
22 ThreadPool::~ThreadPool() { | |
23 Shutdown(); | |
24 delete mutex_; | |
25 mutex_ = NULL; | |
26 } | |
27 | |
28 | |
29 void ThreadPool::Run(Task* task) { | |
30 Worker* worker = NULL; | |
31 { | |
32 MutexLocker ml(mutex_); | |
33 if (done_) { | |
34 return; | |
35 } | |
36 worker = GetIdleWorker(); | |
37 } | |
38 // Release ThreadPool::mutex_ before calling Worker functions. | |
39 worker->Run(task); | |
40 } | |
41 | |
42 | |
43 void ThreadPool::Shutdown() { | |
44 Worker* idle_workers = NULL; | |
45 Worker* running_workers = NULL; | |
46 { | |
47 MutexLocker ml(mutex_); | |
48 done_ = true; | |
49 idle_workers = idle_workers_; | |
50 idle_workers_ = NULL; | |
51 running_workers = running_workers_; | |
52 running_workers = NULL; | |
53 } | |
54 // Release ThreadPool::mutex_ before calling Worker functions. | |
55 ShutdownWorkerList(idle_workers); | |
56 ShutdownWorkerList(running_workers); | |
57 } | |
58 | |
59 | |
60 void ThreadPool::RemoveWorkerFromList(Worker* worker, Worker** head) { | |
siva
2012/03/06 05:07:00
ASSERT(worker != NULL && head != NULL);
turnidge
2012/03/06 22:34:01
Function removed, but added assert in some similar
| |
61 Worker* next = worker->next_; | |
62 Worker* prev = worker->prev_; | |
63 if (next) { | |
64 next->prev_ = prev; | |
65 } | |
66 if (prev) { | |
67 ASSERT(*head != worker); | |
68 prev->next_ = next; | |
69 } else { | |
70 // This worker is at the head of the list. | |
71 ASSERT(*head == worker); | |
72 ASSERT(worker->prev_ == NULL); | |
73 *head = next; | |
74 } | |
75 worker->next_ = NULL; | |
76 worker->prev_ = NULL; | |
77 } | |
78 | |
79 | |
80 void ThreadPool::AddWorkerToList(Worker* worker, Worker** head) { | |
siva
2012/03/06 05:07:00
ASSERT(worker != NULL && head != NULL);
turnidge
2012/03/06 22:34:01
Ditto above.
| |
81 ASSERT(worker->next_ == NULL && worker->prev_ == NULL); | |
82 worker->next_ = *head; | |
83 worker->prev_ = NULL; | |
84 if (*head) { | |
85 (*head)->prev_ = worker; | |
86 } | |
87 *head = worker; | |
88 } | |
89 | |
90 | |
91 ThreadPool::Worker* ThreadPool::GetIdleWorker() { | |
siva
2012/03/06 05:07:00
Missing MutexLocker ml(mutex_); ?
turnidge
2012/03/06 22:34:01
We are already holding the lock. The only caller
| |
92 ASSERT(!done_); | |
93 Worker* worker = NULL; | |
94 if (idle_workers_ == NULL) { | |
95 worker = new Worker(this); | |
siva
2012/03/06 05:07:00
ASSERT(worker != NULL);
turnidge
2012/03/06 22:34:01
Should I be doing this on every "new" then?
| |
96 Thread::Start(&ThreadPool::Main, reinterpret_cast<uword>(worker)); | |
97 workers_started_++; | |
98 } else { | |
99 // Get the first worker from the idle worker list. | |
100 worker = idle_workers_; | |
101 RemoveWorkerFromList(worker, &idle_workers_); | |
102 } | |
103 // Move worker to the running list. | |
104 AddWorkerToList(worker, &running_workers_); | |
105 return worker; | |
106 } | |
107 | |
108 | |
109 void ThreadPool::SetIdle(Worker* worker) { | |
110 MutexLocker ml(mutex_); | |
111 ASSERT(!done_); | |
siva
2012/03/06 05:07:00
Instead of asserting shouldn't you just check for
turnidge
2012/03/06 22:34:01
Fixed.
| |
112 | |
113 RemoveWorkerFromList(worker, &running_workers_); | |
114 AddWorkerToList(worker, &idle_workers_); | |
115 } | |
116 | |
117 | |
118 bool ThreadPool::RemoveIdleWorker(Worker* worker) { | |
119 MutexLocker ml(mutex_); | |
120 ASSERT(!done_); | |
siva
2012/03/06 05:07:00
Instead of asserting for !done_ here shouldn't you
turnidge
2012/03/06 22:34:01
Fixed.
| |
121 | |
122 // We only want to remove a worker if it is idle. | |
123 for (Worker* current = idle_workers_; | |
124 current != NULL; | |
125 current = current->next_) { | |
126 if (current == worker) { | |
127 // We found this worker in the idle list. Remove it. | |
128 RemoveWorkerFromList(worker, &idle_workers_); | |
129 workers_stopped_++; | |
130 return true; | |
131 } | |
132 } | |
133 return false; | |
134 } | |
135 | |
136 | |
137 void ThreadPool::ShutdownWorkerList(Worker* head) { | |
138 Worker* worker = head; | |
139 while (worker != NULL) { | |
140 Worker* next = worker->next_; | |
141 worker->prev_ = NULL; | |
142 worker->next_ = NULL; | |
143 worker->Shutdown(); | |
144 worker = next; | |
145 } | |
146 } | |
147 | |
148 | |
149 void ThreadPool::Main(uword args) { | |
150 Worker* worker = reinterpret_cast<Worker*>(args); | |
151 worker->Loop(); | |
152 ASSERT(worker->next_ == NULL && worker->prev_ == NULL); | |
153 delete worker; | |
154 } | |
155 | |
156 | |
157 ThreadPool::Task::Task() { | |
158 } | |
159 | |
160 | |
161 ThreadPool::Task::~Task() { | |
162 } | |
163 | |
164 | |
165 ThreadPool::Worker::Worker(ThreadPool* pool) | |
166 : monitor_(new Monitor()), | |
167 pool_(pool), | |
168 task_(NULL), | |
169 done_(false), | |
170 next_(NULL), | |
171 prev_(NULL) { | |
172 } | |
173 | |
174 | |
175 ThreadPool::Worker::~Worker() { | |
176 delete monitor_; | |
177 monitor_ = NULL; | |
178 } | |
179 | |
180 | |
181 void ThreadPool::Worker::Run(Task* task) { | |
182 MonitorLocker ml(monitor_); | |
183 ASSERT(task_ == NULL); | |
184 task_ = task; | |
185 ml.Notify(); | |
186 } | |
187 | |
188 | |
189 void ThreadPool::Worker::Loop() { | |
190 MonitorLocker ml(monitor_); | |
191 | |
192 // Wait for first task. | |
193 while (task_ == NULL) { | |
194 ml.Wait(); | |
195 if (done_) { | |
196 return; | |
197 } | |
198 } | |
siva
2012/03/06 05:07:00
Do we need this initial while loop, there is an im
turnidge
2012/03/06 22:34:01
I moved the call to Thread::Start to be under lock
| |
199 | |
200 int64_t idle_start = 0; | |
201 while (true) { | |
202 Task* task = task_; | |
203 task_ = NULL; | |
204 | |
205 // Release monitor while handling the task. | |
206 monitor_->Exit(); | |
207 task->Run(); | |
208 delete task; | |
209 monitor_->Enter(); | |
210 | |
211 if (done_) { | |
212 return; | |
213 } | |
214 ASSERT(task_ == NULL); | |
215 | |
216 pool_->SetIdle(this); | |
217 idle_start = OS::GetCurrentTimeMillis(); | |
218 do { | |
219 ml.Wait(FLAG_worker_timeout_millis); | |
siva
2012/03/06 05:07:00
Monitor::WaitResult result = ml.Wait(...);
and yo
turnidge
2012/03/06 22:34:01
Done.
| |
220 if (done_) { | |
221 return; | |
222 } | |
223 if (task_ == NULL && FLAG_worker_timeout_millis > 0) { | |
224 bool timeout = | |
225 (OS::GetCurrentTimeMillis() - idle_start) > | |
226 FLAG_worker_timeout_millis; | |
227 if (timeout && pool_->RemoveIdleWorker(this)) { | |
228 return; | |
229 } | |
230 } | |
231 } while (task_ == NULL); | |
232 } | |
233 UNREACHABLE(); | |
234 } | |
235 | |
236 | |
237 void ThreadPool::Worker::Shutdown() { | |
238 MonitorLocker ml(monitor_); | |
239 done_ = true; | |
240 ml.Notify(); | |
241 } | |
242 | |
243 | |
244 } // namespace dart | |
OLD | NEW |