| Index: runtime/vm/message_handler.cc
|
| ===================================================================
|
| --- runtime/vm/message_handler.cc (revision 5489)
|
| +++ runtime/vm/message_handler.cc (working copy)
|
| @@ -2,22 +2,49 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -#include "vm/message.h"
|
| +#include "vm/message_handler.h"
|
| +#include "vm/dart.h"
|
|
|
| namespace dart {
|
|
|
| DECLARE_FLAG(bool, trace_isolates);
|
|
|
|
|
| +class MessageHandlerTask : public ThreadPool::Task {
|
| + public:
|
| + explicit MessageHandlerTask(MessageHandler* handler)
|
| + : handler_(handler) {
|
| + ASSERT(handler != NULL);
|
| + }
|
| +
|
| + void Run() {
|
| + handler_->TaskCallback();
|
| + }
|
| +
|
| + private:
|
| + MessageHandler* handler_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
|
| +};
|
| +
|
| +
|
| MessageHandler::MessageHandler()
|
| - : live_ports_(0),
|
| - queue_(new MessageQueue()) {
|
| + : queue_(new MessageQueue()),
|
| + oob_queue_(new MessageQueue()),
|
| + live_ports_(0),
|
| + pool_(NULL),
|
| + task_(NULL),
|
| + start_callback_(NULL),
|
| + end_callback_(NULL),
|
| + callback_data_(NULL) {
|
| ASSERT(queue_ != NULL);
|
| + ASSERT(oob_queue_ != NULL);
|
| }
|
|
|
|
|
| MessageHandler::~MessageHandler() {
|
| delete queue_;
|
| + delete oob_queue_;
|
| }
|
|
|
|
|
| @@ -38,7 +65,28 @@
|
| }
|
|
|
|
|
| +void MessageHandler::Run(ThreadPool* pool,
|
| + StartCallback start_callback,
|
| + EndCallback end_callback,
|
| + CallbackData data) {
|
| + MonitorLocker ml(&monitor_);
|
| + if (FLAG_trace_isolates) {
|
| + OS::Print("[+] Starting message handler:\n"
|
| + "\thandler: %s\n",
|
| + name());
|
| + }
|
| + ASSERT(pool_ == NULL);
|
| + pool_ = pool;
|
| + start_callback_ = start_callback;
|
| + end_callback_ = end_callback;
|
| + callback_data_ = data;
|
| + task_ = new MessageHandlerTask(this);
|
| + pool_->Run(task_);
|
| +}
|
| +
|
| +
|
| void MessageHandler::PostMessage(Message* message) {
|
| + MonitorLocker ml(&monitor_);
|
| if (FLAG_trace_isolates) {
|
| const char* source_name = "<native code>";
|
| Isolate* source_isolate = Isolate::Current();
|
| @@ -53,150 +101,175 @@
|
| source_name, message->reply_port(), name(), message->dest_port());
|
| }
|
|
|
| - Message::Priority priority = message->priority();
|
| - queue()->Enqueue(message);
|
| + Message::Priority saved_priority = message->priority();
|
| + if (message->IsOOB()) {
|
| + oob_queue_->Enqueue(message);
|
| + } else {
|
| + queue_->Enqueue(message);
|
| + }
|
| message = NULL; // Do not access message. May have been deleted.
|
|
|
| + if (pool_ != NULL && task_ == NULL) {
|
| + task_ = new MessageHandlerTask(this);
|
| + pool_->Run(task_);
|
| + }
|
| +
|
| // Invoke any custom message notification.
|
| - MessageNotify(priority);
|
| + MessageNotify(saved_priority);
|
| }
|
|
|
|
|
| -void MessageHandler::ClosePort(Dart_Port port) {
|
| - queue()->Flush(port);
|
| +Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
|
| + // TODO(turnidge): Add assert that monitor_ is held here.
|
| + Message* message = oob_queue_->Dequeue();
|
| + if (message == NULL && min_priority < Message::kOOBPriority) {
|
| + message = queue_->Dequeue();
|
| + }
|
| + return message;
|
| }
|
|
|
|
|
| -void MessageHandler::CloseAllPorts() {
|
| - queue()->FlushAll();
|
| -}
|
| +bool MessageHandler::HandleMessages(bool allow_normal_messages,
|
| + bool allow_multiple_normal_messages) {
|
| + // TODO(turnidge): Add assert that monitor_ is held here.
|
| + bool result = true;
|
| + Message::Priority min_priority = (allow_normal_messages
|
| + ? Message::kNormalPriority
|
| + : Message::kOOBPriority);
|
| + Message* message = DequeueMessage(min_priority);
|
| + while (message) {
|
| + if (FLAG_trace_isolates) {
|
| + OS::Print("[<] Handling message:\n"
|
| + "\thandler: %s\n"
|
| + "\tport: %lld\n",
|
| + name(), message->dest_port());
|
| + }
|
|
|
| + // Release the monitor_ temporarily while we handle the message.
|
| + // The monitor was acquired in MessageHandler::TaskCallback().
|
| + monitor_.Exit();
|
| + Message::Priority saved_priority = message->priority();
|
| + result = HandleMessage(message);
|
| + // ASSERT(Isolate::Current() == NULL);
|
| + monitor_.Enter();
|
|
|
| -MessageQueue::MessageQueue() {
|
| - for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| - head_[p] = NULL;
|
| - tail_[p] = NULL;
|
| + if (!result) {
|
| + // If we hit an error, we're done processing messages.
|
| + break;
|
| + }
|
| + if (!allow_multiple_normal_messages &&
|
| + saved_priority == Message::kNormalPriority) {
|
| + // Some callers want to process only one normal message and then quit.
|
| + break;
|
| + }
|
| + message = DequeueMessage(min_priority);
|
| }
|
| + return result;
|
| }
|
|
|
|
|
| -MessageQueue::~MessageQueue() {
|
| - // Ensure that all pending messages have been released.
|
| +bool MessageHandler::HandleNextMessage() {
|
| + // We can only call HandleNextMessage when this handler is not
|
| + // assigned to a thread pool.
|
| + MonitorLocker ml(&monitor_);
|
| + ASSERT(pool_ == NULL);
|
| #if defined(DEBUG)
|
| - for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| - ASSERT(head_[p] == NULL);
|
| - }
|
| + CheckAccess();
|
| #endif
|
| + return HandleMessages(true, false);
|
| }
|
|
|
|
|
| -void MessageQueue::Enqueue(Message* msg) {
|
| +bool MessageHandler::HandleOOBMessages() {
|
| MonitorLocker ml(&monitor_);
|
| - Message::Priority p = msg->priority();
|
| - // Make sure messages are not reused.
|
| - ASSERT(msg->next_ == NULL);
|
| - if (head_[p] == NULL) {
|
| - // Only element in the queue.
|
| - head_[p] = msg;
|
| - tail_[p] = msg;
|
| -
|
| - // We only need to notify if the queue was empty.
|
| - monitor_.Notify();
|
| - } else {
|
| - ASSERT(tail_[p] != NULL);
|
| - // Append at the tail.
|
| - tail_[p]->next_ = msg;
|
| - tail_[p] = msg;
|
| - }
|
| +#if defined(DEBUG)
|
| + CheckAccess();
|
| +#endif
|
| + return HandleMessages(false, false);
|
| }
|
|
|
|
|
| -Message* MessageQueue::DequeueNoWait() {
|
| - MonitorLocker ml(&monitor_);
|
| - return DequeueNoWaitHoldsLock(Message::kFirstPriority);
|
| -}
|
| +void MessageHandler::TaskCallback() {
|
| + ASSERT(Isolate::Current() == NULL);
|
| + bool ok = true;
|
| + bool run_end_callback = false;
|
| + {
|
| + MonitorLocker ml(&monitor_);
|
| + // Initialize the message handler by running its start function,
|
| + // if we have one. For an isolate, this will run the isolate's
|
| + // main() function.
|
| + if (start_callback_) {
|
| + monitor_.Exit();
|
| + ok = start_callback_(callback_data_);
|
| + ASSERT(Isolate::Current() == NULL);
|
| + start_callback_ = NULL;
|
| + monitor_.Enter();
|
| + }
|
|
|
| + // Handle any pending messages for this message handler.
|
| + if (ok) {
|
| + ok = HandleMessages(true, true);
|
| + }
|
| + task_ = NULL; // No task in queue.
|
|
|
| -Message* MessageQueue::DequeueNoWaitWithPriority(
|
| - Message::Priority min_priority) {
|
| - MonitorLocker ml(&monitor_);
|
| - return DequeueNoWaitHoldsLock(min_priority);
|
| -}
|
| -
|
| -
|
| -Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) {
|
| - // Look for the highest priority available message.
|
| - for (int p = Message::kNumPriorities-1; p >= min_priority; p--) {
|
| - Message* result = head_[p];
|
| - if (result != NULL) {
|
| - head_[p] = result->next_;
|
| - // The following update to tail_ is not strictly needed.
|
| - if (head_[p] == NULL) {
|
| - tail_[p] = NULL;
|
| + if (!ok || !HasLivePorts()) {
|
| + if (FLAG_trace_isolates) {
|
| + OS::Print("[-] Stopping message handler (%s):\n"
|
| + "\thandler: %s\n",
|
| + (ok ? "no live ports" : "error"),
|
| + name());
|
| }
|
| -#if defined(DEBUG)
|
| - result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
|
| -#endif // DEBUG
|
| - return result;
|
| + pool_ = NULL;
|
| + run_end_callback = true;
|
| }
|
| }
|
| - return NULL;
|
| + if (run_end_callback && end_callback_ != NULL) {
|
| + end_callback_(callback_data_);
|
| + // The handler may have been deleted after this point.
|
| + }
|
| }
|
|
|
|
|
| -Message* MessageQueue::Dequeue(int64_t millis) {
|
| - ASSERT(millis >= 0);
|
| +void MessageHandler::ClosePort(Dart_Port port) {
|
| MonitorLocker ml(&monitor_);
|
| - Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
|
| - if (result == NULL) {
|
| - // No message available at any priority.
|
| - monitor_.Wait(millis);
|
| - result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
|
| + if (FLAG_trace_isolates) {
|
| + OS::Print("[-] Closing port:\n"
|
| + "\thandler: %s\n"
|
| + "\tport: %d\n",
|
| + name(), port);
|
| }
|
| - return result;
|
| + queue_->Flush(port);
|
| + oob_queue_->Flush(port);
|
| }
|
|
|
|
|
| -void MessageQueue::Flush(Dart_Port port) {
|
| +void MessageHandler::CloseAllPorts() {
|
| MonitorLocker ml(&monitor_);
|
| - for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| - Message* cur = head_[p];
|
| - Message* prev = NULL;
|
| - while (cur != NULL) {
|
| - Message* next = cur->next_;
|
| - // If the message matches, then remove it from the queue and delete it.
|
| - if (cur->dest_port() == port) {
|
| - if (prev != NULL) {
|
| - prev->next_ = next;
|
| - } else {
|
| - head_[p] = next;
|
| - }
|
| - delete cur;
|
| - } else {
|
| - // Move prev forward.
|
| - prev = cur;
|
| - }
|
| - // Advance to the next message in the queue.
|
| - cur = next;
|
| - }
|
| - tail_[p] = prev;
|
| + if (FLAG_trace_isolates) {
|
| + OS::Print("[-] Closing all ports:\n"
|
| + "\thandler: %s\n",
|
| + name());
|
| }
|
| + queue_->FlushAll();
|
| + oob_queue_->FlushAll();
|
| }
|
|
|
|
|
| -void MessageQueue::FlushAll() {
|
| +void MessageHandler::increment_live_ports() {
|
| MonitorLocker ml(&monitor_);
|
| - for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| - Message* cur = head_[p];
|
| - head_[p] = NULL;
|
| - tail_[p] = NULL;
|
| - while (cur != NULL) {
|
| - Message* next = cur->next_;
|
| - delete cur;
|
| - cur = next;
|
| - }
|
| - }
|
| +#if defined(DEBUG)
|
| + CheckAccess();
|
| +#endif
|
| + live_ports_++;
|
| }
|
|
|
|
|
| +void MessageHandler::decrement_live_ports() {
|
| + MonitorLocker ml(&monitor_);
|
| +#if defined(DEBUG)
|
| + CheckAccess();
|
| +#endif
|
| + live_ports_--;
|
| +}
|
| +
|
| } // namespace dart
|
|
|