| OLD | NEW |
| 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include "vm/message.h" | 5 #include "vm/message_handler.h" |
| 6 #include "vm/dart.h" |
| 6 | 7 |
| 7 namespace dart { | 8 namespace dart { |
| 8 | 9 |
| 9 DECLARE_FLAG(bool, trace_isolates); | 10 DECLARE_FLAG(bool, trace_isolates); |
| 10 | 11 |
| 11 | 12 |
| 13 class MessageHandlerTask : public ThreadPool::Task { |
| 14 public: |
| 15 explicit MessageHandlerTask(MessageHandler* handler) |
| 16 : handler_(handler) { |
| 17 ASSERT(handler != NULL); |
| 18 } |
| 19 |
| 20 void Run() { |
| 21 handler_->TaskCallback(); |
| 22 } |
| 23 |
| 24 private: |
| 25 MessageHandler* handler_; |
| 26 |
| 27 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |
| 28 }; |
| 29 |
| 30 |
| 12 MessageHandler::MessageHandler() | 31 MessageHandler::MessageHandler() |
| 13 : live_ports_(0), | 32 : queue_(new MessageQueue()), |
| 14 queue_(new MessageQueue()) { | 33 oob_queue_(new MessageQueue()), |
| 34 live_ports_(0), |
| 35 pool_(NULL), |
| 36 task_(NULL), |
| 37 start_callback_(NULL), |
| 38 end_callback_(NULL), |
| 39 callback_data_(NULL) { |
| 15 ASSERT(queue_ != NULL); | 40 ASSERT(queue_ != NULL); |
| 41 ASSERT(oob_queue_ != NULL); |
| 16 } | 42 } |
| 17 | 43 |
| 18 | 44 |
| 19 MessageHandler::~MessageHandler() { | 45 MessageHandler::~MessageHandler() { |
| 20 delete queue_; | 46 delete queue_; |
| 47 delete oob_queue_; |
| 21 } | 48 } |
| 22 | 49 |
| 23 | 50 |
| 24 const char* MessageHandler::name() const { | 51 const char* MessageHandler::name() const { |
| 25 return "<unnamed>"; | 52 return "<unnamed>"; |
| 26 } | 53 } |
| 27 | 54 |
| 28 | 55 |
| 29 #if defined(DEBUG) | 56 #if defined(DEBUG) |
| 30 void MessageHandler::CheckAccess() { | 57 void MessageHandler::CheckAccess() { |
| 31 // By default there is no checking. | 58 // By default there is no checking. |
| 32 } | 59 } |
| 33 #endif | 60 #endif |
| 34 | 61 |
| 35 | 62 |
| 36 void MessageHandler::MessageNotify(Message::Priority priority) { | 63 void MessageHandler::MessageNotify(Message::Priority priority) { |
| 37 // By default, there is no custom message notification. | 64 // By default, there is no custom message notification. |
| 38 } | 65 } |
| 39 | 66 |
| 40 | 67 |
| 68 void MessageHandler::Run(ThreadPool* pool, |
| 69 StartCallback start_callback, |
| 70 EndCallback end_callback, |
| 71 CallbackData data) { |
| 72 MonitorLocker ml(&monitor_); |
| 73 if (FLAG_trace_isolates) { |
| 74 OS::Print("[+] Starting message handler:\n" |
| 75 "\thandler: %s\n", |
| 76 name()); |
| 77 } |
| 78 ASSERT(pool_ == NULL); |
| 79 pool_ = pool; |
| 80 start_callback_ = start_callback; |
| 81 end_callback_ = end_callback; |
| 82 callback_data_ = data; |
| 83 task_ = new MessageHandlerTask(this); |
| 84 pool_->Run(task_); |
| 85 } |
| 86 |
| 87 |
| 41 void MessageHandler::PostMessage(Message* message) { | 88 void MessageHandler::PostMessage(Message* message) { |
| 89 MonitorLocker ml(&monitor_); |
| 42 if (FLAG_trace_isolates) { | 90 if (FLAG_trace_isolates) { |
| 43 const char* source_name = "<native code>"; | 91 const char* source_name = "<native code>"; |
| 44 Isolate* source_isolate = Isolate::Current(); | 92 Isolate* source_isolate = Isolate::Current(); |
| 45 if (source_isolate) { | 93 if (source_isolate) { |
| 46 source_name = source_isolate->name(); | 94 source_name = source_isolate->name(); |
| 47 } | 95 } |
| 48 OS::Print("[>] Posting message:\n" | 96 OS::Print("[>] Posting message:\n" |
| 49 "\tsource: %s\n" | 97 "\tsource: %s\n" |
| 50 "\treply_port: %lld\n" | 98 "\treply_port: %lld\n" |
| 51 "\tdest: %s\n" | 99 "\tdest: %s\n" |
| 52 "\tdest_port: %lld\n", | 100 "\tdest_port: %lld\n", |
| 53 source_name, message->reply_port(), name(), message->dest_port()); | 101 source_name, message->reply_port(), name(), message->dest_port()); |
| 54 } | 102 } |
| 55 | 103 |
| 56 Message::Priority priority = message->priority(); | 104 Message::Priority saved_priority = message->priority(); |
| 57 queue()->Enqueue(message); | 105 if (message->IsOOB()) { |
| 106 oob_queue_->Enqueue(message); |
| 107 } else { |
| 108 queue_->Enqueue(message); |
| 109 } |
| 58 message = NULL; // Do not access message. May have been deleted. | 110 message = NULL; // Do not access message. May have been deleted. |
| 59 | 111 |
| 112 if (pool_ != NULL && task_ == NULL) { |
| 113 task_ = new MessageHandlerTask(this); |
| 114 pool_->Run(task_); |
| 115 } |
| 116 |
| 60 // Invoke any custom message notification. | 117 // Invoke any custom message notification. |
| 61 MessageNotify(priority); | 118 MessageNotify(saved_priority); |
| 62 } | 119 } |
| 63 | 120 |
| 64 | 121 |
| 65 void MessageHandler::ClosePort(Dart_Port port) { | 122 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
| 66 queue()->Flush(port); | 123 // TODO(turnidge): Add assert that monitor_ is held here. |
| 124 Message* message = oob_queue_->Dequeue(); |
| 125 if (message == NULL && min_priority < Message::kOOBPriority) { |
| 126 message = queue_->Dequeue(); |
| 127 } |
| 128 return message; |
| 67 } | 129 } |
| 68 | 130 |
| 69 | 131 |
| 70 void MessageHandler::CloseAllPorts() { | 132 bool MessageHandler::HandleMessages(bool allow_normal_messages, |
| 71 queue()->FlushAll(); | 133 bool allow_multiple_normal_messages) { |
| 72 } | 134 // TODO(turnidge): Add assert that monitor_ is held here. |
| 135 bool result = true; |
| 136 Message::Priority min_priority = (allow_normal_messages |
| 137 ? Message::kNormalPriority |
| 138 : Message::kOOBPriority); |
| 139 Message* message = DequeueMessage(min_priority); |
| 140 while (message) { |
| 141 if (FLAG_trace_isolates) { |
| 142 OS::Print("[<] Handling message:\n" |
| 143 "\thandler: %s\n" |
| 144 "\tport: %lld\n", |
| 145 name(), message->dest_port()); |
| 146 } |
| 73 | 147 |
| 148 // Release the monitor_ temporarily while we handle the message. |
| 149 // The monitor was acquired in MessageHandler::TaskCallback(). |
| 150 monitor_.Exit(); |
| 151 Message::Priority saved_priority = message->priority(); |
| 152 result = HandleMessage(message); |
| 153 // ASSERT(Isolate::Current() == NULL); |
| 154 monitor_.Enter(); |
| 74 | 155 |
| 75 MessageQueue::MessageQueue() { | 156 if (!result) { |
| 76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 157 // If we hit an error, we're done processing messages. |
| 77 head_[p] = NULL; | 158 break; |
| 78 tail_[p] = NULL; | |
| 79 } | |
| 80 } | |
| 81 | |
| 82 | |
| 83 MessageQueue::~MessageQueue() { | |
| 84 // Ensure that all pending messages have been released. | |
| 85 #if defined(DEBUG) | |
| 86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | |
| 87 ASSERT(head_[p] == NULL); | |
| 88 } | |
| 89 #endif | |
| 90 } | |
| 91 | |
| 92 | |
| 93 void MessageQueue::Enqueue(Message* msg) { | |
| 94 MonitorLocker ml(&monitor_); | |
| 95 Message::Priority p = msg->priority(); | |
| 96 // Make sure messages are not reused. | |
| 97 ASSERT(msg->next_ == NULL); | |
| 98 if (head_[p] == NULL) { | |
| 99 // Only element in the queue. | |
| 100 head_[p] = msg; | |
| 101 tail_[p] = msg; | |
| 102 | |
| 103 // We only need to notify if the queue was empty. | |
| 104 monitor_.Notify(); | |
| 105 } else { | |
| 106 ASSERT(tail_[p] != NULL); | |
| 107 // Append at the tail. | |
| 108 tail_[p]->next_ = msg; | |
| 109 tail_[p] = msg; | |
| 110 } | |
| 111 } | |
| 112 | |
| 113 | |
| 114 Message* MessageQueue::DequeueNoWait() { | |
| 115 MonitorLocker ml(&monitor_); | |
| 116 return DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
| 117 } | |
| 118 | |
| 119 | |
| 120 Message* MessageQueue::DequeueNoWaitWithPriority( | |
| 121 Message::Priority min_priority) { | |
| 122 MonitorLocker ml(&monitor_); | |
| 123 return DequeueNoWaitHoldsLock(min_priority); | |
| 124 } | |
| 125 | |
| 126 | |
| 127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) { | |
| 128 // Look for the highest priority available message. | |
| 129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) { | |
| 130 Message* result = head_[p]; | |
| 131 if (result != NULL) { | |
| 132 head_[p] = result->next_; | |
| 133 // The following update to tail_ is not strictly needed. | |
| 134 if (head_[p] == NULL) { | |
| 135 tail_[p] = NULL; | |
| 136 } | |
| 137 #if defined(DEBUG) | |
| 138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue. | |
| 139 #endif // DEBUG | |
| 140 return result; | |
| 141 } | 159 } |
| 142 } | 160 if (!allow_multiple_normal_messages && |
| 143 return NULL; | 161 saved_priority == Message::kNormalPriority) { |
| 144 } | 162 // Some callers want to process only one normal message and then quit. |
| 145 | 163 break; |
| 146 | 164 } |
| 147 Message* MessageQueue::Dequeue(int64_t millis) { | 165 message = DequeueMessage(min_priority); |
| 148 ASSERT(millis >= 0); | |
| 149 MonitorLocker ml(&monitor_); | |
| 150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
| 151 if (result == NULL) { | |
| 152 // No message available at any priority. | |
| 153 monitor_.Wait(millis); | |
| 154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority); | |
| 155 } | 166 } |
| 156 return result; | 167 return result; |
| 157 } | 168 } |
| 158 | 169 |
| 159 | 170 |
| 160 void MessageQueue::Flush(Dart_Port port) { | 171 bool MessageHandler::HandleNextMessage() { |
| 172 // We can only call HandleNextMessage when this handler is not |
| 173 // assigned to a thread pool. |
| 161 MonitorLocker ml(&monitor_); | 174 MonitorLocker ml(&monitor_); |
| 162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 175 ASSERT(pool_ == NULL); |
| 163 Message* cur = head_[p]; | 176 #if defined(DEBUG) |
| 164 Message* prev = NULL; | 177 CheckAccess(); |
| 165 while (cur != NULL) { | 178 #endif |
| 166 Message* next = cur->next_; | 179 return HandleMessages(true, false); |
| 167 // If the message matches, then remove it from the queue and delete it. | 180 } |
| 168 if (cur->dest_port() == port) { | 181 |
| 169 if (prev != NULL) { | 182 |
| 170 prev->next_ = next; | 183 bool MessageHandler::HandleOOBMessages() { |
| 171 } else { | 184 MonitorLocker ml(&monitor_); |
| 172 head_[p] = next; | 185 #if defined(DEBUG) |
| 173 } | 186 CheckAccess(); |
| 174 delete cur; | 187 #endif |
| 175 } else { | 188 return HandleMessages(false, false); |
| 176 // Move prev forward. | 189 } |
| 177 prev = cur; | 190 |
| 191 |
| 192 void MessageHandler::TaskCallback() { |
| 193 ASSERT(Isolate::Current() == NULL); |
| 194 bool ok = true; |
| 195 bool run_end_callback = false; |
| 196 { |
| 197 MonitorLocker ml(&monitor_); |
| 198 // Initialize the message handler by running its start function, |
| 199 // if we have one. For an isolate, this will run the isolate's |
| 200 // main() function. |
| 201 if (start_callback_) { |
| 202 monitor_.Exit(); |
| 203 ok = start_callback_(callback_data_); |
| 204 ASSERT(Isolate::Current() == NULL); |
| 205 start_callback_ = NULL; |
| 206 monitor_.Enter(); |
| 207 } |
| 208 |
| 209 // Handle any pending messages for this message handler. |
| 210 if (ok) { |
| 211 ok = HandleMessages(true, true); |
| 212 } |
| 213 task_ = NULL; // No task in queue. |
| 214 |
| 215 if (!ok || !HasLivePorts()) { |
| 216 if (FLAG_trace_isolates) { |
| 217 OS::Print("[-] Stopping message handler (%s):\n" |
| 218 "\thandler: %s\n", |
| 219 (ok ? "no live ports" : "error"), |
| 220 name()); |
| 178 } | 221 } |
| 179 // Advance to the next message in the queue. | 222 pool_ = NULL; |
| 180 cur = next; | 223 run_end_callback = true; |
| 181 } | 224 } |
| 182 tail_[p] = prev; | 225 } |
| 226 if (run_end_callback && end_callback_ != NULL) { |
| 227 end_callback_(callback_data_); |
| 228 // The handler may have been deleted after this point. |
| 183 } | 229 } |
| 184 } | 230 } |
| 185 | 231 |
| 186 | 232 |
| 187 void MessageQueue::FlushAll() { | 233 void MessageHandler::ClosePort(Dart_Port port) { |
| 188 MonitorLocker ml(&monitor_); | 234 MonitorLocker ml(&monitor_); |
| 189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { | 235 if (FLAG_trace_isolates) { |
| 190 Message* cur = head_[p]; | 236 OS::Print("[-] Closing port:\n" |
| 191 head_[p] = NULL; | 237 "\thandler: %s\n" |
| 192 tail_[p] = NULL; | 238 "\tport: %d\n", |
| 193 while (cur != NULL) { | 239 name(), port); |
| 194 Message* next = cur->next_; | |
| 195 delete cur; | |
| 196 cur = next; | |
| 197 } | |
| 198 } | 240 } |
| 241 queue_->Flush(port); |
| 242 oob_queue_->Flush(port); |
| 199 } | 243 } |
| 200 | 244 |
| 201 | 245 |
| 246 void MessageHandler::CloseAllPorts() { |
| 247 MonitorLocker ml(&monitor_); |
| 248 if (FLAG_trace_isolates) { |
| 249 OS::Print("[-] Closing all ports:\n" |
| 250 "\thandler: %s\n", |
| 251 name()); |
| 252 } |
| 253 queue_->FlushAll(); |
| 254 oob_queue_->FlushAll(); |
| 255 } |
| 256 |
| 257 |
| 258 void MessageHandler::increment_live_ports() { |
| 259 MonitorLocker ml(&monitor_); |
| 260 #if defined(DEBUG) |
| 261 CheckAccess(); |
| 262 #endif |
| 263 live_ports_++; |
| 264 } |
| 265 |
| 266 |
| 267 void MessageHandler::decrement_live_ports() { |
| 268 MonitorLocker ml(&monitor_); |
| 269 #if defined(DEBUG) |
| 270 CheckAccess(); |
| 271 #endif |
| 272 live_ports_--; |
| 273 } |
| 274 |
| 202 } // namespace dart | 275 } // namespace dart |
| OLD | NEW |