Index: runtime/vm/message_handler.cc |
=================================================================== |
--- runtime/vm/message_handler.cc (revision 5489) |
+++ runtime/vm/message_handler.cc (working copy) |
@@ -2,22 +2,49 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-#include "vm/message.h" |
+#include "vm/message_handler.h" |
+#include "vm/dart.h" |
namespace dart { |
DECLARE_FLAG(bool, trace_isolates); |
+class MessageHandlerTask : public ThreadPool::Task { |
+ public: |
+ explicit MessageHandlerTask(MessageHandler* handler) |
+ : handler_(handler) { |
+ ASSERT(handler != NULL); |
+ } |
+ |
+ void Run() { |
+ handler_->TaskCallback(); |
+ } |
+ |
+ private: |
+ MessageHandler* handler_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |
+}; |
+ |
+ |
MessageHandler::MessageHandler() |
- : live_ports_(0), |
- queue_(new MessageQueue()) { |
+ : queue_(new MessageQueue()), |
+ oob_queue_(new MessageQueue()), |
+ live_ports_(0), |
+ pool_(NULL), |
+ task_(NULL), |
+ start_callback_(NULL), |
+ end_callback_(NULL), |
+ callback_data_(NULL) { |
ASSERT(queue_ != NULL); |
+ ASSERT(oob_queue_ != NULL); |
} |
MessageHandler::~MessageHandler() { |
delete queue_; |
+ delete oob_queue_; |
} |
@@ -38,7 +65,28 @@ |
} |
+void MessageHandler::Run(ThreadPool* pool, |
+ StartCallback start_callback, |
+ EndCallback end_callback, |
+ CallbackData data) { |
+ MonitorLocker ml(&monitor_); |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[+] Starting message handler:\n" |
+ "\thandler: %s\n", |
+ name()); |
+ } |
+ ASSERT(pool_ == NULL); |
+ pool_ = pool; |
+ start_callback_ = start_callback; |
+ end_callback_ = end_callback; |
+ callback_data_ = data; |
+ task_ = new MessageHandlerTask(this); |
+ pool_->Run(task_); |
+} |
+ |
+ |
void MessageHandler::PostMessage(Message* message) { |
+ MonitorLocker ml(&monitor_); |
if (FLAG_trace_isolates) { |
const char* source_name = "<native code>"; |
Isolate* source_isolate = Isolate::Current(); |
@@ -53,150 +101,175 @@ |
source_name, message->reply_port(), name(), message->dest_port()); |
} |
- Message::Priority priority = message->priority(); |
- queue()->Enqueue(message); |
+ Message::Priority saved_priority = message->priority(); |
+ if (message->IsOOB()) { |
+ oob_queue_->Enqueue(message); |
+ } else { |
+ queue_->Enqueue(message); |
+ } |
message = NULL; // Do not access message. May have been deleted. |
+ if (pool_ != NULL && task_ == NULL) { |
+ task_ = new MessageHandlerTask(this); |
+ pool_->Run(task_); |
+ } |
+ |
// Invoke any custom message notification. |
- MessageNotify(priority); |
+ MessageNotify(saved_priority); |
} |
-void MessageHandler::ClosePort(Dart_Port port) { |
- queue()->Flush(port); |
+Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
+ // TODO(turnidge): Add assert that monitor_ is held here. |
+ Message* message = oob_queue_->Dequeue(); |
+ if (message == NULL && min_priority < Message::kOOBPriority) { |
+ message = queue_->Dequeue(); |
+ } |
+ return message; |
} |
-void MessageHandler::CloseAllPorts() { |
- queue()->FlushAll(); |
-} |
+bool MessageHandler::HandleMessages(bool allow_normal_messages, |
+ bool allow_multiple_normal_messages) { |
+ // TODO(turnidge): Add assert that monitor_ is held here. |
+ bool result = true; |
+ Message::Priority min_priority = (allow_normal_messages |
+ ? Message::kNormalPriority |
+ : Message::kOOBPriority); |
+ Message* message = DequeueMessage(min_priority); |
+ while (message) { |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[<] Handling message:\n" |
+ "\thandler: %s\n" |
+ "\tport: %lld\n", |
+ name(), message->dest_port()); |
+ } |
+ // Release the monitor_ temporarily while we handle the message. |
+ // The monitor was acquired in MessageHandler::TaskCallback(). |
+ monitor_.Exit(); |
+ Message::Priority saved_priority = message->priority(); |
+ result = HandleMessage(message); |
+ // ASSERT(Isolate::Current() == NULL); |
+ monitor_.Enter(); |
-MessageQueue::MessageQueue() { |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- head_[p] = NULL; |
- tail_[p] = NULL; |
+ if (!result) { |
+ // If we hit an error, we're done processing messages. |
+ break; |
+ } |
+ if (!allow_multiple_normal_messages && |
+ saved_priority == Message::kNormalPriority) { |
+ // Some callers want to process only one normal message and then quit. |
+ break; |
+ } |
+ message = DequeueMessage(min_priority); |
} |
+ return result; |
} |
-MessageQueue::~MessageQueue() { |
- // Ensure that all pending messages have been released. |
+bool MessageHandler::HandleNextMessage() { |
+ // We can only call HandleNextMessage when this handler is not |
+ // assigned to a thread pool. |
+ MonitorLocker ml(&monitor_); |
+ ASSERT(pool_ == NULL); |
#if defined(DEBUG) |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- ASSERT(head_[p] == NULL); |
- } |
+ CheckAccess(); |
#endif |
+ return HandleMessages(true, false); |
} |
-void MessageQueue::Enqueue(Message* msg) { |
+bool MessageHandler::HandleOOBMessages() { |
MonitorLocker ml(&monitor_); |
- Message::Priority p = msg->priority(); |
- // Make sure messages are not reused. |
- ASSERT(msg->next_ == NULL); |
- if (head_[p] == NULL) { |
- // Only element in the queue. |
- head_[p] = msg; |
- tail_[p] = msg; |
- |
- // We only need to notify if the queue was empty. |
- monitor_.Notify(); |
- } else { |
- ASSERT(tail_[p] != NULL); |
- // Append at the tail. |
- tail_[p]->next_ = msg; |
- tail_[p] = msg; |
- } |
+#if defined(DEBUG) |
+ CheckAccess(); |
+#endif |
+ return HandleMessages(false, false); |
} |
-Message* MessageQueue::DequeueNoWait() { |
- MonitorLocker ml(&monitor_); |
- return DequeueNoWaitHoldsLock(Message::kFirstPriority); |
-} |
+void MessageHandler::TaskCallback() { |
+ ASSERT(Isolate::Current() == NULL); |
+ bool ok = true; |
+ bool run_end_callback = false; |
+ { |
+ MonitorLocker ml(&monitor_); |
+ // Initialize the message handler by running its start function, |
+ // if we have one. For an isolate, this will run the isolate's |
+ // main() function. |
+ if (start_callback_) { |
+ monitor_.Exit(); |
+ ok = start_callback_(callback_data_); |
+ ASSERT(Isolate::Current() == NULL); |
+ start_callback_ = NULL; |
+ monitor_.Enter(); |
+ } |
+ // Handle any pending messages for this message handler. |
+ if (ok) { |
+ ok = HandleMessages(true, true); |
+ } |
+ task_ = NULL; // No task in queue. |
-Message* MessageQueue::DequeueNoWaitWithPriority( |
- Message::Priority min_priority) { |
- MonitorLocker ml(&monitor_); |
- return DequeueNoWaitHoldsLock(min_priority); |
-} |
- |
- |
-Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) { |
- // Look for the highest priority available message. |
- for (int p = Message::kNumPriorities-1; p >= min_priority; p--) { |
- Message* result = head_[p]; |
- if (result != NULL) { |
- head_[p] = result->next_; |
- // The following update to tail_ is not strictly needed. |
- if (head_[p] == NULL) { |
- tail_[p] = NULL; |
+ if (!ok || !HasLivePorts()) { |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Stopping message handler (%s):\n" |
+ "\thandler: %s\n", |
+ (ok ? "no live ports" : "error"), |
+ name()); |
} |
-#if defined(DEBUG) |
- result->next_ = result; // Make sure to trigger ASSERT in Enqueue. |
-#endif // DEBUG |
- return result; |
+ pool_ = NULL; |
+ run_end_callback = true; |
} |
} |
- return NULL; |
+ if (run_end_callback && end_callback_ != NULL) { |
+ end_callback_(callback_data_); |
+ // The handler may have been deleted after this point. |
+ } |
} |
-Message* MessageQueue::Dequeue(int64_t millis) { |
- ASSERT(millis >= 0); |
+void MessageHandler::ClosePort(Dart_Port port) { |
MonitorLocker ml(&monitor_); |
- Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); |
- if (result == NULL) { |
- // No message available at any priority. |
- monitor_.Wait(millis); |
- result = DequeueNoWaitHoldsLock(Message::kFirstPriority); |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Closing port:\n" |
+ "\thandler: %s\n" |
+ "\tport: %d\n", |
+ name(), port); |
} |
- return result; |
+ queue_->Flush(port); |
+ oob_queue_->Flush(port); |
} |
-void MessageQueue::Flush(Dart_Port port) { |
+void MessageHandler::CloseAllPorts() { |
MonitorLocker ml(&monitor_); |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- Message* cur = head_[p]; |
- Message* prev = NULL; |
- while (cur != NULL) { |
- Message* next = cur->next_; |
- // If the message matches, then remove it from the queue and delete it. |
- if (cur->dest_port() == port) { |
- if (prev != NULL) { |
- prev->next_ = next; |
- } else { |
- head_[p] = next; |
- } |
- delete cur; |
- } else { |
- // Move prev forward. |
- prev = cur; |
- } |
- // Advance to the next message in the queue. |
- cur = next; |
- } |
- tail_[p] = prev; |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Closing all ports:\n" |
+ "\thandler: %s\n", |
+ name()); |
} |
+ queue_->FlushAll(); |
+ oob_queue_->FlushAll(); |
} |
-void MessageQueue::FlushAll() { |
+void MessageHandler::increment_live_ports() { |
MonitorLocker ml(&monitor_); |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- Message* cur = head_[p]; |
- head_[p] = NULL; |
- tail_[p] = NULL; |
- while (cur != NULL) { |
- Message* next = cur->next_; |
- delete cur; |
- cur = next; |
- } |
- } |
+#if defined(DEBUG) |
+ CheckAccess(); |
+#endif |
+ live_ports_++; |
} |
+void MessageHandler::decrement_live_ports() { |
+ MonitorLocker ml(&monitor_); |
+#if defined(DEBUG) |
+ CheckAccess(); |
+#endif |
+ live_ports_--; |
+} |
+ |
} // namespace dart |