OLD | NEW |
---|---|
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/message.h" | 5 #include "vm/message_handler.h" |
6 #include "vm/dart.h" | |
6 | 7 |
7 namespace dart { | 8 namespace dart { |
8 | 9 |
9 DECLARE_FLAG(bool, trace_isolates); | 10 DECLARE_FLAG(bool, trace_isolates); |
10 | 11 |
11 | 12 |
13 class MessageHandlerTask : public ThreadPool::Task { | |
14 public: | |
15 explicit MessageHandlerTask(MessageHandler* handler) | |
16 : handler_(handler) { | |
siva
2012/04/18 22:05:00
ASSERT(handler != NULL);
turnidge
2012/04/19 19:46:55
Done.
| |
17 } | |
18 | |
19 void Run() { | |
20 handler_->TaskCallback(); | |
21 } | |
22 | |
23 private: | |
24 MessageHandler* handler_; | |
25 | |
26 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); | |
27 }; | |
28 | |
29 | |
12 MessageHandler::MessageHandler() | 30 MessageHandler::MessageHandler() |
13 : live_ports_(0), | 31 : queue_(new MessageQueue()), |
14 queue_(new MessageQueue()) { | 32 oob_queue_(new MessageQueue()), |
33 live_ports_(0), | |
34 pool_(NULL), | |
35 task_(NULL), | |
36 start_callback_(NULL), | |
37 end_callback_(NULL), | |
38 callback_data_(NULL) { | |
15 ASSERT(queue_ != NULL); | 39 ASSERT(queue_ != NULL); |
40 ASSERT(oob_queue_ != NULL); | |
16 } | 41 } |
17 | 42 |
18 | 43 |
19 MessageHandler::~MessageHandler() { | 44 MessageHandler::~MessageHandler() { |
20 delete queue_; | 45 delete queue_; |
46 delete oob_queue_; | |
21 } | 47 } |
22 | 48 |
23 | 49 |
24 const char* MessageHandler::name() const { | 50 const char* MessageHandler::name() const { |
25 return "<unnamed>"; | 51 return "<unnamed>"; |
26 } | 52 } |
27 | 53 |
28 | 54 |
29 #if defined(DEBUG) | 55 #if defined(DEBUG) |
30 void MessageHandler::CheckAccess() { | 56 void MessageHandler::CheckAccess() { |
31 // By default there is no checking. | 57 // By default there is no checking. |
32 } | 58 } |
33 #endif | 59 #endif |
34 | 60 |
35 | 61 |
36 void MessageHandler::MessageNotify(Message::Priority priority) { | 62 void MessageHandler::MessageNotify(Message::Priority priority) { |
37 // By default, there is no custom message notification. | 63 // By default, there is no custom message notification. |
38 } | 64 } |
39 | 65 |
40 | 66 |
67 void MessageHandler::Run(ThreadPool* pool, | |
68 StartCallback start_callback, | |
69 EndCallback end_callback, | |
70 CallbackData data) { | |
71 MonitorLocker ml(&monitor_); | |
72 if (FLAG_trace_isolates) { | |
73 OS::Print("[+] Starting message handler:\n" | |
74 "\thandler: %s\n", | |
75 name()); | |
76 } | |
77 ASSERT(pool_ == NULL); | |
78 pool_ = pool; | |
79 start_callback_ = start_callback; | |
80 end_callback_ = end_callback; | |
81 callback_data_ = data; | |
82 task_ = new MessageHandlerTask(this); | |
83 pool_->Run(task_); | |
84 } | |
85 | |
86 | |
41 void MessageHandler::PostMessage(Message* message) { | 87 void MessageHandler::PostMessage(Message* message) { |
88 MonitorLocker ml(&monitor_); | |
42 if (FLAG_trace_isolates) { | 89 if (FLAG_trace_isolates) { |
43 const char* source_name = "<native code>"; | 90 const char* source_name = "<native code>"; |
44 Isolate* source_isolate = Isolate::Current(); | 91 Isolate* source_isolate = Isolate::Current(); |
45 if (source_isolate) { | 92 if (source_isolate) { |
46 source_name = source_isolate->name(); | 93 source_name = source_isolate->name(); |
47 } | 94 } |
48 OS::Print("[>] Posting message:\n" | 95 OS::Print("[>] Posting message:\n" |
49 "\tsource: %s\n" | 96 "\tsource: %s\n" |
50 "\treply_port: %lld\n" | 97 "\treply_port: %lld\n" |
51 "\tdest: %s\n" | 98 "\tdest: %s\n" |
52 "\tdest_port: %lld\n", | 99 "\tdest_port: %lld\n", |
53 source_name, message->reply_port(), name(), message->dest_port()); | 100 source_name, message->reply_port(), name(), message->dest_port()); |
54 } | 101 } |
55 | 102 |
56 Message::Priority priority = message->priority(); | 103 Message::Priority saved_priority = message->priority(); |
57 queue()->Enqueue(message); | 104 if (message->IsOOB()) { |
105 oob_queue_->Enqueue(message); | |
106 } else { | |
107 queue_->Enqueue(message); | |
108 } | |
58 message = NULL; // Do not access message. May have been deleted. | 109 message = NULL; // Do not access message. May have been deleted. |
59 | 110 |
111 if (pool_ != NULL && task_ == NULL) { | |
112 task_ = new MessageHandlerTask(this); | |
113 pool_->Run(task_); | |
114 } | |
115 | |
60 // Invoke any custom message notification. | 116 // Invoke any custom message notification. |
61 MessageNotify(priority); | 117 MessageNotify(saved_priority); |
62 } | 118 } |
63 | 119 |
64 | 120 |
65 void MessageHandler::ClosePort(Dart_Port port) { | 121 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
66 queue()->Flush(port); | 122 // TODO(turnidge): Add assert that monitor_ is held here. |
123 Message* message = oob_queue_->Dequeue(); | |
124 if (message == NULL && min_priority < Message::kOOBPriority) { | |
125 message = queue_->Dequeue(); | |
126 } | |
127 return message; | |
67 } | 128 } |
68 | 129 |
69 | 130 |
70 void MessageHandler::CloseAllPorts() { | 131 bool MessageHandler::HandleMessages(bool allow_normal_messages, |
71 queue()->FlushAll(); | 132 bool allow_multiple_normal_messages) { |
72 } | 133 // TODO(turnidge): Add assert that monitor_ is held here. |
134 bool result = true; | |
135 Message::Priority min_priority = (allow_normal_messages | |
136 ? Message::kNormalPriority | |
137 : Message::kOOBPriority); | |
138 Message* message = DequeueMessage(min_priority); | |
139 while (message) { | |
140 if (FLAG_trace_isolates) { | |
141 OS::Print("[<] Handling message:\n" | |
142 "\thandler: %s\n" | |
143 "\tport: %lld\n", | |
144 name(), message->dest_port()); | |
145 } | |
73 | 146 |
147 // Release the monitor_ temporarily while we handle the message. | |
148 // The monitor was acquired in MessageHandler::TaskCallback(). | |
149 monitor_.Exit(); | |
150 Message::Priority saved_priority = message->priority(); | |
151 result = HandleMessage(message); | |
152 // ASSERT(Isolate::Current() == NULL); | |
153 monitor_.Enter(); | |
74 | 154 |
75 MessageQueue::MessageQueue() { | 155 if (!result) { |
76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 156 // If we hit an error, we're done processing messages. |
77 head_[p] = NULL; | 157 break; |
78 tail_[p] = NULL; | |
79 } | |
80 } | |
81 | |
82 | |
83 MessageQueue::~MessageQueue() { | |
84 // Ensure that all pending messages have been released. | |
85 #if defined(DEBUG) | |
86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | |
87 ASSERT(head_[p] == NULL); | |
88 } | |
89 #endif | |
90 } | |
91 | |
92 | |
93 void MessageQueue::Enqueue(Message* msg) { | |
94 MonitorLocker ml(&monitor_); | |
95 Message::Priority p = msg->priority(); | |
96 // Make sure messages are not reused. | |
97 ASSERT(msg->next_ == NULL); | |
98 if (head_[p] == NULL) { | |
99 // Only element in the queue. | |
100 head_[p] = msg; | |
101 tail_[p] = msg; | |
102 | |
103 // We only need to notify if the queue was empty. | |
104 monitor_.Notify(); | |
105 } else { | |
106 ASSERT(tail_[p] != NULL); | |
107 // Append at the tail. | |
108 tail_[p]->next_ = msg; | |
109 tail_[p] = msg; | |
110 } | |
111 } | |
112 | |
113 | |
114 Message* MessageQueue::DequeueNoWait() { | |
115 MonitorLocker ml(&monitor_); | |
116 return DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
117 } | |
118 | |
119 | |
120 Message* MessageQueue::DequeueNoWaitWithPriority( | |
121 Message::Priority min_priority) { | |
122 MonitorLocker ml(&monitor_); | |
123 return DequeueNoWaitHoldsLock(min_priority); | |
124 } | |
125 | |
126 | |
127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) { | |
128 // Look for the highest priority available message. | |
129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) { | |
130 Message* result = head_[p]; | |
131 if (result != NULL) { | |
132 head_[p] = result->next_; | |
133 // The following update to tail_ is not strictly needed. | |
134 if (head_[p] == NULL) { | |
135 tail_[p] = NULL; | |
136 } | |
137 #if defined(DEBUG) | |
138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue. | |
139 #endif // DEBUG | |
140 return result; | |
141 } | 158 } |
142 } | 159 if (!allow_multiple_normal_messages && |
143 return NULL; | 160 saved_priority == Message::kNormalPriority) { |
144 } | 161 // Some callers want to process only one normal message and then quit. |
145 | 162 break; |
146 | 163 } |
147 Message* MessageQueue::Dequeue(int64_t millis) { | 164 message = DequeueMessage(min_priority); |
148 ASSERT(millis >= 0); | |
149 MonitorLocker ml(&monitor_); | |
150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
151 if (result == NULL) { | |
152 // No message available at any priority. | |
153 monitor_.Wait(millis); | |
154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
155 } | 165 } |
156 return result; | 166 return result; |
157 } | 167 } |
158 | 168 |
159 | 169 |
160 void MessageQueue::Flush(Dart_Port port) { | 170 bool MessageHandler::HandleNextMessage() { |
171 // We can only call HandleNextMessage when this handler is not | |
172 // assigned to a thread pool. | |
173 ASSERT(pool_ == NULL); | |
siva
2012/04/18 22:05:00
The assert should be after grabbing the lock?
turnidge
2012/04/19 19:46:55
Done.
| |
161 MonitorLocker ml(&monitor_); | 174 MonitorLocker ml(&monitor_); |
162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 175 #if defined(DEBUG) |
163 Message* cur = head_[p]; | 176 CheckAccess(); |
164 Message* prev = NULL; | 177 #endif |
165 while (cur != NULL) { | 178 return HandleMessages(true, false); |
166 Message* next = cur->next_; | 179 } |
167 // If the message matches, then remove it from the queue and delete it. | 180 |
168 if (cur->dest_port() == port) { | 181 |
169 if (prev != NULL) { | 182 bool MessageHandler::HandleOOBMessages() { |
170 prev->next_ = next; | 183 MonitorLocker ml(&monitor_); |
171 } else { | 184 #if defined(DEBUG) |
172 head_[p] = next; | 185 CheckAccess(); |
173 } | 186 #endif |
174 delete cur; | 187 return HandleMessages(false, false); |
175 } else { | 188 } |
176 // Move prev forward. | 189 |
177 prev = cur; | 190 |
191 void MessageHandler::TaskCallback() { | |
192 ASSERT(Isolate::Current() == NULL); | |
193 bool ok = true; | |
194 bool run_end_callback = false; | |
195 { | |
196 MonitorLocker ml(&monitor_); | |
197 // Initialize the message handler by running its start function, | |
198 // if we have one. For an isolate, this will run the isolate's | |
199 // main() function. | |
200 if (start_callback_) { | |
201 monitor_.Exit(); | |
202 ok = start_callback_(callback_data_); | |
203 ASSERT(Isolate::Current() == NULL); | |
204 start_callback_ = NULL; | |
205 monitor_.Enter(); | |
206 } | |
207 | |
208 // Handle any pending messages for this message handler. | |
209 if (ok) { | |
210 ok = HandleMessages(true, true); | |
211 } | |
212 task_ = NULL; // No task in queue. | |
213 | |
214 if (!ok || !HasLivePorts()) { | |
215 if (FLAG_trace_isolates) { | |
216 OS::Print("[-] Stopping message handler (%s):\n" | |
217 "\thandler: %s\n", | |
218 (ok ? "no live ports" : "error"), | |
219 name()); | |
178 } | 220 } |
179 // Advance to the next message in the queue. | 221 pool_ = NULL; |
180 cur = next; | 222 run_end_callback = true; |
181 } | 223 } |
182 tail_[p] = prev; | 224 } |
225 if (run_end_callback && end_callback_ != NULL) { | |
226 end_callback_(callback_data_); | |
227 // The handler may have been deleted after this point. | |
183 } | 228 } |
184 } | 229 } |
185 | 230 |
186 | 231 |
187 void MessageQueue::FlushAll() { | 232 void MessageHandler::ClosePort(Dart_Port port) { |
188 MonitorLocker ml(&monitor_); | 233 MonitorLocker ml(&monitor_); |
189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 234 if (FLAG_trace_isolates) { |
190 Message* cur = head_[p]; | 235 OS::Print("[-] Closing port:\n" |
191 head_[p] = NULL; | 236 "\thandler: %s\n" |
192 tail_[p] = NULL; | 237 "\tport: %d\n", |
193 while (cur != NULL) { | 238 name(), port); |
194 Message* next = cur->next_; | |
195 delete cur; | |
196 cur = next; | |
197 } | |
198 } | 239 } |
240 queue_->Flush(port); | |
241 oob_queue_->Flush(port); | |
199 } | 242 } |
200 | 243 |
201 | 244 |
245 void MessageHandler::CloseAllPorts() { | |
246 MonitorLocker ml(&monitor_); | |
247 if (FLAG_trace_isolates) { | |
248 OS::Print("[-] Closing all ports:\n" | |
249 "\thandler: %s\n", | |
250 name()); | |
251 } | |
252 queue_->FlushAll(); | |
253 oob_queue_->FlushAll(); | |
254 } | |
255 | |
256 | |
257 void MessageHandler::increment_live_ports() { | |
258 MonitorLocker ml(&monitor_); | |
259 #if defined(DEBUG) | |
260 CheckAccess(); | |
261 #endif | |
262 live_ports_++; | |
263 } | |
264 | |
265 | |
266 void MessageHandler::decrement_live_ports() { | |
267 MonitorLocker ml(&monitor_); | |
268 #if defined(DEBUG) | |
269 CheckAccess(); | |
270 #endif | |
271 live_ports_--; | |
272 } | |
273 | |
202 } // namespace dart | 274 } // namespace dart |
OLD | NEW |