Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(382)

Unified Diff: runtime/vm/message_handler.cc

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/vm/message_handler.h ('k') | runtime/vm/message_handler_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/vm/message_handler.cc
===================================================================
--- runtime/vm/message_handler.cc (revision 5489)
+++ runtime/vm/message_handler.cc (working copy)
@@ -2,22 +2,49 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-#include "vm/message.h"
+#include "vm/message_handler.h"
+#include "vm/dart.h"
namespace dart {
DECLARE_FLAG(bool, trace_isolates);
+class MessageHandlerTask : public ThreadPool::Task {
+ public:
+ explicit MessageHandlerTask(MessageHandler* handler)
+ : handler_(handler) {
+ ASSERT(handler != NULL);
+ }
+
+ void Run() {
+ handler_->TaskCallback();
+ }
+
+ private:
+ MessageHandler* handler_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
+};
+
+
MessageHandler::MessageHandler()
- : live_ports_(0),
- queue_(new MessageQueue()) {
+ : queue_(new MessageQueue()),
+ oob_queue_(new MessageQueue()),
+ live_ports_(0),
+ pool_(NULL),
+ task_(NULL),
+ start_callback_(NULL),
+ end_callback_(NULL),
+ callback_data_(NULL) {
ASSERT(queue_ != NULL);
+ ASSERT(oob_queue_ != NULL);
}
MessageHandler::~MessageHandler() {
delete queue_;
+ delete oob_queue_;
}
@@ -38,7 +65,28 @@
}
+void MessageHandler::Run(ThreadPool* pool,
+ StartCallback start_callback,
+ EndCallback end_callback,
+ CallbackData data) {
+ MonitorLocker ml(&monitor_);
+ if (FLAG_trace_isolates) {
+ OS::Print("[+] Starting message handler:\n"
+ "\thandler: %s\n",
+ name());
+ }
+ ASSERT(pool_ == NULL);
+ pool_ = pool;
+ start_callback_ = start_callback;
+ end_callback_ = end_callback;
+ callback_data_ = data;
+ task_ = new MessageHandlerTask(this);
+ pool_->Run(task_);
+}
+
+
void MessageHandler::PostMessage(Message* message) {
+ MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
const char* source_name = "<native code>";
Isolate* source_isolate = Isolate::Current();
@@ -53,150 +101,175 @@
source_name, message->reply_port(), name(), message->dest_port());
}
- Message::Priority priority = message->priority();
- queue()->Enqueue(message);
+ Message::Priority saved_priority = message->priority();
+ if (message->IsOOB()) {
+ oob_queue_->Enqueue(message);
+ } else {
+ queue_->Enqueue(message);
+ }
message = NULL; // Do not access message. May have been deleted.
+ if (pool_ != NULL && task_ == NULL) {
+ task_ = new MessageHandlerTask(this);
+ pool_->Run(task_);
+ }
+
// Invoke any custom message notification.
- MessageNotify(priority);
+ MessageNotify(saved_priority);
}
-void MessageHandler::ClosePort(Dart_Port port) {
- queue()->Flush(port);
+Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
+ // TODO(turnidge): Add assert that monitor_ is held here.
+ Message* message = oob_queue_->Dequeue();
+ if (message == NULL && min_priority < Message::kOOBPriority) {
+ message = queue_->Dequeue();
+ }
+ return message;
}
-void MessageHandler::CloseAllPorts() {
- queue()->FlushAll();
-}
+bool MessageHandler::HandleMessages(bool allow_normal_messages,
+ bool allow_multiple_normal_messages) {
+ // TODO(turnidge): Add assert that monitor_ is held here.
+ bool result = true;
+ Message::Priority min_priority = (allow_normal_messages
+ ? Message::kNormalPriority
+ : Message::kOOBPriority);
+ Message* message = DequeueMessage(min_priority);
+ while (message) {
+ if (FLAG_trace_isolates) {
+ OS::Print("[<] Handling message:\n"
+ "\thandler: %s\n"
+ "\tport: %lld\n",
+ name(), message->dest_port());
+ }
+ // Release the monitor_ temporarily while we handle the message.
+ // The monitor was acquired in MessageHandler::TaskCallback().
+ monitor_.Exit();
+ Message::Priority saved_priority = message->priority();
+ result = HandleMessage(message);
+ // ASSERT(Isolate::Current() == NULL);
+ monitor_.Enter();
-MessageQueue::MessageQueue() {
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- head_[p] = NULL;
- tail_[p] = NULL;
+ if (!result) {
+ // If we hit an error, we're done processing messages.
+ break;
+ }
+ if (!allow_multiple_normal_messages &&
+ saved_priority == Message::kNormalPriority) {
+ // Some callers want to process only one normal message and then quit.
+ break;
+ }
+ message = DequeueMessage(min_priority);
}
+ return result;
}
-MessageQueue::~MessageQueue() {
- // Ensure that all pending messages have been released.
+bool MessageHandler::HandleNextMessage() {
+ // We can only call HandleNextMessage when this handler is not
+ // assigned to a thread pool.
+ MonitorLocker ml(&monitor_);
+ ASSERT(pool_ == NULL);
#if defined(DEBUG)
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- ASSERT(head_[p] == NULL);
- }
+ CheckAccess();
#endif
+ return HandleMessages(true, false);
}
-void MessageQueue::Enqueue(Message* msg) {
+bool MessageHandler::HandleOOBMessages() {
MonitorLocker ml(&monitor_);
- Message::Priority p = msg->priority();
- // Make sure messages are not reused.
- ASSERT(msg->next_ == NULL);
- if (head_[p] == NULL) {
- // Only element in the queue.
- head_[p] = msg;
- tail_[p] = msg;
-
- // We only need to notify if the queue was empty.
- monitor_.Notify();
- } else {
- ASSERT(tail_[p] != NULL);
- // Append at the tail.
- tail_[p]->next_ = msg;
- tail_[p] = msg;
- }
+#if defined(DEBUG)
+ CheckAccess();
+#endif
+ return HandleMessages(false, false);
}
-Message* MessageQueue::DequeueNoWait() {
- MonitorLocker ml(&monitor_);
- return DequeueNoWaitHoldsLock(Message::kFirstPriority);
-}
+void MessageHandler::TaskCallback() {
+ ASSERT(Isolate::Current() == NULL);
+ bool ok = true;
+ bool run_end_callback = false;
+ {
+ MonitorLocker ml(&monitor_);
+ // Initialize the message handler by running its start function,
+ // if we have one. For an isolate, this will run the isolate's
+ // main() function.
+ if (start_callback_) {
+ monitor_.Exit();
+ ok = start_callback_(callback_data_);
+ ASSERT(Isolate::Current() == NULL);
+ start_callback_ = NULL;
+ monitor_.Enter();
+ }
+ // Handle any pending messages for this message handler.
+ if (ok) {
+ ok = HandleMessages(true, true);
+ }
+ task_ = NULL; // No task in queue.
-Message* MessageQueue::DequeueNoWaitWithPriority(
- Message::Priority min_priority) {
- MonitorLocker ml(&monitor_);
- return DequeueNoWaitHoldsLock(min_priority);
-}
-
-
-Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) {
- // Look for the highest priority available message.
- for (int p = Message::kNumPriorities-1; p >= min_priority; p--) {
- Message* result = head_[p];
- if (result != NULL) {
- head_[p] = result->next_;
- // The following update to tail_ is not strictly needed.
- if (head_[p] == NULL) {
- tail_[p] = NULL;
+ if (!ok || !HasLivePorts()) {
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Stopping message handler (%s):\n"
+ "\thandler: %s\n",
+ (ok ? "no live ports" : "error"),
+ name());
}
-#if defined(DEBUG)
- result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
-#endif // DEBUG
- return result;
+ pool_ = NULL;
+ run_end_callback = true;
}
}
- return NULL;
+ if (run_end_callback && end_callback_ != NULL) {
+ end_callback_(callback_data_);
+ // The handler may have been deleted after this point.
+ }
}
-Message* MessageQueue::Dequeue(int64_t millis) {
- ASSERT(millis >= 0);
+void MessageHandler::ClosePort(Dart_Port port) {
MonitorLocker ml(&monitor_);
- Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
- if (result == NULL) {
- // No message available at any priority.
- monitor_.Wait(millis);
- result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Closing port:\n"
+ "\thandler: %s\n"
+ "\tport: %d\n",
+ name(), port);
}
- return result;
+ queue_->Flush(port);
+ oob_queue_->Flush(port);
}
-void MessageQueue::Flush(Dart_Port port) {
+void MessageHandler::CloseAllPorts() {
MonitorLocker ml(&monitor_);
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- Message* cur = head_[p];
- Message* prev = NULL;
- while (cur != NULL) {
- Message* next = cur->next_;
- // If the message matches, then remove it from the queue and delete it.
- if (cur->dest_port() == port) {
- if (prev != NULL) {
- prev->next_ = next;
- } else {
- head_[p] = next;
- }
- delete cur;
- } else {
- // Move prev forward.
- prev = cur;
- }
- // Advance to the next message in the queue.
- cur = next;
- }
- tail_[p] = prev;
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Closing all ports:\n"
+ "\thandler: %s\n",
+ name());
}
+ queue_->FlushAll();
+ oob_queue_->FlushAll();
}
-void MessageQueue::FlushAll() {
+void MessageHandler::increment_live_ports() {
MonitorLocker ml(&monitor_);
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- Message* cur = head_[p];
- head_[p] = NULL;
- tail_[p] = NULL;
- while (cur != NULL) {
- Message* next = cur->next_;
- delete cur;
- cur = next;
- }
- }
+#if defined(DEBUG)
+ CheckAccess();
+#endif
+ live_ports_++;
}
+void MessageHandler::decrement_live_ports() {
+ MonitorLocker ml(&monitor_);
+#if defined(DEBUG)
+ CheckAccess();
+#endif
+ live_ports_--;
+}
+
} // namespace dart
« no previous file with comments | « runtime/vm/message_handler.h ('k') | runtime/vm/message_handler_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698