OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 #include "vm/thread_pool.h" | |
6 | |
7 namespace dart { | |
8 | |
9 DEFINE_FLAG(int, worker_timeout_millis, 5000, | |
10 "Free workers when they have been idle for this amount of time."); | |
11 | |
12 ThreadPool::ThreadPool() | |
13 : shutting_down_(false), | |
14 all_workers_(NULL), | |
15 idle_workers_(NULL), | |
16 count_started_(0), | |
17 count_stopped_(0), | |
18 count_running_(0), | |
19 count_idle_(0) { | |
20 } | |
21 | |
22 | |
23 ThreadPool::~ThreadPool() { | |
24 Shutdown(); | |
25 } | |
26 | |
27 | |
28 void ThreadPool::Run(Task* task) { | |
29 Worker* worker = NULL; | |
30 bool new_worker = false; | |
31 { | |
Ivan Posva
2012/03/14 18:51:26
// Grab ThreadPool::mutex_ before touching queues
turnidge
2012/03/14 21:00:27
Done.
| |
32 MutexLocker ml(&mutex_); | |
33 if (shutting_down_) { | |
34 return; | |
35 } | |
36 if (idle_workers_ == NULL) { | |
37 worker = new Worker(this); | |
38 ASSERT(worker != NULL); | |
39 new_worker = true; | |
40 count_started_++; | |
41 | |
42 // Add worker to the all_workers_ list. | |
43 worker->all_next_ = all_workers_; | |
44 all_workers_ = worker; | |
45 worker->owned_ = true; | |
46 } else { | |
47 // Get the first worker from the idle worker list. | |
48 worker = idle_workers_; | |
49 idle_workers_ = worker->idle_next_; | |
50 worker->idle_next_ = NULL; | |
51 count_idle_--; | |
52 } | |
53 count_running_++; | |
54 } | |
55 // Release ThreadPool::mutex_ before calling Worker functions. | |
56 ASSERT(worker != NULL); | |
57 worker->Run(task); | |
58 if (new_worker) { | |
59 // Call StartThread after we've assigned the first t | |
siva
2012/03/14 17:36:09
first task.
Ivan Posva
2012/03/14 18:51:26
Comment cutoff.
turnidge
2012/03/14 21:00:27
Done.
| |
60 worker->StartThread(); | |
61 } | |
62 } | |
63 | |
64 | |
65 void ThreadPool::Shutdown() { | |
66 Worker* saved = NULL; | |
67 { | |
68 MutexLocker ml(&mutex_); | |
69 shutting_down_ = true; | |
70 saved = all_workers_; | |
71 all_workers_ = NULL; | |
72 idle_workers_ = NULL; | |
73 | |
74 Worker* current = saved; | |
75 while (current != NULL) { | |
76 Worker* next = current->all_next_; | |
77 current->idle_next_ = NULL; | |
78 current->owned_ = false; | |
79 current = next; | |
80 } | |
81 | |
82 count_idle_ = 0; | |
83 count_running_ = 0; | |
84 } | |
85 // Release ThreadPool::mutex_ before calling Worker functions. | |
86 | |
87 Worker* current = saved; | |
88 while (current != NULL) { | |
89 Worker* next = current->all_next_; | |
Ivan Posva
2012/03/14 18:51:26
There is unprotected access to fields marked as pr
turnidge
2012/03/14 21:00:27
Done.
| |
90 current->all_next_ = NULL; | |
91 current->Shutdown(); | |
92 current = next; | |
93 } | |
94 } | |
95 | |
96 | |
97 bool ThreadPool::IsIdle(Worker* worker) { | |
98 ASSERT(worker != NULL && worker->owned_); | |
99 for (Worker* current = idle_workers_; | |
100 current != NULL; | |
101 current = current->idle_next_) { | |
102 if (current == worker) { | |
103 return true; | |
104 } | |
105 } | |
106 return false; | |
107 } | |
108 | |
109 | |
110 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) { | |
111 ASSERT(worker != NULL && worker->owned_); | |
112 if (idle_workers_ == NULL) { | |
113 return false; | |
114 } | |
115 | |
116 // Special case head of list. | |
117 if (idle_workers_ == worker) { | |
118 idle_workers_ = worker->idle_next_; | |
119 worker->idle_next_ = NULL; | |
120 return true; | |
121 } | |
122 | |
123 for (Worker* current = idle_workers_; | |
124 current->idle_next_ != NULL; | |
125 current = current->idle_next_) { | |
126 if (current->idle_next_ == worker) { | |
127 current->idle_next_ = worker->idle_next_; | |
128 worker->idle_next_ = NULL; | |
129 return true; | |
130 } | |
131 } | |
132 return false; | |
133 } | |
134 | |
135 | |
136 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) { | |
137 ASSERT(worker != NULL && worker->owned_); | |
138 if (all_workers_ == NULL) { | |
139 return false; | |
140 } | |
141 | |
142 // Special case head of list. | |
143 if (all_workers_ == worker) { | |
144 all_workers_ = worker->all_next_; | |
145 worker->all_next_ = NULL; | |
146 worker->owned_ = false; | |
Ivan Posva
2012/03/14 18:51:26
worker->pool_ = NULL; ?
turnidge
2012/03/14 21:00:27
Done.
| |
147 return true; | |
148 } | |
149 | |
150 for (Worker* current = all_workers_; | |
151 current->all_next_ != NULL; | |
152 current = current->all_next_) { | |
153 if (current->all_next_ == worker) { | |
154 current->all_next_ = worker->all_next_; | |
155 worker->all_next_ = NULL; | |
156 worker->owned_ = false; | |
157 return true; | |
158 } | |
159 } | |
160 return false; | |
161 } | |
162 | |
163 | |
164 void ThreadPool::SetIdle(Worker* worker) { | |
165 MutexLocker ml(&mutex_); | |
166 if (shutting_down_) { | |
167 return; | |
168 } | |
169 ASSERT(worker->owned_ && !IsIdle(worker)); | |
170 worker->idle_next_ = idle_workers_; | |
171 idle_workers_ = worker; | |
172 count_idle_++; | |
173 count_running_--; | |
174 } | |
175 | |
176 | |
177 bool ThreadPool::ReleaseIdleWorker(Worker* worker) { | |
178 MutexLocker ml(&mutex_); | |
179 if (shutting_down_) { | |
180 return false; | |
181 } | |
182 // Remove from idle list. | |
183 if (!RemoveWorkerFromIdleList(worker)) { | |
184 return false; | |
185 } | |
186 // Remove from all list. | |
187 bool found = RemoveWorkerFromAllList(worker); | |
188 ASSERT(found); | |
189 | |
190 count_stopped_++; | |
191 count_idle_--; | |
siva
2012/03/14 17:36:09
Will these counts get updated when we are doing a
turnidge
2012/03/14 21:00:27
count_started_ == The number of workers started.
| |
192 return true; | |
193 } | |
194 | |
195 | |
196 ThreadPool::Task::Task() { | |
197 } | |
198 | |
199 | |
200 ThreadPool::Task::~Task() { | |
201 } | |
202 | |
203 | |
204 ThreadPool::Worker::Worker(ThreadPool* pool) | |
205 : pool_(pool), | |
206 task_(NULL), | |
207 done_(false), | |
208 owned_(false), | |
209 all_next_(NULL), | |
210 idle_next_(NULL) { | |
211 } | |
212 | |
213 | |
214 void ThreadPool::Worker::StartThread() { | |
215 #if defined(DEBUG) | |
216 // Must call Run before StartThread. | |
217 { // NOLINT | |
Ivan Posva
2012/03/14 18:51:26
NOLINT?
turnidge
2012/03/14 21:00:27
Yeah. The linter seems to be confused that the {
| |
218 MonitorLocker ml(&monitor_); | |
219 ASSERT(task_ != NULL); | |
220 } | |
221 #endif | |
222 Thread::Start(&Worker::Main, reinterpret_cast<uword>(this)); | |
223 } | |
224 | |
225 | |
226 void ThreadPool::Worker::Run(Task* task) { | |
227 MonitorLocker ml(&monitor_); | |
228 ASSERT(task_ == NULL); | |
229 task_ = task; | |
230 ml.Notify(); | |
231 } | |
232 | |
233 | |
234 static int64_t ComputeTimeout(int64_t idle_start) { | |
235 if (FLAG_worker_timeout_millis <= 0) { | |
236 // No timeout. | |
237 return 0; | |
238 } else { | |
239 int64_t waited = OS::GetCurrentTimeMillis() - idle_start; | |
240 if (waited >= FLAG_worker_timeout_millis) { | |
241 // We must have gotten a spurious wakeup just before we timed | |
242 // out. Give the worker one last desperate chance to live. We | |
243 // are merciful. | |
244 return 1; | |
245 } else { | |
246 return FLAG_worker_timeout_millis - waited; | |
247 } | |
248 } | |
249 } | |
250 | |
251 | |
252 void ThreadPool::Worker::Loop() { | |
253 MonitorLocker ml(&monitor_); | |
254 if (done_) { | |
255 return; | |
256 } | |
Ivan Posva
2012/03/14 18:51:26
remove
turnidge
2012/03/14 21:00:27
Done.
| |
257 | |
258 int64_t idle_start; | |
259 while (true) { | |
260 ASSERT(task_ != NULL); | |
261 Task* task = task_; | |
262 task_ = NULL; | |
263 | |
264 // Release monitor while handling the task. | |
265 monitor_.Exit(); | |
266 task->Run(); | |
267 delete task; | |
268 monitor_.Enter(); | |
269 | |
270 if (done_) { | |
271 return; | |
272 } | |
273 ASSERT(task_ == NULL); | |
Ivan Posva
2012/03/14 18:51:26
Move up after the monitor enter.
turnidge
2012/03/14 21:00:27
Done.
| |
274 ASSERT(pool_ != NULL); | |
275 pool_->SetIdle(this); | |
276 idle_start = OS::GetCurrentTimeMillis(); | |
277 while (true) { | |
278 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start)); | |
279 if (task_ != NULL) { | |
280 // We've found a task. Process it, regardless of whether the | |
281 // worker is done_. | |
282 break; | |
283 } | |
284 if (done_) { | |
285 return; | |
286 } | |
287 if (result == Monitor::kTimedOut && | |
288 pool_->ReleaseIdleWorker(this)) { | |
289 return; | |
290 } | |
291 } | |
292 } | |
293 UNREACHABLE(); | |
294 } | |
295 | |
296 | |
297 void ThreadPool::Worker::Shutdown() { | |
298 MonitorLocker ml(&monitor_); | |
299 done_ = true; | |
300 pool_ = NULL; // Fail fast if someone tries to access pool_. | |
301 ml.Notify(); | |
302 } | |
303 | |
304 | |
305 // static | |
306 void ThreadPool::Worker::Main(uword args) { | |
307 Worker* worker = reinterpret_cast<Worker*>(args); | |
308 worker->Loop(); | |
309 | |
310 // It should be okay to access these unlocked here in this assert. | |
311 ASSERT(!worker->owned_ && | |
312 worker->all_next_ == NULL && | |
313 worker->idle_next_ == NULL); | |
314 delete worker; | |
315 } | |
316 | |
317 } // namespace dart | |
OLD | NEW |