Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: runtime/vm/thread_pool.cc

Issue 9581039: Implement ThreadPool. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 #include "vm/thread_pool.h"
6
7 namespace dart {
8
9 DEFINE_FLAG(int, worker_timeout_millis, 5000,
10 "Free workers when they have been idle for this amount of time.");
11
12 ThreadPool::ThreadPool()
13 : shutting_down_(false),
14 all_workers_(NULL),
15 idle_workers_(NULL),
16 count_started_(0),
17 count_stopped_(0),
18 count_running_(0),
19 count_idle_(0) {
20 }
21
22
23 ThreadPool::~ThreadPool() {
24 Shutdown();
25 }
26
27
28 void ThreadPool::Run(Task* task) {
29 Worker* worker = NULL;
30 {
31 MutexLocker ml(&mutex_);
32 if (shutting_down_) {
33 return;
34 }
35 worker = GetIdleWorker();
36 }
37 // Release ThreadPool::mutex_ before calling Worker functions.
38 worker->Run(task);
39 }
40
41
42 ThreadPool::Worker* ThreadPool::GetIdleWorker() {
43 // Lock is claimed in ThreadPool::Run.
44 ASSERT(!shutting_down_);
45 Worker* worker = NULL;
46 if (idle_workers_ == NULL) {
47 worker = new Worker(this);
48 ASSERT(worker != NULL);
49 count_started_++;
50
51 // Add worker to the all_workers_ list.
52 worker->all_next_ = all_workers_;
53 all_workers_ = worker;
54
55 } else {
56 // Get the first worker from the idle worker list.
57 worker = idle_workers_;
58 idle_workers_ = worker->idle_next_;
59 worker->idle_next_ = NULL;
60 count_idle_--;
61 }
62 worker->state_ = kRunning;
63 count_running_++;
64 return worker;
65 }
66
67
68 void ThreadPool::Shutdown() {
69 Worker* current = NULL;
70 {
71 MutexLocker ml(&mutex_);
72 shutting_down_ = true;
73 current = all_workers_;
74 all_workers_ = NULL;
75 idle_workers_ = NULL;
76 count_idle_ = 0;
77 count_running_ = 0;
78
79 while (current != NULL) {
80 Worker* next = current->all_next_;
81 current->all_next_ = NULL;
82 current->idle_next_ = NULL;
83 current->state_ = kDone;
84 current = next;
85 }
86 }
87 // Release ThreadPool::mutex_ before calling Worker functions.
88
89 while (current != NULL) {
Ivan Posva 2012/03/07 01:06:05 Isn't current guaranteed to be NULL here?
turnidge 2012/03/07 18:02:20 Oops, that broke in my most recent changes. Fixed
90 Worker* next = current->all_next_;
91 current->Shutdown();
92 current = next;
93 }
94 }
95
96
97 void ThreadPool::SetIdle(Worker* worker) {
98 MutexLocker ml(&mutex_);
99 if (shutting_down_) {
100 return;
101 }
102 ASSERT(worker->state_ == kRunning);
103 worker->idle_next_ = idle_workers_;
104 idle_workers_ = worker;
105 worker->state_ = kIdle;
106 count_idle_++;
107 count_running_--;
108 }
109
110
111 bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
112 Worker* head = all_workers_;
113 ASSERT(worker != NULL && head != NULL);
114 ASSERT(worker->state_ != kDone);
115
116 // Special case head of list.
117 if (head == worker) {
118 head = worker->all_next_;
119 worker->all_next_ = NULL;
120 return true;
121 }
122
123 // We only want to remove a worker if it is idle.
Ivan Posva 2012/03/07 01:06:05 Comment out of place?
turnidge 2012/03/07 18:02:20 Comment removed, here and below.
124 for (Worker* current = head;
125 current->all_next_ != NULL;
126 current = current->all_next_) {
127 if (current->all_next_ == worker) {
128 // We found this worker in the idle list. Remove it.
Ivan Posva 2012/03/07 01:06:05 Comment seems out of place.
turnidge 2012/03/07 18:02:20 Comment removed.
129 current->all_next_ = current->all_next_->all_next_;
130 worker->all_next_ = NULL;
131 return true;
132 }
133 }
134 }
135
136
137 bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
138 Worker* head = idle_workers_;
139 ASSERT(worker != NULL && head != NULL);
140 ASSERT(worker->state_ == kIdle);
141
142 // Special case head of list.
143 if (head == worker) {
144 head = worker->idle_next_;
145 worker->idle_next_ = NULL;
146 return true;
147 }
148
149 // We only want to remove a worker if it is idle.
150 for (Worker* current = head;
151 current->idle_next_ != NULL;
152 current = current->idle_next_) {
153 if (current->idle_next_ == worker) {
154 // We found this worker in the idle list. Remove it.
155 current->idle_next_ = current->idle_next_->idle_next_;
156 worker->idle_next_ = NULL;
157 return true;
158 }
159 }
160 }
161
162
163 bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
164 MutexLocker ml(&mutex_);
165 if (shutting_down_) {
166 return false;
167 }
168 if (worker->state_ != kIdle) {
169 return false;
170 }
171
172 // Remove from idle list.
173 bool found = RemoveWorkerFromIdleList(worker);
174 ASSERT(found);
175
176 // Remove from all list.
177 found = RemoveWorkerFromAllList(worker);
178 ASSERT(found);
179
180 worker->state_ = kDone;
181 count_stopped_++;
182 count_idle_--;
183 return true;
184 }
185
186
187 ThreadPool::Task::Task() {
188 }
189
190
191 ThreadPool::Task::~Task() {
192 }
193
194
195 ThreadPool::Worker::Worker(ThreadPool* pool)
196 : pool_(pool),
197 task_(NULL),
198 started_(false),
199 done_(false),
200 state_(kInvalid),
201 all_next_(NULL),
202 idle_next_(NULL) {
203 }
204
205
206 void ThreadPool::Worker::Run(Task* task) {
207 MonitorLocker ml(&monitor_);
208 ASSERT(task_ == NULL);
209 if (!started_) {
210 Thread::Start(&Worker::Main, reinterpret_cast<uword>(this));
211 started_ = true;
212 }
213 task_ = task;
214 ml.Notify();
215 }
216
217
218 void ThreadPool::Worker::Loop() {
219 MonitorLocker ml(&monitor_);
220 if (done_) {
221 return;
222 }
223
224 ASSERT(started_ && task_ != NULL);
225 int64_t idle_start = 0;
226 while (true) {
227 Task* task = task_;
228 task_ = NULL;
229
230 // Release monitor while handling the task.
231 monitor_.Exit();
232 task->Run();
233 delete task;
234 monitor_.Enter();
235
236 if (done_) {
237 return;
238 }
239 ASSERT(task_ == NULL);
240
241 pool_->SetIdle(this);
242 idle_start = OS::GetCurrentTimeMillis();
243 do {
244 Monitor::WaitResult result = ml.Wait(FLAG_worker_timeout_millis);
245 if (done_) {
246 return;
247 }
248 if (task_ == NULL &&
249 result == Monitor::kTimedOut &&
250 FLAG_worker_timeout_millis > 0) {
251 bool timeout =
252 (OS::GetCurrentTimeMillis() - idle_start) >
253 FLAG_worker_timeout_millis;
254 if (timeout && pool_->ReleaseIdleWorker(this)) {
255 return;
256 }
257 }
258 } while (task_ == NULL);
259 }
260 UNREACHABLE();
261 }
262
263
264 void ThreadPool::Worker::Shutdown() {
265 MonitorLocker ml(&monitor_);
266 done_ = true;
267 ml.Notify();
268 }
269
270
271 // static
272 void ThreadPool::Worker::Main(uword args) {
273 Worker* worker = reinterpret_cast<Worker*>(args);
274 worker->Loop();
275
276 // It should be okay to access these unlocked here in this assert.
277 ASSERT(worker->state_ == kDone && worker->idle_next_ == NULL);
278 delete worker;
279 }
280
281 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/thread_pool.h ('k') | runtime/vm/thread_pool_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698