Index: runtime/vm/message_handler.cc |
=================================================================== |
--- runtime/vm/message_handler.cc (revision 5489) |
+++ runtime/vm/message_handler.cc (working copy) |
@@ -2,22 +2,47 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-#include "vm/message.h" |
+#include "vm/message_handler.h" |
+#include "vm/dart.h" |
namespace dart { |
DECLARE_FLAG(bool, trace_isolates); |
+class MessageHandlerTask : public ThreadPool::Task { |
+ public: |
+ explicit MessageHandlerTask(MessageHandler* handler) |
+ : handler_(handler) { |
+ } |
+ |
+ void Run() { |
+ handler_->TaskCallback(); |
+ } |
+ |
+ private: |
+ MessageHandler* handler_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |
+}; |
+ |
+ |
MessageHandler::MessageHandler() |
- : live_ports_(0), |
- queue_(new MessageQueue()) { |
+ : queue_(new MessageQueue()), |
+ oob_queue_(new MessageQueue()), |
+ pool_(NULL), |
+ blocking_(false), |
+ task_(NULL), |
+ start_function_(NULL), |
+ start_data_(NULL), |
+ live_ports_(0) { |
ASSERT(queue_ != NULL); |
} |
MessageHandler::~MessageHandler() { |
delete queue_; |
+ delete oob_queue_; |
} |
@@ -38,7 +63,54 @@ |
} |
+void MessageHandler::Run(ThreadPool* pool, |
+ StartFunction function, StartData data) { |
+ { |
+ MonitorLocker ml(&monitor_); |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[+] Starting message handler:\n" |
+ "\thandler: %s\n", |
+ name()); |
+ } |
+ ASSERT(pool_ == NULL); |
+ pool_ = pool; |
+ blocking_ = false; |
+ start_function_ = function; |
+ start_data_ = data; |
+ task_ = new MessageHandlerTask(this); |
+ } |
+ pool_->Run(task_); |
+} |
+ |
+ |
+void MessageHandler::RunBlocking(ThreadPool* pool, |
+ StartFunction function, StartData data) { |
+ { |
+ MonitorLocker ml(&monitor_); |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[+] Starting message handler:\n" |
+ "\thandler: %s\n", |
+ name()); |
+ } |
+ ASSERT(pool_ == NULL); |
+ pool_ = pool; |
+ blocking_ = true; |
+ start_function_ = function; |
+ start_data_ = data; |
+ task_ = new MessageHandlerTask(this); |
+ } |
+ pool_->Run(task_); |
+ { |
+ MonitorLocker ml(&monitor_); |
+ while (pool_ != NULL) { |
+ ml.Wait(); |
+ } |
+ } |
+} |
+ |
+ |
void MessageHandler::PostMessage(Message* message) { |
+ MonitorLocker ml(&monitor_); |
if (FLAG_trace_isolates) { |
const char* source_name = "<native code>"; |
Isolate* source_isolate = Isolate::Current(); |
@@ -54,149 +126,201 @@ |
} |
Message::Priority priority = message->priority(); |
- queue()->Enqueue(message); |
+ if (priority == Message::kOOBPriority) { |
+ oob_queue_->Enqueue(message); |
+ } else { |
+ queue_->Enqueue(message); |
+ } |
message = NULL; // Do not access message. May have been deleted. |
+ if (pool_ != NULL && task_ == NULL) { |
+ task_ = new MessageHandlerTask(this); |
+ pool_->Run(task_); |
+ } |
+ |
// Invoke any custom message notification. |
MessageNotify(priority); |
} |
-void MessageHandler::ClosePort(Dart_Port port) { |
- queue()->Flush(port); |
+Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
+ Message* message = oob_queue_->Dequeue(); |
+ if (message == NULL && min_priority < Message::kOOBPriority) { |
+ message = queue_->Dequeue(); |
+ } |
+ return message; |
} |
-void MessageHandler::CloseAllPorts() { |
- queue()->FlushAll(); |
-} |
+bool MessageHandler::HandleMessages() { |
+ ASSERT(Isolate::Current() == NULL); |
+ ASSERT(pool_ != NULL); |
+ bool result = true; |
+ Message* message = DequeueMessage(Message::kNormalPriority); |
+ while (message) { |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[<] Handling message:\n" |
+ "\thandler: %s\n" |
+ "\tport: %lld\n", |
+ name(), message->dest_port()); |
+ } |
- |
-MessageQueue::MessageQueue() { |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- head_[p] = NULL; |
- tail_[p] = NULL; |
+ monitor_.Exit(); |
+ result = HandleMessage(message); |
+ ASSERT(Isolate::Current() == NULL); |
+ monitor_.Enter(); |
+ if (!result) { |
+ break; |
+ } |
+ message = DequeueMessage(Message::kNormalPriority); |
} |
+ return result; |
} |
-MessageQueue::~MessageQueue() { |
- // Ensure that all pending messages have been released. |
+bool MessageHandler::HandleNextMessage() { |
+ // We can only call HandleNextMessage when this handler is not |
+ // assigned to a thread pool. |
+ ASSERT(pool_ == NULL); |
#if defined(DEBUG) |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- ASSERT(head_[p] == NULL); |
- } |
+ CheckAccess(); |
#endif |
-} |
- |
- |
-void MessageQueue::Enqueue(Message* msg) { |
MonitorLocker ml(&monitor_); |
- Message::Priority p = msg->priority(); |
- // Make sure messages are not reused. |
- ASSERT(msg->next_ == NULL); |
- if (head_[p] == NULL) { |
- // Only element in the queue. |
- head_[p] = msg; |
- tail_[p] = msg; |
+ bool result = true; |
+ Message* message = DequeueMessage(Message::kNormalPriority); |
+ while (message) { |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[<] Handling message:\n" |
+ "\thandler: %s\n" |
+ "\tport: %lld\n", |
+ name(), message->dest_port()); |
+ } |
- // We only need to notify if the queue was empty. |
- monitor_.Notify(); |
- } else { |
- ASSERT(tail_[p] != NULL); |
- // Append at the tail. |
- tail_[p]->next_ = msg; |
- tail_[p] = msg; |
+ monitor_.Exit(); |
+ Message::Priority priority = message->priority(); |
+ result = HandleMessage(message); |
+ monitor_.Enter(); |
+ if (!result || priority == Message::kNormalPriority) { |
+ break; |
+ } |
+ message = DequeueMessage(Message::kNormalPriority); |
} |
+ return result; |
} |
-Message* MessageQueue::DequeueNoWait() { |
+bool MessageHandler::HandleOOBMessages() { |
MonitorLocker ml(&monitor_); |
- return DequeueNoWaitHoldsLock(Message::kFirstPriority); |
-} |
+ bool result = true; |
+ Message* message = DequeueMessage(Message::kOOBPriority); |
+ while (message) { |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[<] Handling message:\n" |
+ "\thandler: %s\n" |
+ "\tport: %lld\n", |
+ name(), message->dest_port()); |
+ } |
- |
-Message* MessageQueue::DequeueNoWaitWithPriority( |
- Message::Priority min_priority) { |
- MonitorLocker ml(&monitor_); |
- return DequeueNoWaitHoldsLock(min_priority); |
+ monitor_.Exit(); |
+ result = HandleMessage(message); |
+ monitor_.Enter(); |
+ if (!result) { |
+ break; |
+ } |
+ message = DequeueMessage(Message::kOOBPriority); |
+ } |
+ return result; |
} |
-Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) { |
- // Look for the highest priority available message. |
- for (int p = Message::kNumPriorities-1; p >= min_priority; p--) { |
- Message* result = head_[p]; |
- if (result != NULL) { |
- head_[p] = result->next_; |
- // The following update to tail_ is not strictly needed. |
- if (head_[p] == NULL) { |
- tail_[p] = NULL; |
- } |
-#if defined(DEBUG) |
- result->next_ = result; // Make sure to trigger ASSERT in Enqueue. |
-#endif // DEBUG |
- return result; |
+void MessageHandler::TaskCallback() { |
+ ASSERT(Isolate::Current() == NULL); |
+ bool error = false; |
+ { |
+ MonitorLocker ml(&monitor_); |
+ // Initialize the message handler by running its start function, |
+ // if we have one. For an isolate, this will run the isolate's |
+ // main() function. |
+ if (start_function_) { |
+ monitor_.Exit(); |
+ error = !start_function_(start_data_); |
Ivan Posva
2012/04/08 22:58:27
I find this style a bit hard to read. The double-n
turnidge
2012/04/11 19:37:16
True. Fixed.
On 2012/04/08 22:58:27, Ivan Posva
|
+ ASSERT(Isolate::Current() == NULL); |
+ start_function_ = NULL; |
+ start_data_ = NULL; |
+ monitor_.Enter(); |
} |
+ |
+ // Handle any pending messages for this message handler. |
+ if (!error) { |
+ error = !HandleMessages(); |
+ } |
+ task_ = NULL; // No task in queue. |
+ CheckTermination(&ml, error); |
} |
- return NULL; |
} |
-Message* MessageQueue::Dequeue(int64_t millis) { |
- ASSERT(millis >= 0); |
+void MessageHandler::ClosePort(Dart_Port port) { |
MonitorLocker ml(&monitor_); |
- Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); |
- if (result == NULL) { |
- // No message available at any priority. |
- monitor_.Wait(millis); |
- result = DequeueNoWaitHoldsLock(Message::kFirstPriority); |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Closing port:\n" |
+ "\thandler: %s\n" |
+ "\tport: %d\n", |
+ name(), port); |
} |
- return result; |
+ queue_->Flush(port); |
} |
-void MessageQueue::Flush(Dart_Port port) { |
+void MessageHandler::CloseAllPorts() { |
MonitorLocker ml(&monitor_); |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- Message* cur = head_[p]; |
- Message* prev = NULL; |
- while (cur != NULL) { |
- Message* next = cur->next_; |
- // If the message matches, then remove it from the queue and delete it. |
- if (cur->dest_port() == port) { |
- if (prev != NULL) { |
- prev->next_ = next; |
- } else { |
- head_[p] = next; |
- } |
- delete cur; |
- } else { |
- // Move prev forward. |
- prev = cur; |
- } |
- // Advance to the next message in the queue. |
- cur = next; |
- } |
- tail_[p] = prev; |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Closing all ports:\n" |
+ "\thandler: %s\n", |
+ name()); |
} |
+ queue_->FlushAll(); |
} |
-void MessageQueue::FlushAll() { |
- MonitorLocker ml(&monitor_); |
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
- Message* cur = head_[p]; |
- head_[p] = NULL; |
- tail_[p] = NULL; |
- while (cur != NULL) { |
- Message* next = cur->next_; |
- delete cur; |
- cur = next; |
+void MessageHandler::CheckTermination(MonitorLocker* ml, bool error) { |
+ // If the message handler received an error or has no live ports, |
+ // we stop handling messages for this message handler. |
+ if (pool_ == NULL) { |
+ return; |
+ } |
+ if (error) { |
+ // We received an error. Abnormal termination. |
+ char* error_str = GetErrorCString(); |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Stopping message handler (error):\n" |
+ "\thandler: %s\n", |
+ name()); |
+ OS::Print("Error: %s\n", error_str); |
} |
+ pool_ = NULL; |
+ if (blocking_) { |
+ ml->Notify(); |
+ } else { |
+ // TODO(turnidge): Implement a real system for handling |
+ // isolate death rather than terminating the vm. |
+ OS::PrintErr("%s\n", error_str); |
+ exit(255); |
+ } |
+ free(error_str); |
+ |
+ } else if (!HasLivePorts()) { |
+ // No live ports. Normal termination. |
+ if (FLAG_trace_isolates) { |
+ OS::Print("[-] Stopping message handler (no live ports):\n" |
+ "\thandler: %s\n", |
+ name()); |
+ } |
+ pool_ = NULL; |
+ if (blocking_) { |
+ ml->Notify(); |
+ } |
} |
} |
- |
} // namespace dart |