Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: runtime/vm/message_handler.cc

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/message.h" 5 #include "vm/message_handler.h"
6 #include "vm/dart.h"
6 7
7 namespace dart { 8 namespace dart {
8 9
9 DECLARE_FLAG(bool, trace_isolates); 10 DECLARE_FLAG(bool, trace_isolates);
10 11
11 12
13 class MessageHandlerTask : public ThreadPool::Task {
14 public:
15 explicit MessageHandlerTask(MessageHandler* handler)
16 : handler_(handler) {
17 }
18
19 void Run() {
20 handler_->TaskCallback();
21 }
22
23 private:
24 MessageHandler* handler_;
25
26 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
27 };
28
29
12 MessageHandler::MessageHandler() 30 MessageHandler::MessageHandler()
13 : live_ports_(0), 31 : queue_(new MessageQueue()),
14 queue_(new MessageQueue()) { 32 oob_queue_(new MessageQueue()),
33 pool_(NULL),
34 blocking_(false),
35 task_(NULL),
36 start_function_(NULL),
37 start_data_(NULL),
38 live_ports_(0) {
15 ASSERT(queue_ != NULL); 39 ASSERT(queue_ != NULL);
16 } 40 }
17 41
18 42
19 MessageHandler::~MessageHandler() { 43 MessageHandler::~MessageHandler() {
20 delete queue_; 44 delete queue_;
45 delete oob_queue_;
21 } 46 }
22 47
23 48
24 const char* MessageHandler::name() const { 49 const char* MessageHandler::name() const {
25 return "<unnamed>"; 50 return "<unnamed>";
26 } 51 }
27 52
28 53
29 #if defined(DEBUG) 54 #if defined(DEBUG)
30 void MessageHandler::CheckAccess() { 55 void MessageHandler::CheckAccess() {
31 // By default there is no checking. 56 // By default there is no checking.
32 } 57 }
33 #endif 58 #endif
34 59
35 60
36 void MessageHandler::MessageNotify(Message::Priority priority) { 61 void MessageHandler::MessageNotify(Message::Priority priority) {
37 // By default, there is no custom message notification. 62 // By default, there is no custom message notification.
38 } 63 }
39 64
40 65
66 void MessageHandler::Run(ThreadPool* pool,
67 StartFunction function, StartData data) {
68 {
69 MonitorLocker ml(&monitor_);
70 if (FLAG_trace_isolates) {
71 OS::Print("[+] Starting message handler:\n"
72 "\thandler: %s\n",
73 name());
74 }
75 ASSERT(pool_ == NULL);
76 pool_ = pool;
77 blocking_ = false;
78 start_function_ = function;
79 start_data_ = data;
80 task_ = new MessageHandlerTask(this);
81 }
82 pool_->Run(task_);
83 }
84
85
86 void MessageHandler::RunBlocking(ThreadPool* pool,
87 StartFunction function, StartData data) {
88 {
89 MonitorLocker ml(&monitor_);
90 if (FLAG_trace_isolates) {
91 OS::Print("[+] Starting message handler:\n"
92 "\thandler: %s\n",
93 name());
94 }
95 ASSERT(pool_ == NULL);
96 pool_ = pool;
97 blocking_ = true;
98 start_function_ = function;
99 start_data_ = data;
100 task_ = new MessageHandlerTask(this);
101 }
102 pool_->Run(task_);
103 {
104 MonitorLocker ml(&monitor_);
105 while (pool_ != NULL) {
106 ml.Wait();
107 }
108 }
109 }
110
111
41 void MessageHandler::PostMessage(Message* message) { 112 void MessageHandler::PostMessage(Message* message) {
113 MonitorLocker ml(&monitor_);
42 if (FLAG_trace_isolates) { 114 if (FLAG_trace_isolates) {
43 const char* source_name = "<native code>"; 115 const char* source_name = "<native code>";
44 Isolate* source_isolate = Isolate::Current(); 116 Isolate* source_isolate = Isolate::Current();
45 if (source_isolate) { 117 if (source_isolate) {
46 source_name = source_isolate->name(); 118 source_name = source_isolate->name();
47 } 119 }
48 OS::Print("[>] Posting message:\n" 120 OS::Print("[>] Posting message:\n"
49 "\tsource: %s\n" 121 "\tsource: %s\n"
50 "\treply_port: %lld\n" 122 "\treply_port: %lld\n"
51 "\tdest: %s\n" 123 "\tdest: %s\n"
52 "\tdest_port: %lld\n", 124 "\tdest_port: %lld\n",
53 source_name, message->reply_port(), name(), message->dest_port()); 125 source_name, message->reply_port(), name(), message->dest_port());
54 } 126 }
55 127
56 Message::Priority priority = message->priority(); 128 Message::Priority priority = message->priority();
57 queue()->Enqueue(message); 129 if (priority == Message::kOOBPriority) {
130 oob_queue_->Enqueue(message);
131 } else {
132 queue_->Enqueue(message);
133 }
58 message = NULL; // Do not access message. May have been deleted. 134 message = NULL; // Do not access message. May have been deleted.
59 135
136 if (pool_ != NULL && task_ == NULL) {
137 task_ = new MessageHandlerTask(this);
138 pool_->Run(task_);
139 }
140
60 // Invoke any custom message notification. 141 // Invoke any custom message notification.
61 MessageNotify(priority); 142 MessageNotify(priority);
62 } 143 }
63 144
64 145
146 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
147 Message* message = oob_queue_->Dequeue();
148 if (message == NULL && min_priority < Message::kOOBPriority) {
149 message = queue_->Dequeue();
150 }
151 return message;
152 }
153
154
155 bool MessageHandler::HandleMessages() {
156 ASSERT(Isolate::Current() == NULL);
157 ASSERT(pool_ != NULL);
158 bool result = true;
159 Message* message = DequeueMessage(Message::kNormalPriority);
160 while (message) {
161 if (FLAG_trace_isolates) {
162 OS::Print("[<] Handling message:\n"
163 "\thandler: %s\n"
164 "\tport: %lld\n",
165 name(), message->dest_port());
166 }
167
168 monitor_.Exit();
169 result = HandleMessage(message);
170 ASSERT(Isolate::Current() == NULL);
171 monitor_.Enter();
172 if (!result) {
173 break;
174 }
175 message = DequeueMessage(Message::kNormalPriority);
176 }
177 return result;
178 }
179
180
181 bool MessageHandler::HandleNextMessage() {
182 // We can only call HandleNextMessage when this handler is not
183 // assigned to a thread pool.
184 ASSERT(pool_ == NULL);
185 #if defined(DEBUG)
186 CheckAccess();
187 #endif
188 MonitorLocker ml(&monitor_);
189 bool result = true;
190 Message* message = DequeueMessage(Message::kNormalPriority);
191 while (message) {
192 if (FLAG_trace_isolates) {
193 OS::Print("[<] Handling message:\n"
194 "\thandler: %s\n"
195 "\tport: %lld\n",
196 name(), message->dest_port());
197 }
198
199 monitor_.Exit();
200 Message::Priority priority = message->priority();
201 result = HandleMessage(message);
202 monitor_.Enter();
203 if (!result || priority == Message::kNormalPriority) {
204 break;
205 }
206 message = DequeueMessage(Message::kNormalPriority);
207 }
208 return result;
209 }
210
211
212 bool MessageHandler::HandleOOBMessages() {
213 MonitorLocker ml(&monitor_);
214 bool result = true;
215 Message* message = DequeueMessage(Message::kOOBPriority);
216 while (message) {
217 if (FLAG_trace_isolates) {
218 OS::Print("[<] Handling message:\n"
219 "\thandler: %s\n"
220 "\tport: %lld\n",
221 name(), message->dest_port());
222 }
223
224 monitor_.Exit();
225 result = HandleMessage(message);
226 monitor_.Enter();
227 if (!result) {
228 break;
229 }
230 message = DequeueMessage(Message::kOOBPriority);
231 }
232 return result;
233 }
234
235
236 void MessageHandler::TaskCallback() {
237 ASSERT(Isolate::Current() == NULL);
238 bool error = false;
239 {
240 MonitorLocker ml(&monitor_);
241 // Initialize the message handler by running its start function,
242 // if we have one. For an isolate, this will run the isolate's
243 // main() function.
244 if (start_function_) {
245 monitor_.Exit();
246 error = !start_function_(start_data_);
Ivan Posva 2012/04/08 22:58:27 I find this style a bit hard to read. The double-n
turnidge 2012/04/11 19:37:16 True. Fixed. On 2012/04/08 22:58:27, Ivan Posva
247 ASSERT(Isolate::Current() == NULL);
248 start_function_ = NULL;
249 start_data_ = NULL;
250 monitor_.Enter();
251 }
252
253 // Handle any pending messages for this message handler.
254 if (!error) {
255 error = !HandleMessages();
256 }
257 task_ = NULL; // No task in queue.
258 CheckTermination(&ml, error);
259 }
260 }
261
262
65 void MessageHandler::ClosePort(Dart_Port port) { 263 void MessageHandler::ClosePort(Dart_Port port) {
66 queue()->Flush(port); 264 MonitorLocker ml(&monitor_);
265 if (FLAG_trace_isolates) {
266 OS::Print("[-] Closing port:\n"
267 "\thandler: %s\n"
268 "\tport: %d\n",
269 name(), port);
270 }
271 queue_->Flush(port);
67 } 272 }
68 273
69 274
70 void MessageHandler::CloseAllPorts() { 275 void MessageHandler::CloseAllPorts() {
71 queue()->FlushAll(); 276 MonitorLocker ml(&monitor_);
72 } 277 if (FLAG_trace_isolates) {
73 278 OS::Print("[-] Closing all ports:\n"
74 279 "\thandler: %s\n",
75 MessageQueue::MessageQueue() { 280 name());
76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 281 }
77 head_[p] = NULL; 282 queue_->FlushAll();
78 tail_[p] = NULL; 283 }
79 } 284
80 } 285
81 286 void MessageHandler::CheckTermination(MonitorLocker* ml, bool error) {
82 287 // If the message handler received an error or has no live ports,
83 MessageQueue::~MessageQueue() { 288 // we stop handling messages for this message handler.
84 // Ensure that all pending messages have been released. 289 if (pool_ == NULL) {
85 #if defined(DEBUG) 290 return;
86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 291 }
87 ASSERT(head_[p] == NULL); 292 if (error) {
88 } 293 // We received an error. Abnormal termination.
89 #endif 294 char* error_str = GetErrorCString();
90 } 295 if (FLAG_trace_isolates) {
91 296 OS::Print("[-] Stopping message handler (error):\n"
92 297 "\thandler: %s\n",
93 void MessageQueue::Enqueue(Message* msg) { 298 name());
94 MonitorLocker ml(&monitor_); 299 OS::Print("Error: %s\n", error_str);
95 Message::Priority p = msg->priority(); 300 }
96 // Make sure messages are not reused. 301 pool_ = NULL;
97 ASSERT(msg->next_ == NULL); 302 if (blocking_) {
98 if (head_[p] == NULL) { 303 ml->Notify();
99 // Only element in the queue. 304 } else {
100 head_[p] = msg; 305 // TODO(turnidge): Implement a real system for handling
101 tail_[p] = msg; 306 // isolate death rather than terminating the vm.
102 307 OS::PrintErr("%s\n", error_str);
103 // We only need to notify if the queue was empty. 308 exit(255);
104 monitor_.Notify(); 309 }
105 } else { 310 free(error_str);
106 ASSERT(tail_[p] != NULL); 311
107 // Append at the tail. 312 } else if (!HasLivePorts()) {
108 tail_[p]->next_ = msg; 313 // No live ports. Normal termination.
109 tail_[p] = msg; 314 if (FLAG_trace_isolates) {
110 } 315 OS::Print("[-] Stopping message handler (no live ports):\n"
111 } 316 "\thandler: %s\n",
112 317 name());
113 318 }
114 Message* MessageQueue::DequeueNoWait() { 319 pool_ = NULL;
115 MonitorLocker ml(&monitor_); 320 if (blocking_) {
116 return DequeueNoWaitHoldsLock(Message::kFirstPriority); 321 ml->Notify();
117 } 322 }
118 323 }
119 324 }
120 Message* MessageQueue::DequeueNoWaitWithPriority(
121 Message::Priority min_priority) {
122 MonitorLocker ml(&monitor_);
123 return DequeueNoWaitHoldsLock(min_priority);
124 }
125
126
127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) {
128 // Look for the highest priority available message.
129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) {
130 Message* result = head_[p];
131 if (result != NULL) {
132 head_[p] = result->next_;
133 // The following update to tail_ is not strictly needed.
134 if (head_[p] == NULL) {
135 tail_[p] = NULL;
136 }
137 #if defined(DEBUG)
138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
139 #endif // DEBUG
140 return result;
141 }
142 }
143 return NULL;
144 }
145
146
147 Message* MessageQueue::Dequeue(int64_t millis) {
148 ASSERT(millis >= 0);
149 MonitorLocker ml(&monitor_);
150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
151 if (result == NULL) {
152 // No message available at any priority.
153 monitor_.Wait(millis);
154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
155 }
156 return result;
157 }
158
159
160 void MessageQueue::Flush(Dart_Port port) {
161 MonitorLocker ml(&monitor_);
162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
163 Message* cur = head_[p];
164 Message* prev = NULL;
165 while (cur != NULL) {
166 Message* next = cur->next_;
167 // If the message matches, then remove it from the queue and delete it.
168 if (cur->dest_port() == port) {
169 if (prev != NULL) {
170 prev->next_ = next;
171 } else {
172 head_[p] = next;
173 }
174 delete cur;
175 } else {
176 // Move prev forward.
177 prev = cur;
178 }
179 // Advance to the next message in the queue.
180 cur = next;
181 }
182 tail_[p] = prev;
183 }
184 }
185
186
187 void MessageQueue::FlushAll() {
188 MonitorLocker ml(&monitor_);
189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
190 Message* cur = head_[p];
191 head_[p] = NULL;
192 tail_[p] = NULL;
193 while (cur != NULL) {
194 Message* next = cur->next_;
195 delete cur;
196 cur = next;
197 }
198 }
199 }
200
201 325
202 } // namespace dart 326 } // namespace dart
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698