Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1348)

Unified Diff: runtime/vm/message_handler.h

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: runtime/vm/message_handler.h
===================================================================
--- runtime/vm/message_handler.h (revision 5489)
+++ runtime/vm/message_handler.h (working copy)
@@ -2,115 +2,68 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-#ifndef VM_MESSAGE_H_
-#define VM_MESSAGE_H_
+#ifndef VM_MESSAGE_HANDLER_H_
+#define VM_MESSAGE_HANDLER_H_
+#include "vm/message.h"
#include "vm/thread.h"
+#include "vm/thread_pool.h"
-// Duplicated from dart_api.h to avoid including the whole header.
-typedef int64_t Dart_Port;
-
namespace dart {
-class Message {
- public:
- typedef enum {
- kNormalPriority = 0, // Deliver message when idle.
- kOOBPriority = 1, // Deliver message asap.
-
- // Iteration.
- kFirstPriority = 0,
- kNumPriorities = 2,
- } Priority;
-
- // A port number which is never used.
- static const Dart_Port kIllegalPort = 0;
-
- // A new message to be sent between two isolates. The data handed to this
- // message will be disposed by calling free() once the message object is
- // being destructed (after delivery or when the receiving port is closed).
- //
- // If reply_port is kIllegalPort, then there is no reply port.
- Message(Dart_Port dest_port, Dart_Port reply_port,
- uint8_t* data, Priority priority)
- : next_(NULL),
- dest_port_(dest_port),
- reply_port_(reply_port),
- data_(data),
- priority_(priority) {}
- ~Message() {
- free(data_);
- }
-
- Dart_Port dest_port() const { return dest_port_; }
- Dart_Port reply_port() const { return reply_port_; }
- uint8_t* data() const { return data_; }
- Priority priority() const { return priority_; }
-
- private:
- friend class MessageQueue;
-
- Message* next_;
- Dart_Port dest_port_;
- Dart_Port reply_port_;
- uint8_t* data_;
- Priority priority_;
-
- DISALLOW_COPY_AND_ASSIGN(Message);
-};
-
-// There is a message queue per isolate.
-class MessageQueue {
- public:
- MessageQueue();
- ~MessageQueue();
-
- void Enqueue(Message* msg);
-
- // Gets the next message from the message queue, possibly blocking
- // if no message is available. 'millis' is a timeout in
- // milliseconds. If 'millis' is 0, then this means to block
- // indefinitely. May block if no message is available. May return
- // NULL even if 'millis' is 0 due to spurious wakeups.
- Message* Dequeue(int64_t millis);
-
- // Gets the next message from the message queue if available. Will
- // not block.
- Message* DequeueNoWait();
-
- // Gets the next message of the specified priority or greater from
- // the message queue if available. Will not block.
- Message* DequeueNoWaitWithPriority(Message::Priority min_priority);
-
- void Flush(Dart_Port port);
- void FlushAll();
-
- private:
- friend class MessageQueueTestPeer;
-
- Message* DequeueNoWaitHoldsLock(Message::Priority min_priority);
-
- Monitor monitor_;
- Message* head_[Message::kNumPriorities];
- Message* tail_[Message::kNumPriorities];
-
- DISALLOW_COPY_AND_ASSIGN(MessageQueue);
-};
-
// A MessageHandler is an entity capable of accepting messages.
class MessageHandler {
protected:
MessageHandler();
- // Allows subclasses to provide custom message notification.
- virtual void MessageNotify(Message::Priority priority);
-
public:
virtual ~MessageHandler();
// Allow subclasses to provide a handler name.
virtual const char* name() const;
+ typedef uword StartData;
+ typedef bool (*StartFunction)(StartData data);
+
+ // Runs this message handler on the thread pool.
+ //
+ // Before processing messages, the optional StartFunction is run.
+ //
+ // A message handler will run until it terminates either normally or
+ // abnormally. Normal termination occurs when the message handler
+ // no longer has any live ports. Abnormal termination occurs when
+ // HandleMessage() indicates that an error has occurred during
+ // message procesing.
+ void Run(ThreadPool* pool, StartFunction function, StartData data);
+
+ // Runs this message handler on the thread pool and waits for the
+ // message handler to terminate.
+ //
+ // Before processing messages, the optional StartFunction is run.
+ //
+ // A message handler will run until it terminates either normally or
+ // abnormally. Normal termination occurs when the message handler
+ // no longer has any live ports. Abnormal termination occurs when
+ // HandleMessage() indicates that an error has occurred during
+ // message procesing.
+ void RunBlocking(ThreadPool* pool, StartFunction function, StartData data);
+
+ // Handles the next message for this message handler. Should only
+ // be used when not running the handler on the thread pool (via Run
+ // or RunBlocking).
+ //
+ // Returns true on success.
+ bool HandleNextMessage();
+
+ // Handles any OOB messages for this message handler. Can be used
+ // even if the message handler is running on the thread pool.
+ //
+ // Returns true on success.
+ bool HandleOOBMessages();
+
+ // A message handler tracks how many live ports it has.
+ bool HasLivePorts() const { return live_ports_ > 0; }
+
#if defined(DEBUG)
// Check that it is safe to access this message handler.
//
@@ -120,12 +73,23 @@
virtual void CheckAccess();
#endif
+ // ------------ PortMap API ------------
+ // These functions should only be called from the PortMap.
+
+ // Posts a message on this handler's message queue.
void PostMessage(Message* message);
+
+ // Notifies this handler that a port is being closed.
void ClosePort(Dart_Port port);
+
+ // Notifies this handler that all ports are being closed.
void CloseAllPorts();
- // A message handler tracks how many live ports it has.
- bool HasLivePorts() const { return live_ports_ > 0; }
+ // Returns true if the handler is owned by the PortMap.
+ //
+ // This is used to delete handlers when their last live port is closed.
+ virtual bool OwnedByPortMap() const { return false; }
+
void increment_live_ports() {
#if defined(DEBUG)
CheckAccess();
@@ -139,18 +103,47 @@
live_ports_--;
}
- // Returns true if the handler is owned by the PortMap.
+ protected:
+ // Custom message notification. Optionally provided by subclass.
+ virtual void MessageNotify(Message::Priority priority);
+
+ // Handles a single message. Provided by subclass.
//
- // This is used to delete handlers when their last live port is closed.
- virtual bool OwnedByPortMap() const { return false; }
+ // Returns true on success.
+ virtual bool HandleMessage(Message* message) = 0;
- MessageQueue* queue() const { return queue_; }
+ // Gets the error string for this message handler, if any. The
+ // returned string should be allocated by malloc. Caller is
+ // responsible for freeing the string.
+ virtual char* GetErrorCString() { return NULL; }
private:
- intptr_t live_ports_;
+ friend class MessageHandlerTestPeer;
+ friend class MessageHandlerTask;
+
+ // Called by MessageHandlerTask to process our task queue.
+ void TaskCallback();
+
+ // Dequeue the next message. Prefer messages from the oob_queue_ to
+ // messages from the queue_.
+ Message* DequeueMessage(Message::Priority min_priority);
+
+ // Handles any pending messages.
+ bool HandleMessages();
+
+ void CheckTermination(MonitorLocker* ml, bool error);
+
+ Monitor monitor_;
MessageQueue* queue_;
+ MessageQueue* oob_queue_;
+ ThreadPool* pool_;
+ bool blocking_;
+ ThreadPool::Task* task_;
+ StartFunction start_function_;
+ StartData start_data_;
+ intptr_t live_ports_;
};
} // namespace dart
-#endif // VM_MESSAGE_H_
+#endif // VM_MESSAGE_HANDLER_H_

Powered by Google App Engine
This is Rietveld 408576698