Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 #include "vm/thread_pool.h"
6
7 namespace dart {
8
9 DEFINE_FLAG(int, worker_timeout_millis, 5000,
10 "Free workers when they have been idle for this amount of time.");
11
12 ThreadPool::ThreadPool()
13 : shutting_down_(false),
14 all_workers_(NULL),
15 idle_workers_(NULL),
16 count_started_(0),
17 count_stopped_(0),
18 count_running_(0),
19 count_idle_(0) {
20 }
21
22
23 ThreadPool::~ThreadPool() {
24 Shutdown();
25 }
26
27
28 void ThreadPool::Run(Task* task) {
29 Worker* worker = NULL;
30 {
31 MutexLocker ml(&mutex_);
32 if (shutting_down_) {
33 return;
34 }
35 worker = GetIdleWorker();
siva 2012/03/09 18:36:29 Why not treat the new thread creation case differe
turnidge 2012/03/09 21:40:38 Done.
36 }
37 // Release ThreadPool::mutex_ before calling Worker functions.
siva 2012/03/09 18:36:29 ASSERT(worker != NULL);
turnidge 2012/03/09 21:40:38 Done.
38 worker->Run(task);
39 }
40
41
42 ThreadPool::Worker* ThreadPool::GetIdleWorker() {
43 // Lock is claimed in ThreadPool::Run.
44 ASSERT(!shutting_down_);
45 Worker* worker = NULL;
46 if (idle_workers_ == NULL) {
47 worker = new Worker(this);
48 ASSERT(worker != NULL);
49 count_started_++;
50
51 // Add worker to the all_workers_ list.
52 worker->all_next_ = all_workers_;
53 all_workers_ = worker;
54
55 } else {
56 // Get the first worker from the idle worker list.
57 worker = idle_workers_;
58 idle_workers_ = worker->idle_next_;
59 worker->idle_next_ = NULL;
60 count_idle_--;
61 }
62 worker->state_ = kRunning;
63 count_running_++;
64 return worker;
65 }
66
67
68 void ThreadPool::Shutdown() {
69 Worker* saved = NULL;
70 {
71 MutexLocker ml(&mutex_);
72 shutting_down_ = true;
73 saved = all_workers_;
74 all_workers_ = NULL;
75 idle_workers_ = NULL;
76
77 Worker* current = saved;
78 while (current != NULL) {
79 Worker* next = current->all_next_;
80 current->idle_next_ = NULL;
81 current->state_ = kDone;
82 current = next;
83 }
84
85 count_idle_ = 0;
86 count_running_ = 0;
87 }
88 // Release ThreadPool::mutex_ before calling Worker functions.
89
90 Worker* current = saved;
91 while (current != NULL) {
92 Worker* next = current->all_next_;
93 current->all_next_ = NULL;
94 current->Shutdown();
95 current = next;
96 }
97 }
98
99
100 void ThreadPool::SetIdle(Worker* worker) {
101 MutexLocker ml(&mutex_);
102 if (shutting_down_) {
103 return;
104 }
105 ASSERT(worker->state_ == kRunning);
106 worker->idle_next_ = idle_workers_;
107 idle_workers_ = worker;
108 worker->state_ = kIdle;
109 count_idle_++;
110 count_running_--;
111 }
112
113
114 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
115 ASSERT(worker != NULL && all_workers_ != NULL);
116 ASSERT(worker->state_ != kDone);
117
118 // Special case head of list.
119 if (all_workers_ == worker) {
120 all_workers_ = worker->all_next_;
121 worker->all_next_ = NULL;
122 return true;
123 }
124
125 for (Worker* current = all_workers_;
126 current->all_next_ != NULL;
127 current = current->all_next_) {
128 if (current->all_next_ == worker) {
129 current->all_next_ = current->all_next_->all_next_;
siva 2012/03/09 18:36:29 current->all_next_ = worker->all_next_; maybe more
turnidge 2012/03/09 21:40:38 Done.
130 worker->all_next_ = NULL;
131 return true;
132 }
133 }
134 }
135
136
137 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
138 ASSERT(worker != NULL && idle_workers_ != NULL);
139 ASSERT(worker->state_ == kIdle);
140
141 // Special case head of list.
142 if (idle_workers_ == worker) {
143 idle_workers_ = worker->idle_next_;
144 worker->idle_next_ = NULL;
145 return true;
146 }
147
148 for (Worker* current = idle_workers_;
149 current->idle_next_ != NULL;
150 current = current->idle_next_) {
151 if (current->idle_next_ == worker) {
152 current->idle_next_ = current->idle_next_->idle_next_;
siva 2012/03/09 18:36:29 current->idle_next_ = worker->idle_next_; maybe mo
turnidge 2012/03/09 21:40:38 Done.
153 worker->idle_next_ = NULL;
154 return true;
155 }
156 }
157 }
158
159
160 bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
161 MutexLocker ml(&mutex_);
162 if (shutting_down_) {
163 return false;
164 }
165 if (worker->state_ != kIdle) {
166 return false;
167 }
168
169 // Remove from idle list.
170 bool found = RemoveWorkerFromIdleList(worker);
171 ASSERT(found);
172
173 // Remove from all list.
174 found = RemoveWorkerFromAllList(worker);
175 ASSERT(found);
176
177 worker->state_ = kDone;
178 count_stopped_++;
179 count_idle_--;
180 return true;
181 }
182
183
184 ThreadPool::Task::Task() {
185 }
186
187
188 ThreadPool::Task::~Task() {
189 }
190
191
192 ThreadPool::Worker::Worker(ThreadPool* pool)
193 : pool_(pool),
194 task_(NULL),
195 started_(false),
196 done_(false),
197 state_(kInvalid),
198 all_next_(NULL),
199 idle_next_(NULL) {
200 }
201
202
203 void ThreadPool::Worker::Run(Task* task) {
204 MonitorLocker ml(&monitor_);
205 ASSERT(task_ == NULL);
206 if (!started_) {
207 Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
208 started_ = true;
209 }
210 task_ = task;
211 ml.Notify();
212 }
213
214
215 void ThreadPool::Worker::Loop() {
216 MonitorLocker ml(&monitor_);
217 if (done_) {
218 return;
219 }
220
221 ASSERT(started_ && task_ != NULL);
222 int64_t idle_start = 0;
223 while (true) {
224 Task* task = task_;
225 task_ = NULL;
226
227 // Release monitor while handling the task.
228 monitor_.Exit();
229 task->Run();
230 delete task;
231 monitor_.Enter();
232
233 if (done_) {
234 return;
235 }
236 ASSERT(task_ == NULL);
237
238 pool_->SetIdle(this);
239 idle_start = OS::GetCurrentTimeMillis();
240 do {
241 Monitor::WaitResult result = ml.Wait(FLAG_worker_timeout_millis);
242 if (done_) {
243 return;
244 }
siva 2012/03/09 18:36:29 Threads which are in the idle list will not remove
turnidge 2012/03/09 21:40:38 Discussed offline -- already removed from idle lis
245 if (task_ == NULL &&
246 result == Monitor::kTimedOut &&
247 FLAG_worker_timeout_millis > 0) {
248 bool timeout =
249 (OS::GetCurrentTimeMillis() - idle_start) >
250 FLAG_worker_timeout_millis;
251 if (timeout && pool_->ReleaseIdleWorker(this)) {
252 return;
253 }
254 }
255 } while (task_ == NULL);
256 }
siva 2012/03/09 18:36:29 MonitorLocker ml(&monitor_); ASSERT(task_ != NULL)
turnidge 2012/03/09 21:40:38 Discussed offilne. On 2012/03/09 18:36:29, asiva
257 UNREACHABLE();
258 }
259
260
261 void ThreadPool::Worker::Shutdown() {
262 MonitorLocker ml(&monitor_);
263 done_ = true;
264 ml.Notify();
265 }
266
267
268 // static
269 void ThreadPool::Worker::Main(uword args) {
270 Worker* worker = reinterpret_cast<Worker*>(args);
271 worker->Loop();
272
273 // It should be okay to access these unlocked here in this assert.
274 ASSERT(worker->state_ == kDone && worker->idle_next_ == NULL);
siva 2012/03/09 18:36:29 Also (worker->all_next_ == NULL)?
turnidge 2012/03/09 21:40:38 Done.
275 delete worker;
276 }
277
278 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698