Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 #include "vm/thread_pool.h"
6
7 namespace dart {
8
9 DEFINE_FLAG(int, worker_timeout_millis, 5000,
10 "Free workers when they have been idle for this amount of time.");
11
12 ThreadPool::ThreadPool()
13 : shutting_down_(false),
14 all_workers_(NULL),
15 idle_workers_(NULL),
16 count_started_(0),
17 count_stopped_(0),
18 count_running_(0),
19 count_idle_(0) {
20 }
21
22
23 ThreadPool::~ThreadPool() {
24 Shutdown();
25 }
26
27
28 void ThreadPool::Run(Task* task) {
29 Worker* worker = NULL;
30 bool new_worker = false;
31 {
Ivan Posva 2012/03/14 18:51:26 // Grab ThreadPool::mutex_ before touching queues
turnidge 2012/03/14 21:00:27 Done.
32 MutexLocker ml(&mutex_);
33 if (shutting_down_) {
34 return;
35 }
36 if (idle_workers_ == NULL) {
37 worker = new Worker(this);
38 ASSERT(worker != NULL);
39 new_worker = true;
40 count_started_++;
41
42 // Add worker to the all_workers_ list.
43 worker->all_next_ = all_workers_;
44 all_workers_ = worker;
45 worker->owned_ = true;
46 } else {
47 // Get the first worker from the idle worker list.
48 worker = idle_workers_;
49 idle_workers_ = worker->idle_next_;
50 worker->idle_next_ = NULL;
51 count_idle_--;
52 }
53 count_running_++;
54 }
55 // Release ThreadPool::mutex_ before calling Worker functions.
56 ASSERT(worker != NULL);
57 worker->Run(task);
58 if (new_worker) {
59 // Call StartThread after we've assigned the first t
siva 2012/03/14 17:36:09 first task.
Ivan Posva 2012/03/14 18:51:26 Comment cutoff.
turnidge 2012/03/14 21:00:27 Done.
60 worker->StartThread();
61 }
62 }
63
64
65 void ThreadPool::Shutdown() {
66 Worker* saved = NULL;
67 {
68 MutexLocker ml(&mutex_);
69 shutting_down_ = true;
70 saved = all_workers_;
71 all_workers_ = NULL;
72 idle_workers_ = NULL;
73
74 Worker* current = saved;
75 while (current != NULL) {
76 Worker* next = current->all_next_;
77 current->idle_next_ = NULL;
78 current->owned_ = false;
79 current = next;
80 }
81
82 count_idle_ = 0;
83 count_running_ = 0;
84 }
85 // Release ThreadPool::mutex_ before calling Worker functions.
86
87 Worker* current = saved;
88 while (current != NULL) {
89 Worker* next = current->all_next_;
Ivan Posva 2012/03/14 18:51:26 There is unprotected access to fields marked as pr
turnidge 2012/03/14 21:00:27 Done.
90 current->all_next_ = NULL;
91 current->Shutdown();
92 current = next;
93 }
94 }
95
96
97 bool ThreadPool::IsIdle(Worker* worker) {
98 ASSERT(worker != NULL && worker->owned_);
99 for (Worker* current = idle_workers_;
100 current != NULL;
101 current = current->idle_next_) {
102 if (current == worker) {
103 return true;
104 }
105 }
106 return false;
107 }
108
109
110 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
111 ASSERT(worker != NULL && worker->owned_);
112 if (idle_workers_ == NULL) {
113 return false;
114 }
115
116 // Special case head of list.
117 if (idle_workers_ == worker) {
118 idle_workers_ = worker->idle_next_;
119 worker->idle_next_ = NULL;
120 return true;
121 }
122
123 for (Worker* current = idle_workers_;
124 current->idle_next_ != NULL;
125 current = current->idle_next_) {
126 if (current->idle_next_ == worker) {
127 current->idle_next_ = worker->idle_next_;
128 worker->idle_next_ = NULL;
129 return true;
130 }
131 }
132 return false;
133 }
134
135
136 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
137 ASSERT(worker != NULL && worker->owned_);
138 if (all_workers_ == NULL) {
139 return false;
140 }
141
142 // Special case head of list.
143 if (all_workers_ == worker) {
144 all_workers_ = worker->all_next_;
145 worker->all_next_ = NULL;
146 worker->owned_ = false;
Ivan Posva 2012/03/14 18:51:26 worker->pool_ = NULL; ?
turnidge 2012/03/14 21:00:27 Done.
147 return true;
148 }
149
150 for (Worker* current = all_workers_;
151 current->all_next_ != NULL;
152 current = current->all_next_) {
153 if (current->all_next_ == worker) {
154 current->all_next_ = worker->all_next_;
155 worker->all_next_ = NULL;
156 worker->owned_ = false;
157 return true;
158 }
159 }
160 return false;
161 }
162
163
164 void ThreadPool::SetIdle(Worker* worker) {
165 MutexLocker ml(&mutex_);
166 if (shutting_down_) {
167 return;
168 }
169 ASSERT(worker->owned_ && !IsIdle(worker));
170 worker->idle_next_ = idle_workers_;
171 idle_workers_ = worker;
172 count_idle_++;
173 count_running_--;
174 }
175
176
177 bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
178 MutexLocker ml(&mutex_);
179 if (shutting_down_) {
180 return false;
181 }
182 // Remove from idle list.
183 if (!RemoveWorkerFromIdleList(worker)) {
184 return false;
185 }
186 // Remove from all list.
187 bool found = RemoveWorkerFromAllList(worker);
188 ASSERT(found);
189
190 count_stopped_++;
191 count_idle_--;
siva 2012/03/14 17:36:09 Will these counts get updated when we are doing a
turnidge 2012/03/14 21:00:27 count_started_ == The number of workers started.
192 return true;
193 }
194
195
196 ThreadPool::Task::Task() {
197 }
198
199
200 ThreadPool::Task::~Task() {
201 }
202
203
204 ThreadPool::Worker::Worker(ThreadPool* pool)
205 : pool_(pool),
206 task_(NULL),
207 done_(false),
208 owned_(false),
209 all_next_(NULL),
210 idle_next_(NULL) {
211 }
212
213
214 void ThreadPool::Worker::StartThread() {
215 #if defined(DEBUG)
216 // Must call Run before StartThread.
217 { // NOLINT
Ivan Posva 2012/03/14 18:51:26 NOLINT?
turnidge 2012/03/14 21:00:27 Yeah. The linter seems to be confused that the {
218 MonitorLocker ml(&monitor_);
219 ASSERT(task_ != NULL);
220 }
221 #endif
222 Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
223 }
224
225
226 void ThreadPool::Worker::Run(Task* task) {
227 MonitorLocker ml(&monitor_);
228 ASSERT(task_ == NULL);
229 task_ = task;
230 ml.Notify();
231 }
232
233
234 static int64_t ComputeTimeout(int64_t idle_start) {
235 if (FLAG_worker_timeout_millis <= 0) {
236 // No timeout.
237 return 0;
238 } else {
239 int64_t waited = OS::GetCurrentTimeMillis() - idle_start;
240 if (waited >= FLAG_worker_timeout_millis) {
241 // We must have gotten a spurious wakeup just before we timed
242 // out. Give the worker one last desperate chance to live. We
243 // are merciful.
244 return 1;
245 } else {
246 return FLAG_worker_timeout_millis - waited;
247 }
248 }
249 }
250
251
252 void ThreadPool::Worker::Loop() {
253 MonitorLocker ml(&monitor_);
254 if (done_) {
255 return;
256 }
Ivan Posva 2012/03/14 18:51:26 remove
turnidge 2012/03/14 21:00:27 Done.
257
258 int64_t idle_start;
259 while (true) {
260 ASSERT(task_ != NULL);
261 Task* task = task_;
262 task_ = NULL;
263
264 // Release monitor while handling the task.
265 monitor_.Exit();
266 task->Run();
267 delete task;
268 monitor_.Enter();
269
270 if (done_) {
271 return;
272 }
273 ASSERT(task_ == NULL);
Ivan Posva 2012/03/14 18:51:26 Move up after the monitor enter.
turnidge 2012/03/14 21:00:27 Done.
274 ASSERT(pool_ != NULL);
275 pool_->SetIdle(this);
276 idle_start = OS::GetCurrentTimeMillis();
277 while (true) {
278 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
279 if (task_ != NULL) {
280 // We've found a task. Process it, regardless of whether the
281 // worker is done_.
282 break;
283 }
284 if (done_) {
285 return;
286 }
287 if (result == Monitor::kTimedOut &&
288 pool_->ReleaseIdleWorker(this)) {
289 return;
290 }
291 }
292 }
293 UNREACHABLE();
294 }
295
296
297 void ThreadPool::Worker::Shutdown() {
298 MonitorLocker ml(&monitor_);
299 done_ = true;
300 pool_ = NULL; // Fail fast if someone tries to access pool_.
301 ml.Notify();
302 }
303
304
305 // static
306 void ThreadPool::Worker::Main(uword args) {
307 Worker* worker = reinterpret_cast<Worker*>(args);
308 worker->Loop();
309
310 // It should be okay to access these unlocked here in this assert.
311 ASSERT(!worker->owned_ &&
312 worker->all_next_ == NULL &&
313 worker->idle_next_ == NULL);
314 delete worker;
315 }
316
317 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698