OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/message.h" | 5 #include "vm/message_handler.h" |
| 6 #include "vm/dart.h" |
6 | 7 |
7 namespace dart { | 8 namespace dart { |
8 | 9 |
9 DECLARE_FLAG(bool, trace_isolates); | 10 DECLARE_FLAG(bool, trace_isolates); |
10 | 11 |
11 | 12 |
| 13 class MessageHandlerTask : public ThreadPool::Task { |
| 14 public: |
| 15 explicit MessageHandlerTask(MessageHandler* handler) |
| 16 : handler_(handler) { |
| 17 ASSERT(handler != NULL); |
| 18 } |
| 19 |
| 20 void Run() { |
| 21 handler_->TaskCallback(); |
| 22 } |
| 23 |
| 24 private: |
| 25 MessageHandler* handler_; |
| 26 |
| 27 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |
| 28 }; |
| 29 |
| 30 |
12 MessageHandler::MessageHandler() | 31 MessageHandler::MessageHandler() |
13 : live_ports_(0), | 32 : queue_(new MessageQueue()), |
14 queue_(new MessageQueue()) { | 33 oob_queue_(new MessageQueue()), |
| 34 live_ports_(0), |
| 35 pool_(NULL), |
| 36 task_(NULL), |
| 37 start_callback_(NULL), |
| 38 end_callback_(NULL), |
| 39 callback_data_(NULL) { |
15 ASSERT(queue_ != NULL); | 40 ASSERT(queue_ != NULL); |
| 41 ASSERT(oob_queue_ != NULL); |
16 } | 42 } |
17 | 43 |
18 | 44 |
19 MessageHandler::~MessageHandler() { | 45 MessageHandler::~MessageHandler() { |
20 delete queue_; | 46 delete queue_; |
| 47 delete oob_queue_; |
21 } | 48 } |
22 | 49 |
23 | 50 |
24 const char* MessageHandler::name() const { | 51 const char* MessageHandler::name() const { |
25 return "<unnamed>"; | 52 return "<unnamed>"; |
26 } | 53 } |
27 | 54 |
28 | 55 |
29 #if defined(DEBUG) | 56 #if defined(DEBUG) |
30 void MessageHandler::CheckAccess() { | 57 void MessageHandler::CheckAccess() { |
31 // By default there is no checking. | 58 // By default there is no checking. |
32 } | 59 } |
33 #endif | 60 #endif |
34 | 61 |
35 | 62 |
36 void MessageHandler::MessageNotify(Message::Priority priority) { | 63 void MessageHandler::MessageNotify(Message::Priority priority) { |
37 // By default, there is no custom message notification. | 64 // By default, there is no custom message notification. |
38 } | 65 } |
39 | 66 |
40 | 67 |
| 68 void MessageHandler::Run(ThreadPool* pool, |
| 69 StartCallback start_callback, |
| 70 EndCallback end_callback, |
| 71 CallbackData data) { |
| 72 MonitorLocker ml(&monitor_); |
| 73 if (FLAG_trace_isolates) { |
| 74 OS::Print("[+] Starting message handler:\n" |
| 75 "\thandler: %s\n", |
| 76 name()); |
| 77 } |
| 78 ASSERT(pool_ == NULL); |
| 79 pool_ = pool; |
| 80 start_callback_ = start_callback; |
| 81 end_callback_ = end_callback; |
| 82 callback_data_ = data; |
| 83 task_ = new MessageHandlerTask(this); |
| 84 pool_->Run(task_); |
| 85 } |
| 86 |
| 87 |
41 void MessageHandler::PostMessage(Message* message) { | 88 void MessageHandler::PostMessage(Message* message) { |
| 89 MonitorLocker ml(&monitor_); |
42 if (FLAG_trace_isolates) { | 90 if (FLAG_trace_isolates) { |
43 const char* source_name = "<native code>"; | 91 const char* source_name = "<native code>"; |
44 Isolate* source_isolate = Isolate::Current(); | 92 Isolate* source_isolate = Isolate::Current(); |
45 if (source_isolate) { | 93 if (source_isolate) { |
46 source_name = source_isolate->name(); | 94 source_name = source_isolate->name(); |
47 } | 95 } |
48 OS::Print("[>] Posting message:\n" | 96 OS::Print("[>] Posting message:\n" |
49 "\tsource: %s\n" | 97 "\tsource: %s\n" |
50 "\treply_port: %lld\n" | 98 "\treply_port: %lld\n" |
51 "\tdest: %s\n" | 99 "\tdest: %s\n" |
52 "\tdest_port: %lld\n", | 100 "\tdest_port: %lld\n", |
53 source_name, message->reply_port(), name(), message->dest_port()); | 101 source_name, message->reply_port(), name(), message->dest_port()); |
54 } | 102 } |
55 | 103 |
56 Message::Priority priority = message->priority(); | 104 Message::Priority saved_priority = message->priority(); |
57 queue()->Enqueue(message); | 105 if (message->IsOOB()) { |
| 106 oob_queue_->Enqueue(message); |
| 107 } else { |
| 108 queue_->Enqueue(message); |
| 109 } |
58 message = NULL; // Do not access message. May have been deleted. | 110 message = NULL; // Do not access message. May have been deleted. |
59 | 111 |
| 112 if (pool_ != NULL && task_ == NULL) { |
| 113 task_ = new MessageHandlerTask(this); |
| 114 pool_->Run(task_); |
| 115 } |
| 116 |
60 // Invoke any custom message notification. | 117 // Invoke any custom message notification. |
61 MessageNotify(priority); | 118 MessageNotify(saved_priority); |
62 } | 119 } |
63 | 120 |
64 | 121 |
65 void MessageHandler::ClosePort(Dart_Port port) { | 122 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
66 queue()->Flush(port); | 123 // TODO(turnidge): Add assert that monitor_ is held here. |
| 124 Message* message = oob_queue_->Dequeue(); |
| 125 if (message == NULL && min_priority < Message::kOOBPriority) { |
| 126 message = queue_->Dequeue(); |
| 127 } |
| 128 return message; |
67 } | 129 } |
68 | 130 |
69 | 131 |
70 void MessageHandler::CloseAllPorts() { | 132 bool MessageHandler::HandleMessages(bool allow_normal_messages, |
71 queue()->FlushAll(); | 133 bool allow_multiple_normal_messages) { |
72 } | 134 // TODO(turnidge): Add assert that monitor_ is held here. |
| 135 bool result = true; |
| 136 Message::Priority min_priority = (allow_normal_messages |
| 137 ? Message::kNormalPriority |
| 138 : Message::kOOBPriority); |
| 139 Message* message = DequeueMessage(min_priority); |
| 140 while (message) { |
| 141 if (FLAG_trace_isolates) { |
| 142 OS::Print("[<] Handling message:\n" |
| 143 "\thandler: %s\n" |
| 144 "\tport: %lld\n", |
| 145 name(), message->dest_port()); |
| 146 } |
73 | 147 |
| 148 // Release the monitor_ temporarily while we handle the message. |
| 149 // The monitor was acquired in MessageHandler::TaskCallback(). |
| 150 monitor_.Exit(); |
| 151 Message::Priority saved_priority = message->priority(); |
| 152 result = HandleMessage(message); |
| 153 // ASSERT(Isolate::Current() == NULL); |
| 154 monitor_.Enter(); |
74 | 155 |
75 MessageQueue::MessageQueue() { | 156 if (!result) { |
76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 157 // If we hit an error, we're done processing messages. |
77 head_[p] = NULL; | 158 break; |
78 tail_[p] = NULL; | |
79 } | |
80 } | |
81 | |
82 | |
83 MessageQueue::~MessageQueue() { | |
84 // Ensure that all pending messages have been released. | |
85 #if defined(DEBUG) | |
86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | |
87 ASSERT(head_[p] == NULL); | |
88 } | |
89 #endif | |
90 } | |
91 | |
92 | |
93 void MessageQueue::Enqueue(Message* msg) { | |
94 MonitorLocker ml(&monitor_); | |
95 Message::Priority p = msg->priority(); | |
96 // Make sure messages are not reused. | |
97 ASSERT(msg->next_ == NULL); | |
98 if (head_[p] == NULL) { | |
99 // Only element in the queue. | |
100 head_[p] = msg; | |
101 tail_[p] = msg; | |
102 | |
103 // We only need to notify if the queue was empty. | |
104 monitor_.Notify(); | |
105 } else { | |
106 ASSERT(tail_[p] != NULL); | |
107 // Append at the tail. | |
108 tail_[p]->next_ = msg; | |
109 tail_[p] = msg; | |
110 } | |
111 } | |
112 | |
113 | |
114 Message* MessageQueue::DequeueNoWait() { | |
115 MonitorLocker ml(&monitor_); | |
116 return DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
117 } | |
118 | |
119 | |
120 Message* MessageQueue::DequeueNoWaitWithPriority( | |
121 Message::Priority min_priority) { | |
122 MonitorLocker ml(&monitor_); | |
123 return DequeueNoWaitHoldsLock(min_priority); | |
124 } | |
125 | |
126 | |
127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) { | |
128 // Look for the highest priority available message. | |
129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) { | |
130 Message* result = head_[p]; | |
131 if (result != NULL) { | |
132 head_[p] = result->next_; | |
133 // The following update to tail_ is not strictly needed. | |
134 if (head_[p] == NULL) { | |
135 tail_[p] = NULL; | |
136 } | |
137 #if defined(DEBUG) | |
138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue. | |
139 #endif // DEBUG | |
140 return result; | |
141 } | 159 } |
142 } | 160 if (!allow_multiple_normal_messages && |
143 return NULL; | 161 saved_priority == Message::kNormalPriority) { |
144 } | 162 // Some callers want to process only one normal message and then quit. |
145 | 163 break; |
146 | 164 } |
147 Message* MessageQueue::Dequeue(int64_t millis) { | 165 message = DequeueMessage(min_priority); |
148 ASSERT(millis >= 0); | |
149 MonitorLocker ml(&monitor_); | |
150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
151 if (result == NULL) { | |
152 // No message available at any priority. | |
153 monitor_.Wait(millis); | |
154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
155 } | 166 } |
156 return result; | 167 return result; |
157 } | 168 } |
158 | 169 |
159 | 170 |
160 void MessageQueue::Flush(Dart_Port port) { | 171 bool MessageHandler::HandleNextMessage() { |
| 172 // We can only call HandleNextMessage when this handler is not |
| 173 // assigned to a thread pool. |
161 MonitorLocker ml(&monitor_); | 174 MonitorLocker ml(&monitor_); |
162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 175 ASSERT(pool_ == NULL); |
163 Message* cur = head_[p]; | 176 #if defined(DEBUG) |
164 Message* prev = NULL; | 177 CheckAccess(); |
165 while (cur != NULL) { | 178 #endif |
166 Message* next = cur->next_; | 179 return HandleMessages(true, false); |
167 // If the message matches, then remove it from the queue and delete it. | 180 } |
168 if (cur->dest_port() == port) { | 181 |
169 if (prev != NULL) { | 182 |
170 prev->next_ = next; | 183 bool MessageHandler::HandleOOBMessages() { |
171 } else { | 184 MonitorLocker ml(&monitor_); |
172 head_[p] = next; | 185 #if defined(DEBUG) |
173 } | 186 CheckAccess(); |
174 delete cur; | 187 #endif |
175 } else { | 188 return HandleMessages(false, false); |
176 // Move prev forward. | 189 } |
177 prev = cur; | 190 |
| 191 |
| 192 void MessageHandler::TaskCallback() { |
| 193 ASSERT(Isolate::Current() == NULL); |
| 194 bool ok = true; |
| 195 bool run_end_callback = false; |
| 196 { |
| 197 MonitorLocker ml(&monitor_); |
| 198 // Initialize the message handler by running its start function, |
| 199 // if we have one. For an isolate, this will run the isolate's |
| 200 // main() function. |
| 201 if (start_callback_) { |
| 202 monitor_.Exit(); |
| 203 ok = start_callback_(callback_data_); |
| 204 ASSERT(Isolate::Current() == NULL); |
| 205 start_callback_ = NULL; |
| 206 monitor_.Enter(); |
| 207 } |
| 208 |
| 209 // Handle any pending messages for this message handler. |
| 210 if (ok) { |
| 211 ok = HandleMessages(true, true); |
| 212 } |
| 213 task_ = NULL; // No task in queue. |
| 214 |
| 215 if (!ok || !HasLivePorts()) { |
| 216 if (FLAG_trace_isolates) { |
| 217 OS::Print("[-] Stopping message handler (%s):\n" |
| 218 "\thandler: %s\n", |
| 219 (ok ? "no live ports" : "error"), |
| 220 name()); |
178 } | 221 } |
179 // Advance to the next message in the queue. | 222 pool_ = NULL; |
180 cur = next; | 223 run_end_callback = true; |
181 } | 224 } |
182 tail_[p] = prev; | 225 } |
| 226 if (run_end_callback && end_callback_ != NULL) { |
| 227 end_callback_(callback_data_); |
| 228 // The handler may have been deleted after this point. |
183 } | 229 } |
184 } | 230 } |
185 | 231 |
186 | 232 |
187 void MessageQueue::FlushAll() { | 233 void MessageHandler::ClosePort(Dart_Port port) { |
188 MonitorLocker ml(&monitor_); | 234 MonitorLocker ml(&monitor_); |
189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 235 if (FLAG_trace_isolates) { |
190 Message* cur = head_[p]; | 236 OS::Print("[-] Closing port:\n" |
191 head_[p] = NULL; | 237 "\thandler: %s\n" |
192 tail_[p] = NULL; | 238 "\tport: %d\n", |
193 while (cur != NULL) { | 239 name(), port); |
194 Message* next = cur->next_; | |
195 delete cur; | |
196 cur = next; | |
197 } | |
198 } | 240 } |
| 241 queue_->Flush(port); |
| 242 oob_queue_->Flush(port); |
199 } | 243 } |
200 | 244 |
201 | 245 |
| 246 void MessageHandler::CloseAllPorts() { |
| 247 MonitorLocker ml(&monitor_); |
| 248 if (FLAG_trace_isolates) { |
| 249 OS::Print("[-] Closing all ports:\n" |
| 250 "\thandler: %s\n", |
| 251 name()); |
| 252 } |
| 253 queue_->FlushAll(); |
| 254 oob_queue_->FlushAll(); |
| 255 } |
| 256 |
| 257 |
| 258 void MessageHandler::increment_live_ports() { |
| 259 MonitorLocker ml(&monitor_); |
| 260 #if defined(DEBUG) |
| 261 CheckAccess(); |
| 262 #endif |
| 263 live_ports_++; |
| 264 } |
| 265 |
| 266 |
| 267 void MessageHandler::decrement_live_ports() { |
| 268 MonitorLocker ml(&monitor_); |
| 269 #if defined(DEBUG) |
| 270 CheckAccess(); |
| 271 #endif |
| 272 live_ports_--; |
| 273 } |
| 274 |
202 } // namespace dart | 275 } // namespace dart |
OLD | NEW |