Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(563)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 #include "vm/thread_pool.h"
6
7 namespace dart {
8
9 DEFINE_FLAG(int, worker_timeout_millis, 5000,
10 "Free workers when they have been idle for this amount of time.");
11
12 Monitor* ThreadPool::exit_monitor_ = NULL;
13 int* ThreadPool::exit_count_ = NULL;
14
15 ThreadPool::ThreadPool()
16 : shutting_down_(false),
17 all_workers_(NULL),
18 idle_workers_(NULL),
19 count_started_(0),
20 count_stopped_(0),
21 count_running_(0),
22 count_idle_(0) {
23 }
24
25
26 ThreadPool::~ThreadPool() {
27 Shutdown();
28 }
29
30
31 void ThreadPool::Run(Task* task) {
32 Worker* worker = NULL;
33 bool new_worker = false;
34 {
35 // We need ThreadPool::mutex_ to access worker lists and other
36 // ThreadPool state.
37 MutexLocker ml(&mutex_);
38 if (shutting_down_) {
39 return;
40 }
41 if (idle_workers_ == NULL) {
42 worker = new Worker(this);
43 ASSERT(worker != NULL);
44 new_worker = true;
45 count_started_++;
46
47 // Add worker to the all_workers_ list.
48 worker->all_next_ = all_workers_;
49 all_workers_ = worker;
50 worker->owned_ = true;
51 } else {
52 // Get the first worker from the idle worker list.
53 worker = idle_workers_;
54 idle_workers_ = worker->idle_next_;
55 worker->idle_next_ = NULL;
56 count_idle_--;
57 }
58 count_running_++;
59 }
60 // Release ThreadPool::mutex_ before calling Worker functions.
61 ASSERT(worker != NULL);
62 worker->SetTask(task);
63 if (new_worker) {
64 // Call StartThread after we've assigned the first task.
65 worker->StartThread();
66 }
67 }
68
69
70 void ThreadPool::Shutdown() {
71 Worker* saved = NULL;
72 {
73 MutexLocker ml(&mutex_);
74 shutting_down_ = true;
75 saved = all_workers_;
76 all_workers_ = NULL;
77 idle_workers_ = NULL;
78
79 Worker* current = saved;
80 while (current != NULL) {
81 Worker* next = current->all_next_;
82 current->idle_next_ = NULL;
83 current->owned_ = false;
84 current = next;
85 count_stopped_++;
86 }
87
88 count_idle_ = 0;
89 count_running_ = 0;
90 ASSERT(count_started_ == count_stopped_);
91 }
92 // Release ThreadPool::mutex_ before calling Worker functions.
93
94 Worker* current = saved;
95 while (current != NULL) {
96 // We may access all_next_ without holding ThreadPool::mutex_ here
97 // because the worker is no longer owned by the ThreadPool.
98 Worker* next = current->all_next_;
99 current->all_next_ = NULL;
100 current->Shutdown();
101 current = next;
102 }
103 }
104
105
106 bool ThreadPool::IsIdle(Worker* worker) {
107 ASSERT(worker != NULL && worker->owned_);
108 for (Worker* current = idle_workers_;
109 current != NULL;
110 current = current->idle_next_) {
111 if (current == worker) {
112 return true;
113 }
114 }
115 return false;
116 }
117
118
119 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
120 ASSERT(worker != NULL && worker->owned_);
121 if (idle_workers_ == NULL) {
122 return false;
123 }
124
125 // Special case head of list.
126 if (idle_workers_ == worker) {
127 idle_workers_ = worker->idle_next_;
128 worker->idle_next_ = NULL;
129 return true;
130 }
131
132 for (Worker* current = idle_workers_;
133 current->idle_next_ != NULL;
134 current = current->idle_next_) {
135 if (current->idle_next_ == worker) {
136 current->idle_next_ = worker->idle_next_;
137 worker->idle_next_ = NULL;
138 return true;
139 }
140 }
141 return false;
142 }
143
144
145 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
146 ASSERT(worker != NULL && worker->owned_);
147 if (all_workers_ == NULL) {
148 return false;
149 }
150
151 // Special case head of list.
152 if (all_workers_ == worker) {
153 all_workers_ = worker->all_next_;
154 worker->all_next_ = NULL;
155 worker->owned_ = false;
156 worker->pool_ = NULL;
157 return true;
158 }
159
160 for (Worker* current = all_workers_;
161 current->all_next_ != NULL;
162 current = current->all_next_) {
163 if (current->all_next_ == worker) {
164 current->all_next_ = worker->all_next_;
165 worker->all_next_ = NULL;
166 worker->owned_ = false;
167 return true;
168 }
169 }
170 return false;
171 }
172
173
174 void ThreadPool::SetIdle(Worker* worker) {
175 MutexLocker ml(&mutex_);
176 if (shutting_down_) {
177 return;
178 }
179 ASSERT(worker->owned_ && !IsIdle(worker));
180 worker->idle_next_ = idle_workers_;
181 idle_workers_ = worker;
182 count_idle_++;
183 count_running_--;
184 }
185
186
187 bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
188 MutexLocker ml(&mutex_);
189 if (shutting_down_) {
190 return false;
191 }
192 // Remove from idle list.
193 if (!RemoveWorkerFromIdleList(worker)) {
194 return false;
195 }
196 // Remove from all list.
197 bool found = RemoveWorkerFromAllList(worker);
198 ASSERT(found);
199
200 count_stopped_++;
201 count_idle_--;
202 return true;
203 }
204
205
206 ThreadPool::Task::Task() {
207 }
208
209
210 ThreadPool::Task::~Task() {
211 }
212
213
214 ThreadPool::Worker::Worker(ThreadPool* pool)
215 : pool_(pool),
216 task_(NULL),
217 owned_(false),
218 all_next_(NULL),
219 idle_next_(NULL) {
220 }
221
222
223 void ThreadPool::Worker::StartThread() {
224 #if defined(DEBUG)
225 // Must call SetTask before StartThread.
226 { // NOLINT
227 MonitorLocker ml(&monitor_);
228 ASSERT(task_ != NULL);
229 }
230 #endif
231 Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
232 }
233
234
235 void ThreadPool::Worker::SetTask(Task* task) {
236 MonitorLocker ml(&monitor_);
237 ASSERT(task_ == NULL);
238 task_ = task;
239 ml.Notify();
240 }
241
242
243 static int64_t ComputeTimeout(int64_t idle_start) {
244 if (FLAG_worker_timeout_millis <= 0) {
245 // No timeout.
246 return 0;
247 } else {
248 int64_t waited = OS::GetCurrentTimeMillis() - idle_start;
249 if (waited >= FLAG_worker_timeout_millis) {
250 // We must have gotten a spurious wakeup just before we timed
251 // out. Give the worker one last desperate chance to live. We
252 // are merciful.
253 return 1;
254 } else {
255 return FLAG_worker_timeout_millis - waited;
256 }
257 }
258 }
259
260
261 void ThreadPool::Worker::Loop() {
262 MonitorLocker ml(&monitor_);
263 int64_t idle_start;
264 while (true) {
265 ASSERT(task_ != NULL);
266 Task* task = task_;
267 task_ = NULL;
268
269 // Release monitor while handling the task.
270 monitor_.Exit();
271 task->Run();
272 delete task;
273 monitor_.Enter();
274
275 ASSERT(task_ == NULL);
276 if (IsDone()) {
277 return;
278 }
279 ASSERT(pool_ != NULL);
280 pool_->SetIdle(this);
281 idle_start = OS::GetCurrentTimeMillis();
282 while (true) {
283 Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
284 if (task_ != NULL) {
285 // We've found a task. Process it, regardless of whether the
286 // worker is done_.
287 break;
288 }
289 if (IsDone()) {
290 return;
291 }
292 if (result == Monitor::kTimedOut &&
293 pool_->ReleaseIdleWorker(this)) {
294 return;
295 }
296 }
297 }
298 UNREACHABLE();
299 }
300
301
302 void ThreadPool::Worker::Shutdown() {
303 MonitorLocker ml(&monitor_);
304 pool_ = NULL; // Fail fast if someone tries to access pool_.
305 ml.Notify();
306 }
307
308
309 // static
310 void ThreadPool::Worker::Main(uword args) {
311 Worker* worker = reinterpret_cast<Worker*>(args);
312 worker->Loop();
313
314 // It should be okay to access these unlocked here in this assert.
315 ASSERT(!worker->owned_ &&
316 worker->all_next_ == NULL &&
317 worker->idle_next_ == NULL);
318
319 // The exit monitor is only used during testing.
320 if (ThreadPool::exit_monitor_) {
321 MonitorLocker ml(ThreadPool::exit_monitor_);
322 (*ThreadPool::exit_count_)++;
323 ml.Notify();
324 }
325 delete worker;
326 }
327
328 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698