Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(608)

Unified Diff: runtime/vm/message_handler.h

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/vm/message.cc ('k') | runtime/vm/message_handler.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/vm/message_handler.h
===================================================================
--- runtime/vm/message_handler.h (revision 5489)
+++ runtime/vm/message_handler.h (working copy)
@@ -2,115 +2,60 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-#ifndef VM_MESSAGE_H_
-#define VM_MESSAGE_H_
+#ifndef VM_MESSAGE_HANDLER_H_
+#define VM_MESSAGE_HANDLER_H_
+#include "vm/message.h"
#include "vm/thread.h"
+#include "vm/thread_pool.h"
-// Duplicated from dart_api.h to avoid including the whole header.
-typedef int64_t Dart_Port;
-
namespace dart {
-class Message {
- public:
- typedef enum {
- kNormalPriority = 0, // Deliver message when idle.
- kOOBPriority = 1, // Deliver message asap.
-
- // Iteration.
- kFirstPriority = 0,
- kNumPriorities = 2,
- } Priority;
-
- // A port number which is never used.
- static const Dart_Port kIllegalPort = 0;
-
- // A new message to be sent between two isolates. The data handed to this
- // message will be disposed by calling free() once the message object is
- // being destructed (after delivery or when the receiving port is closed).
- //
- // If reply_port is kIllegalPort, then there is no reply port.
- Message(Dart_Port dest_port, Dart_Port reply_port,
- uint8_t* data, Priority priority)
- : next_(NULL),
- dest_port_(dest_port),
- reply_port_(reply_port),
- data_(data),
- priority_(priority) {}
- ~Message() {
- free(data_);
- }
-
- Dart_Port dest_port() const { return dest_port_; }
- Dart_Port reply_port() const { return reply_port_; }
- uint8_t* data() const { return data_; }
- Priority priority() const { return priority_; }
-
- private:
- friend class MessageQueue;
-
- Message* next_;
- Dart_Port dest_port_;
- Dart_Port reply_port_;
- uint8_t* data_;
- Priority priority_;
-
- DISALLOW_COPY_AND_ASSIGN(Message);
-};
-
-// There is a message queue per isolate.
-class MessageQueue {
- public:
- MessageQueue();
- ~MessageQueue();
-
- void Enqueue(Message* msg);
-
- // Gets the next message from the message queue, possibly blocking
- // if no message is available. 'millis' is a timeout in
- // milliseconds. If 'millis' is 0, then this means to block
- // indefinitely. May block if no message is available. May return
- // NULL even if 'millis' is 0 due to spurious wakeups.
- Message* Dequeue(int64_t millis);
-
- // Gets the next message from the message queue if available. Will
- // not block.
- Message* DequeueNoWait();
-
- // Gets the next message of the specified priority or greater from
- // the message queue if available. Will not block.
- Message* DequeueNoWaitWithPriority(Message::Priority min_priority);
-
- void Flush(Dart_Port port);
- void FlushAll();
-
- private:
- friend class MessageQueueTestPeer;
-
- Message* DequeueNoWaitHoldsLock(Message::Priority min_priority);
-
- Monitor monitor_;
- Message* head_[Message::kNumPriorities];
- Message* tail_[Message::kNumPriorities];
-
- DISALLOW_COPY_AND_ASSIGN(MessageQueue);
-};
-
// A MessageHandler is an entity capable of accepting messages.
class MessageHandler {
protected:
MessageHandler();
- // Allows subclasses to provide custom message notification.
- virtual void MessageNotify(Message::Priority priority);
-
public:
virtual ~MessageHandler();
// Allow subclasses to provide a handler name.
virtual const char* name() const;
+ typedef uword CallbackData;
+ typedef bool (*StartCallback)(CallbackData data);
+ typedef void (*EndCallback)(CallbackData data);
+
+ // Runs this message handler on the thread pool.
+ //
+ // Before processing messages, the optional StartFunction is run.
+ //
+ // A message handler will run until it terminates either normally or
+ // abnormally. Normal termination occurs when the message handler
+ // no longer has any live ports. Abnormal termination occurs when
+ // HandleMessage() indicates that an error has occurred during
+ // message processing.
+ void Run(ThreadPool* pool,
+ StartCallback start_callback,
+ EndCallback end_callback,
+ CallbackData data);
+
+ // Handles the next message for this message handler. Should only
+ // be used when not running the handler on the thread pool (via Run
+ // or RunBlocking).
+ //
+ // Returns true on success.
+ bool HandleNextMessage();
+
+ // Handles any OOB messages for this message handler. Can be used
+ // even if the message handler is running on the thread pool.
+ //
+ // Returns true on success.
+ bool HandleOOBMessages();
+
+ // A message handler tracks how many live ports it has.
+ bool HasLivePorts() const { return live_ports_ > 0; }
+
#if defined(DEBUG)
// Check that it is safe to access this message handler.
//
@@ -120,37 +65,65 @@
virtual void CheckAccess();
#endif
+ protected:
+ // ------------ START PortMap API ------------
+ // These functions should only be called from the PortMap.
+
+ // Posts a message on this handler's message queue.
void PostMessage(Message* message);
+
+ // Notifies this handler that a port is being closed.
void ClosePort(Dart_Port port);
+
+ // Notifies this handler that all ports are being closed.
void CloseAllPorts();
- // A message handler tracks how many live ports it has.
- bool HasLivePorts() const { return live_ports_ > 0; }
- void increment_live_ports() {
-#if defined(DEBUG)
- CheckAccess();
-#endif
- live_ports_++;
- }
- void decrement_live_ports() {
-#if defined(DEBUG)
- CheckAccess();
-#endif
- live_ports_--;
- }
-
// Returns true if the handler is owned by the PortMap.
//
// This is used to delete handlers when their last live port is closed.
virtual bool OwnedByPortMap() const { return false; }
- MessageQueue* queue() const { return queue_; }
+ void increment_live_ports();
+ void decrement_live_ports();
+ // ------------ END PortMap API ------------
+ // Custom message notification. Optionally provided by subclass.
+ virtual void MessageNotify(Message::Priority priority);
+
+ // Handles a single message. Provided by subclass.
+ //
+ // Returns true on success.
+ virtual bool HandleMessage(Message* message) = 0;
+
private:
- intptr_t live_ports_;
+ friend class PortMap;
+ friend class MessageHandlerTestPeer;
+ friend class MessageHandlerTask;
+
+ // Called by MessageHandlerTask to process our task queue.
+ void TaskCallback();
+
+ // Dequeue the next message. Prefer messages from the oob_queue_ to
+ // messages from the queue_.
+ Message* DequeueMessage(Message::Priority min_priority);
+
+ // Handles any pending messages.
+ bool HandleMessages(bool allow_normal_messages,
+ bool allow_multiple_normal_messages);
+
+ Monitor monitor_; // Protects all fields in MessageHandler.
MessageQueue* queue_;
+ MessageQueue* oob_queue_;
+ intptr_t live_ports_;
+ ThreadPool* pool_;
+ ThreadPool::Task* task_;
+ StartCallback start_callback_;
+ EndCallback end_callback_;
+ CallbackData callback_data_;
+
+ DISALLOW_COPY_AND_ASSIGN(MessageHandler);
};
} // namespace dart
-#endif // VM_MESSAGE_H_
+#endif // VM_MESSAGE_HANDLER_H_
« no previous file with comments | « runtime/vm/message.cc ('k') | runtime/vm/message_handler.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698