OLD | NEW |
---|---|
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/message.h" | 5 #include "vm/message_handler.h" |
6 #include "vm/dart.h" | |
6 | 7 |
7 namespace dart { | 8 namespace dart { |
8 | 9 |
9 DECLARE_FLAG(bool, trace_isolates); | 10 DECLARE_FLAG(bool, trace_isolates); |
10 | 11 |
11 | 12 |
13 class MessageHandlerTask : public ThreadPool::Task { | |
14 public: | |
15 explicit MessageHandlerTask(MessageHandler* handler) | |
16 : handler_(handler) { | |
17 } | |
18 | |
19 void Run() { | |
20 handler_->TaskCallback(); | |
21 } | |
22 | |
23 private: | |
24 MessageHandler* handler_; | |
25 | |
26 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); | |
27 }; | |
28 | |
29 | |
12 MessageHandler::MessageHandler() | 30 MessageHandler::MessageHandler() |
13 : live_ports_(0), | 31 : queue_(new MessageQueue()), |
14 queue_(new MessageQueue()) { | 32 oob_queue_(new MessageQueue()), |
33 pool_(NULL), | |
34 blocking_(false), | |
35 task_(NULL), | |
36 start_function_(NULL), | |
37 start_data_(NULL), | |
38 live_ports_(0) { | |
15 ASSERT(queue_ != NULL); | 39 ASSERT(queue_ != NULL); |
16 } | 40 } |
17 | 41 |
18 | 42 |
19 MessageHandler::~MessageHandler() { | 43 MessageHandler::~MessageHandler() { |
20 delete queue_; | 44 delete queue_; |
45 delete oob_queue_; | |
21 } | 46 } |
22 | 47 |
23 | 48 |
24 const char* MessageHandler::name() const { | 49 const char* MessageHandler::name() const { |
25 return "<unnamed>"; | 50 return "<unnamed>"; |
26 } | 51 } |
27 | 52 |
28 | 53 |
29 #if defined(DEBUG) | 54 #if defined(DEBUG) |
30 void MessageHandler::CheckAccess() { | 55 void MessageHandler::CheckAccess() { |
31 // By default there is no checking. | 56 // By default there is no checking. |
32 } | 57 } |
33 #endif | 58 #endif |
34 | 59 |
35 | 60 |
36 void MessageHandler::MessageNotify(Message::Priority priority) { | 61 void MessageHandler::MessageNotify(Message::Priority priority) { |
37 // By default, there is no custom message notification. | 62 // By default, there is no custom message notification. |
38 } | 63 } |
39 | 64 |
40 | 65 |
66 void MessageHandler::Run(ThreadPool* pool, | |
67 StartFunction function, StartData data) { | |
68 { | |
69 MonitorLocker ml(&monitor_); | |
siva
2012/04/14 00:29:53
What is this lock protecting?
It seems like it is
turnidge
2012/04/17 23:46:55
Discuessed offline...
My intention was to always
| |
70 if (FLAG_trace_isolates) { | |
71 OS::Print("[+] Starting message handler:\n" | |
72 "\thandler: %s\n", | |
73 name()); | |
74 } | |
75 ASSERT(pool_ == NULL); | |
76 pool_ = pool; | |
77 blocking_ = false; | |
78 start_function_ = function; | |
79 start_data_ = data; | |
80 task_ = new MessageHandlerTask(this); | |
81 } | |
82 pool_->Run(task_); | |
83 } | |
84 | |
85 | |
86 void MessageHandler::RunBlocking(ThreadPool* pool, | |
87 StartFunction function, StartData data) { | |
88 { | |
89 MonitorLocker ml(&monitor_); | |
siva
2012/04/14 00:29:53
Ditto question about what is this lock protecting.
| |
90 if (FLAG_trace_isolates) { | |
91 OS::Print("[+] Starting message handler:\n" | |
92 "\thandler: %s\n", | |
93 name()); | |
94 } | |
95 ASSERT(pool_ == NULL); | |
96 pool_ = pool; | |
97 blocking_ = true; | |
98 start_function_ = function; | |
99 start_data_ = data; | |
100 task_ = new MessageHandlerTask(this); | |
101 } | |
102 pool_->Run(task_); | |
103 { | |
104 MonitorLocker ml(&monitor_); | |
105 while (pool_ != NULL) { | |
106 ml.Wait(); | |
107 } | |
108 } | |
109 } | |
110 | |
111 | |
41 void MessageHandler::PostMessage(Message* message) { | 112 void MessageHandler::PostMessage(Message* message) { |
113 MonitorLocker ml(&monitor_); | |
42 if (FLAG_trace_isolates) { | 114 if (FLAG_trace_isolates) { |
43 const char* source_name = "<native code>"; | 115 const char* source_name = "<native code>"; |
44 Isolate* source_isolate = Isolate::Current(); | 116 Isolate* source_isolate = Isolate::Current(); |
45 if (source_isolate) { | 117 if (source_isolate) { |
46 source_name = source_isolate->name(); | 118 source_name = source_isolate->name(); |
47 } | 119 } |
48 OS::Print("[>] Posting message:\n" | 120 OS::Print("[>] Posting message:\n" |
49 "\tsource: %s\n" | 121 "\tsource: %s\n" |
50 "\treply_port: %lld\n" | 122 "\treply_port: %lld\n" |
51 "\tdest: %s\n" | 123 "\tdest: %s\n" |
52 "\tdest_port: %lld\n", | 124 "\tdest_port: %lld\n", |
53 source_name, message->reply_port(), name(), message->dest_port()); | 125 source_name, message->reply_port(), name(), message->dest_port()); |
54 } | 126 } |
55 | 127 |
56 Message::Priority priority = message->priority(); | 128 Message::Priority priority = message->priority(); |
57 queue()->Enqueue(message); | 129 if (priority == Message::kOOBPriority) { |
130 oob_queue_->Enqueue(message); | |
131 } else { | |
132 queue_->Enqueue(message); | |
133 } | |
58 message = NULL; // Do not access message. May have been deleted. | 134 message = NULL; // Do not access message. May have been deleted. |
59 | 135 |
136 if (pool_ != NULL && task_ == NULL) { | |
137 task_ = new MessageHandlerTask(this); | |
138 pool_->Run(task_); | |
siva
2012/04/14 00:29:53
The task_ is run with the lock being held whereas
turnidge
2012/04/17 23:46:55
You are right. I have changed Run() so that it no
| |
139 } | |
140 | |
60 // Invoke any custom message notification. | 141 // Invoke any custom message notification. |
61 MessageNotify(priority); | 142 MessageNotify(priority); |
62 } | 143 } |
63 | 144 |
64 | 145 |
146 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { | |
147 Message* message = oob_queue_->Dequeue(); | |
148 if (message == NULL && min_priority < Message::kOOBPriority) { | |
149 message = queue_->Dequeue(); | |
150 } | |
151 return message; | |
152 } | |
153 | |
154 | |
155 bool MessageHandler::HandleMessages() { | |
156 ASSERT(Isolate::Current() == NULL); | |
157 ASSERT(pool_ != NULL); | |
158 bool result = true; | |
159 Message* message = DequeueMessage(Message::kNormalPriority); | |
160 while (message) { | |
161 if (FLAG_trace_isolates) { | |
162 OS::Print("[<] Handling message:\n" | |
163 "\thandler: %s\n" | |
164 "\tport: %lld\n", | |
165 name(), message->dest_port()); | |
166 } | |
167 | |
168 monitor_.Exit(); | |
siva
2012/04/14 00:29:53
Where is the monitor_.Enter() corresponding to thi
turnidge
2012/04/17 23:46:55
I will add "lock held" comments to the appropriate
| |
169 result = HandleMessage(message); | |
170 ASSERT(Isolate::Current() == NULL); | |
171 monitor_.Enter(); | |
172 if (!result) { | |
173 break; | |
174 } | |
175 message = DequeueMessage(Message::kNormalPriority); | |
176 } | |
177 return result; | |
178 } | |
179 | |
180 | |
181 bool MessageHandler::HandleNextMessage() { | |
182 // We can only call HandleNextMessage when this handler is not | |
183 // assigned to a thread pool. | |
184 ASSERT(pool_ == NULL); | |
185 #if defined(DEBUG) | |
186 CheckAccess(); | |
187 #endif | |
188 MonitorLocker ml(&monitor_); | |
189 bool result = true; | |
190 Message* message = DequeueMessage(Message::kNormalPriority); | |
191 while (message) { | |
192 if (FLAG_trace_isolates) { | |
193 OS::Print("[<] Handling message:\n" | |
194 "\thandler: %s\n" | |
195 "\tport: %lld\n", | |
196 name(), message->dest_port()); | |
197 } | |
198 | |
199 monitor_.Exit(); | |
200 Message::Priority priority = message->priority(); | |
201 result = HandleMessage(message); | |
202 monitor_.Enter(); | |
203 if (!result || priority == Message::kNormalPriority) { | |
204 break; | |
205 } | |
206 message = DequeueMessage(Message::kNormalPriority); | |
207 } | |
208 return result; | |
209 } | |
siva
2012/04/14 00:29:53
HandleNextMessage seems identical to HandleMessage
turnidge
2012/04/17 23:46:55
Refactored to share more code.
| |
210 | |
211 | |
212 bool MessageHandler::HandleOOBMessages() { | |
213 MonitorLocker ml(&monitor_); | |
214 bool result = true; | |
215 Message* message = DequeueMessage(Message::kOOBPriority); | |
216 while (message) { | |
217 if (FLAG_trace_isolates) { | |
218 OS::Print("[<] Handling message:\n" | |
219 "\thandler: %s\n" | |
220 "\tport: %lld\n", | |
221 name(), message->dest_port()); | |
222 } | |
223 | |
224 monitor_.Exit(); | |
225 result = HandleMessage(message); | |
226 monitor_.Enter(); | |
227 if (!result) { | |
228 break; | |
229 } | |
230 message = DequeueMessage(Message::kOOBPriority); | |
231 } | |
232 return result; | |
233 } | |
234 | |
235 | |
236 void MessageHandler::TaskCallback() { | |
237 ASSERT(Isolate::Current() == NULL); | |
238 bool ok = true; | |
239 { | |
240 MonitorLocker ml(&monitor_); | |
siva
2012/04/14 00:29:53
Is the lock being done here primarily to protect a
| |
241 // Initialize the message handler by running its start function, | |
242 // if we have one. For an isolate, this will run the isolate's | |
243 // main() function. | |
244 if (start_function_) { | |
245 monitor_.Exit(); | |
246 ok = start_function_(start_data_); | |
247 ASSERT(Isolate::Current() == NULL); | |
248 start_function_ = NULL; | |
249 start_data_ = NULL; | |
250 monitor_.Enter(); | |
251 } | |
252 | |
253 // Handle any pending messages for this message handler. | |
254 if (ok) { | |
255 ok = HandleMessages(); | |
256 } | |
257 task_ = NULL; // No task in queue. | |
258 CheckTermination(&ml, !ok); | |
259 } | |
260 } | |
261 | |
262 | |
65 void MessageHandler::ClosePort(Dart_Port port) { | 263 void MessageHandler::ClosePort(Dart_Port port) { |
66 queue()->Flush(port); | 264 MonitorLocker ml(&monitor_); |
265 if (FLAG_trace_isolates) { | |
266 OS::Print("[-] Closing port:\n" | |
267 "\thandler: %s\n" | |
268 "\tport: %d\n", | |
269 name(), port); | |
270 } | |
271 queue_->Flush(port); | |
67 } | 272 } |
68 | 273 |
69 | 274 |
70 void MessageHandler::CloseAllPorts() { | 275 void MessageHandler::CloseAllPorts() { |
71 queue()->FlushAll(); | 276 MonitorLocker ml(&monitor_); |
72 } | 277 if (FLAG_trace_isolates) { |
73 | 278 OS::Print("[-] Closing all ports:\n" |
74 | 279 "\thandler: %s\n", |
75 MessageQueue::MessageQueue() { | 280 name()); |
76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 281 } |
77 head_[p] = NULL; | 282 queue_->FlushAll(); |
78 tail_[p] = NULL; | 283 } |
79 } | 284 |
80 } | 285 |
81 | 286 void MessageHandler::CheckTermination(MonitorLocker* ml, bool error) { |
siva
2012/04/14 00:29:53
Need an assertion that the lock is indeed held whe
| |
82 | 287 // If the message handler received an error or has no live ports, |
83 MessageQueue::~MessageQueue() { | 288 // we stop handling messages for this message handler. |
84 // Ensure that all pending messages have been released. | 289 if (pool_ == NULL) { |
85 #if defined(DEBUG) | 290 return; |
86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 291 } |
87 ASSERT(head_[p] == NULL); | 292 if (error) { |
88 } | 293 // We received an error. Abnormal termination. |
89 #endif | 294 char* error_str = GetErrorCString(); |
90 } | 295 if (FLAG_trace_isolates) { |
91 | 296 OS::Print("[-] Stopping message handler (error):\n" |
92 | 297 "\thandler: %s\n", |
93 void MessageQueue::Enqueue(Message* msg) { | 298 name()); |
94 MonitorLocker ml(&monitor_); | 299 OS::Print("Error: %s\n", error_str); |
95 Message::Priority p = msg->priority(); | 300 } |
96 // Make sure messages are not reused. | 301 pool_ = NULL; |
97 ASSERT(msg->next_ == NULL); | 302 if (blocking_) { |
98 if (head_[p] == NULL) { | 303 ml->Notify(); |
99 // Only element in the queue. | 304 } else { |
100 head_[p] = msg; | 305 // TODO(turnidge): Implement a real system for handling |
101 tail_[p] = msg; | 306 // isolate death rather than terminating the vm. |
102 | 307 OS::PrintErr("%s\n", error_str); |
103 // We only need to notify if the queue was empty. | 308 exit(255); |
104 monitor_.Notify(); | 309 } |
105 } else { | 310 free(error_str); |
106 ASSERT(tail_[p] != NULL); | 311 |
107 // Append at the tail. | 312 } else if (!HasLivePorts()) { |
108 tail_[p]->next_ = msg; | 313 // No live ports. Normal termination. |
109 tail_[p] = msg; | 314 if (FLAG_trace_isolates) { |
110 } | 315 OS::Print("[-] Stopping message handler (no live ports):\n" |
111 } | 316 "\thandler: %s\n", |
112 | 317 name()); |
113 | 318 } |
114 Message* MessageQueue::DequeueNoWait() { | 319 pool_ = NULL; |
115 MonitorLocker ml(&monitor_); | 320 if (blocking_) { |
116 return DequeueNoWaitHoldsLock(Message::kFirstPriority); | 321 ml->Notify(); |
117 } | 322 } |
118 | 323 } |
119 | 324 } |
120 Message* MessageQueue::DequeueNoWaitWithPriority( | |
121 Message::Priority min_priority) { | |
122 MonitorLocker ml(&monitor_); | |
123 return DequeueNoWaitHoldsLock(min_priority); | |
124 } | |
125 | |
126 | |
127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) { | |
128 // Look for the highest priority available message. | |
129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) { | |
130 Message* result = head_[p]; | |
131 if (result != NULL) { | |
132 head_[p] = result->next_; | |
133 // The following update to tail_ is not strictly needed. | |
134 if (head_[p] == NULL) { | |
135 tail_[p] = NULL; | |
136 } | |
137 #if defined(DEBUG) | |
138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue. | |
139 #endif // DEBUG | |
140 return result; | |
141 } | |
142 } | |
143 return NULL; | |
144 } | |
145 | |
146 | |
147 Message* MessageQueue::Dequeue(int64_t millis) { | |
148 ASSERT(millis >= 0); | |
149 MonitorLocker ml(&monitor_); | |
150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
151 if (result == NULL) { | |
152 // No message available at any priority. | |
153 monitor_.Wait(millis); | |
154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
155 } | |
156 return result; | |
157 } | |
158 | |
159 | |
160 void MessageQueue::Flush(Dart_Port port) { | |
161 MonitorLocker ml(&monitor_); | |
162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | |
163 Message* cur = head_[p]; | |
164 Message* prev = NULL; | |
165 while (cur != NULL) { | |
166 Message* next = cur->next_; | |
167 // If the message matches, then remove it from the queue and delete it. | |
168 if (cur->dest_port() == port) { | |
169 if (prev != NULL) { | |
170 prev->next_ = next; | |
171 } else { | |
172 head_[p] = next; | |
173 } | |
174 delete cur; | |
175 } else { | |
176 // Move prev forward. | |
177 prev = cur; | |
178 } | |
179 // Advance to the next message in the queue. | |
180 cur = next; | |
181 } | |
182 tail_[p] = prev; | |
183 } | |
184 } | |
185 | |
186 | |
187 void MessageQueue::FlushAll() { | |
188 MonitorLocker ml(&monitor_); | |
189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | |
190 Message* cur = head_[p]; | |
191 head_[p] = NULL; | |
192 tail_[p] = NULL; | |
193 while (cur != NULL) { | |
194 Message* next = cur->next_; | |
195 delete cur; | |
196 cur = next; | |
197 } | |
198 } | |
199 } | |
200 | |
201 | 325 |
202 } // namespace dart | 326 } // namespace dart |
OLD | NEW |