Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2774)

Unified Diff: runtime/vm/message_handler.cc

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/vm/message_handler.cc
===================================================================
--- runtime/vm/message_handler.cc (revision 5489)
+++ runtime/vm/message_handler.cc (working copy)
@@ -2,22 +2,47 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-#include "vm/message.h"
+#include "vm/message_handler.h"
+#include "vm/dart.h"
namespace dart {
DECLARE_FLAG(bool, trace_isolates);
+class MessageHandlerTask : public ThreadPool::Task {
+ public:
+ explicit MessageHandlerTask(MessageHandler* handler)
+ : handler_(handler) {
+ }
+
+ void Run() {
+ handler_->TaskCallback();
+ }
+
+ private:
+ MessageHandler* handler_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
+};
+
+
MessageHandler::MessageHandler()
- : live_ports_(0),
- queue_(new MessageQueue()) {
+ : queue_(new MessageQueue()),
+ oob_queue_(new MessageQueue()),
+ pool_(NULL),
+ blocking_(false),
+ task_(NULL),
+ start_function_(NULL),
+ start_data_(NULL),
+ live_ports_(0) {
ASSERT(queue_ != NULL);
}
MessageHandler::~MessageHandler() {
delete queue_;
+ delete oob_queue_;
}
@@ -38,7 +63,54 @@
}
+void MessageHandler::Run(ThreadPool* pool,
+ StartFunction function, StartData data) {
+ {
+ MonitorLocker ml(&monitor_);
siva 2012/04/14 00:29:53 What is this lock protecting? It seems like it is
turnidge 2012/04/17 23:46:55 Discuessed offline... My intention was to always
+ if (FLAG_trace_isolates) {
+ OS::Print("[+] Starting message handler:\n"
+ "\thandler: %s\n",
+ name());
+ }
+ ASSERT(pool_ == NULL);
+ pool_ = pool;
+ blocking_ = false;
+ start_function_ = function;
+ start_data_ = data;
+ task_ = new MessageHandlerTask(this);
+ }
+ pool_->Run(task_);
+}
+
+
+void MessageHandler::RunBlocking(ThreadPool* pool,
+ StartFunction function, StartData data) {
+ {
+ MonitorLocker ml(&monitor_);
siva 2012/04/14 00:29:53 Ditto question about what is this lock protecting.
+ if (FLAG_trace_isolates) {
+ OS::Print("[+] Starting message handler:\n"
+ "\thandler: %s\n",
+ name());
+ }
+ ASSERT(pool_ == NULL);
+ pool_ = pool;
+ blocking_ = true;
+ start_function_ = function;
+ start_data_ = data;
+ task_ = new MessageHandlerTask(this);
+ }
+ pool_->Run(task_);
+ {
+ MonitorLocker ml(&monitor_);
+ while (pool_ != NULL) {
+ ml.Wait();
+ }
+ }
+}
+
+
void MessageHandler::PostMessage(Message* message) {
+ MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
const char* source_name = "<native code>";
Isolate* source_isolate = Isolate::Current();
@@ -54,149 +126,201 @@
}
Message::Priority priority = message->priority();
- queue()->Enqueue(message);
+ if (priority == Message::kOOBPriority) {
+ oob_queue_->Enqueue(message);
+ } else {
+ queue_->Enqueue(message);
+ }
message = NULL; // Do not access message. May have been deleted.
+ if (pool_ != NULL && task_ == NULL) {
+ task_ = new MessageHandlerTask(this);
+ pool_->Run(task_);
siva 2012/04/14 00:29:53 The task_ is run with the lock being held whereas
turnidge 2012/04/17 23:46:55 You are right. I have changed Run() so that it no
+ }
+
// Invoke any custom message notification.
MessageNotify(priority);
}
-void MessageHandler::ClosePort(Dart_Port port) {
- queue()->Flush(port);
+Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
+ Message* message = oob_queue_->Dequeue();
+ if (message == NULL && min_priority < Message::kOOBPriority) {
+ message = queue_->Dequeue();
+ }
+ return message;
}
-void MessageHandler::CloseAllPorts() {
- queue()->FlushAll();
-}
+bool MessageHandler::HandleMessages() {
+ ASSERT(Isolate::Current() == NULL);
+ ASSERT(pool_ != NULL);
+ bool result = true;
+ Message* message = DequeueMessage(Message::kNormalPriority);
+ while (message) {
+ if (FLAG_trace_isolates) {
+ OS::Print("[<] Handling message:\n"
+ "\thandler: %s\n"
+ "\tport: %lld\n",
+ name(), message->dest_port());
+ }
-
-MessageQueue::MessageQueue() {
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- head_[p] = NULL;
- tail_[p] = NULL;
+ monitor_.Exit();
siva 2012/04/14 00:29:53 Where is the monitor_.Enter() corresponding to thi
turnidge 2012/04/17 23:46:55 I will add "lock held" comments to the appropriate
+ result = HandleMessage(message);
+ ASSERT(Isolate::Current() == NULL);
+ monitor_.Enter();
+ if (!result) {
+ break;
+ }
+ message = DequeueMessage(Message::kNormalPriority);
}
+ return result;
}
-MessageQueue::~MessageQueue() {
- // Ensure that all pending messages have been released.
+bool MessageHandler::HandleNextMessage() {
+ // We can only call HandleNextMessage when this handler is not
+ // assigned to a thread pool.
+ ASSERT(pool_ == NULL);
#if defined(DEBUG)
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- ASSERT(head_[p] == NULL);
- }
+ CheckAccess();
#endif
-}
-
-
-void MessageQueue::Enqueue(Message* msg) {
MonitorLocker ml(&monitor_);
- Message::Priority p = msg->priority();
- // Make sure messages are not reused.
- ASSERT(msg->next_ == NULL);
- if (head_[p] == NULL) {
- // Only element in the queue.
- head_[p] = msg;
- tail_[p] = msg;
+ bool result = true;
+ Message* message = DequeueMessage(Message::kNormalPriority);
+ while (message) {
+ if (FLAG_trace_isolates) {
+ OS::Print("[<] Handling message:\n"
+ "\thandler: %s\n"
+ "\tport: %lld\n",
+ name(), message->dest_port());
+ }
- // We only need to notify if the queue was empty.
- monitor_.Notify();
- } else {
- ASSERT(tail_[p] != NULL);
- // Append at the tail.
- tail_[p]->next_ = msg;
- tail_[p] = msg;
+ monitor_.Exit();
+ Message::Priority priority = message->priority();
+ result = HandleMessage(message);
+ monitor_.Enter();
+ if (!result || priority == Message::kNormalPriority) {
+ break;
+ }
+ message = DequeueMessage(Message::kNormalPriority);
}
+ return result;
}
siva 2012/04/14 00:29:53 HandleNextMessage seems identical to HandleMessage
turnidge 2012/04/17 23:46:55 Refactored to share more code.
-Message* MessageQueue::DequeueNoWait() {
+bool MessageHandler::HandleOOBMessages() {
MonitorLocker ml(&monitor_);
- return DequeueNoWaitHoldsLock(Message::kFirstPriority);
-}
+ bool result = true;
+ Message* message = DequeueMessage(Message::kOOBPriority);
+ while (message) {
+ if (FLAG_trace_isolates) {
+ OS::Print("[<] Handling message:\n"
+ "\thandler: %s\n"
+ "\tport: %lld\n",
+ name(), message->dest_port());
+ }
-
-Message* MessageQueue::DequeueNoWaitWithPriority(
- Message::Priority min_priority) {
- MonitorLocker ml(&monitor_);
- return DequeueNoWaitHoldsLock(min_priority);
+ monitor_.Exit();
+ result = HandleMessage(message);
+ monitor_.Enter();
+ if (!result) {
+ break;
+ }
+ message = DequeueMessage(Message::kOOBPriority);
+ }
+ return result;
}
-Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) {
- // Look for the highest priority available message.
- for (int p = Message::kNumPriorities-1; p >= min_priority; p--) {
- Message* result = head_[p];
- if (result != NULL) {
- head_[p] = result->next_;
- // The following update to tail_ is not strictly needed.
- if (head_[p] == NULL) {
- tail_[p] = NULL;
- }
-#if defined(DEBUG)
- result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
-#endif // DEBUG
- return result;
+void MessageHandler::TaskCallback() {
+ ASSERT(Isolate::Current() == NULL);
+ bool ok = true;
+ {
+ MonitorLocker ml(&monitor_);
siva 2012/04/14 00:29:53 Is the lock being done here primarily to protect a
+ // Initialize the message handler by running its start function,
+ // if we have one. For an isolate, this will run the isolate's
+ // main() function.
+ if (start_function_) {
+ monitor_.Exit();
+ ok = start_function_(start_data_);
+ ASSERT(Isolate::Current() == NULL);
+ start_function_ = NULL;
+ start_data_ = NULL;
+ monitor_.Enter();
}
+
+ // Handle any pending messages for this message handler.
+ if (ok) {
+ ok = HandleMessages();
+ }
+ task_ = NULL; // No task in queue.
+ CheckTermination(&ml, !ok);
}
- return NULL;
}
-Message* MessageQueue::Dequeue(int64_t millis) {
- ASSERT(millis >= 0);
+void MessageHandler::ClosePort(Dart_Port port) {
MonitorLocker ml(&monitor_);
- Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
- if (result == NULL) {
- // No message available at any priority.
- monitor_.Wait(millis);
- result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Closing port:\n"
+ "\thandler: %s\n"
+ "\tport: %d\n",
+ name(), port);
}
- return result;
+ queue_->Flush(port);
}
-void MessageQueue::Flush(Dart_Port port) {
+void MessageHandler::CloseAllPorts() {
MonitorLocker ml(&monitor_);
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- Message* cur = head_[p];
- Message* prev = NULL;
- while (cur != NULL) {
- Message* next = cur->next_;
- // If the message matches, then remove it from the queue and delete it.
- if (cur->dest_port() == port) {
- if (prev != NULL) {
- prev->next_ = next;
- } else {
- head_[p] = next;
- }
- delete cur;
- } else {
- // Move prev forward.
- prev = cur;
- }
- // Advance to the next message in the queue.
- cur = next;
- }
- tail_[p] = prev;
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Closing all ports:\n"
+ "\thandler: %s\n",
+ name());
}
+ queue_->FlushAll();
}
-void MessageQueue::FlushAll() {
- MonitorLocker ml(&monitor_);
- for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
- Message* cur = head_[p];
- head_[p] = NULL;
- tail_[p] = NULL;
- while (cur != NULL) {
- Message* next = cur->next_;
- delete cur;
- cur = next;
+void MessageHandler::CheckTermination(MonitorLocker* ml, bool error) {
siva 2012/04/14 00:29:53 Need an assertion that the lock is indeed held whe
+ // If the message handler received an error or has no live ports,
+ // we stop handling messages for this message handler.
+ if (pool_ == NULL) {
+ return;
+ }
+ if (error) {
+ // We received an error. Abnormal termination.
+ char* error_str = GetErrorCString();
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Stopping message handler (error):\n"
+ "\thandler: %s\n",
+ name());
+ OS::Print("Error: %s\n", error_str);
}
+ pool_ = NULL;
+ if (blocking_) {
+ ml->Notify();
+ } else {
+ // TODO(turnidge): Implement a real system for handling
+ // isolate death rather than terminating the vm.
+ OS::PrintErr("%s\n", error_str);
+ exit(255);
+ }
+ free(error_str);
+
+ } else if (!HasLivePorts()) {
+ // No live ports. Normal termination.
+ if (FLAG_trace_isolates) {
+ OS::Print("[-] Stopping message handler (no live ports):\n"
+ "\thandler: %s\n",
+ name());
+ }
+ pool_ = NULL;
+ if (blocking_) {
+ ml->Notify();
+ }
}
}
-
} // namespace dart

Powered by Google App Engine
This is Rietveld 408576698