Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: runtime/vm/message_handler.h

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #ifndef VM_MESSAGE_H_ 5 #ifndef VM_MESSAGE_HANDLER_H_
6 #define VM_MESSAGE_H_ 6 #define VM_MESSAGE_HANDLER_H_
7 7
8 #include "vm/message.h"
8 #include "vm/thread.h" 9 #include "vm/thread.h"
9 10 #include "vm/thread_pool.h"
10 // Duplicated from dart_api.h to avoid including the whole header.
11 typedef int64_t Dart_Port;
12 11
13 namespace dart { 12 namespace dart {
14 13
15 class Message {
16 public:
17 typedef enum {
18 kNormalPriority = 0, // Deliver message when idle.
19 kOOBPriority = 1, // Deliver message asap.
20
21 // Iteration.
22 kFirstPriority = 0,
23 kNumPriorities = 2,
24 } Priority;
25
26 // A port number which is never used.
27 static const Dart_Port kIllegalPort = 0;
28
29 // A new message to be sent between two isolates. The data handed to this
30 // message will be disposed by calling free() once the message object is
31 // being destructed (after delivery or when the receiving port is closed).
32 //
33 // If reply_port is kIllegalPort, then there is no reply port.
34 Message(Dart_Port dest_port, Dart_Port reply_port,
35 uint8_t* data, Priority priority)
36 : next_(NULL),
37 dest_port_(dest_port),
38 reply_port_(reply_port),
39 data_(data),
40 priority_(priority) {}
41 ~Message() {
42 free(data_);
43 }
44
45 Dart_Port dest_port() const { return dest_port_; }
46 Dart_Port reply_port() const { return reply_port_; }
47 uint8_t* data() const { return data_; }
48 Priority priority() const { return priority_; }
49
50 private:
51 friend class MessageQueue;
52
53 Message* next_;
54 Dart_Port dest_port_;
55 Dart_Port reply_port_;
56 uint8_t* data_;
57 Priority priority_;
58
59 DISALLOW_COPY_AND_ASSIGN(Message);
60 };
61
62 // There is a message queue per isolate.
63 class MessageQueue {
64 public:
65 MessageQueue();
66 ~MessageQueue();
67
68 void Enqueue(Message* msg);
69
70 // Gets the next message from the message queue, possibly blocking
71 // if no message is available. 'millis' is a timeout in
72 // milliseconds. If 'millis' is 0, then this means to block
73 // indefinitely. May block if no message is available. May return
74 // NULL even if 'millis' is 0 due to spurious wakeups.
75 Message* Dequeue(int64_t millis);
76
77 // Gets the next message from the message queue if available. Will
78 // not block.
79 Message* DequeueNoWait();
80
81 // Gets the next message of the specified priority or greater from
82 // the message queue if available. Will not block.
83 Message* DequeueNoWaitWithPriority(Message::Priority min_priority);
84
85 void Flush(Dart_Port port);
86 void FlushAll();
87
88 private:
89 friend class MessageQueueTestPeer;
90
91 Message* DequeueNoWaitHoldsLock(Message::Priority min_priority);
92
93 Monitor monitor_;
94 Message* head_[Message::kNumPriorities];
95 Message* tail_[Message::kNumPriorities];
96
97 DISALLOW_COPY_AND_ASSIGN(MessageQueue);
98 };
99
100 // A MessageHandler is an entity capable of accepting messages. 14 // A MessageHandler is an entity capable of accepting messages.
101 class MessageHandler { 15 class MessageHandler {
102 protected: 16 protected:
103 MessageHandler(); 17 MessageHandler();
104 18
105 // Allows subclasses to provide custom message notification.
106 virtual void MessageNotify(Message::Priority priority);
107
108 public: 19 public:
109 virtual ~MessageHandler(); 20 virtual ~MessageHandler();
110 21
111 // Allow subclasses to provide a handler name. 22 // Allow subclasses to provide a handler name.
112 virtual const char* name() const; 23 virtual const char* name() const;
113 24
25 typedef uword StartData;
26 typedef bool (*StartFunction)(StartData data);
27
28 // Runs this message handler on the thread pool.
29 //
30 // Before processing messages, the optional StartFunction is run.
31 //
32 // A message handler will run until it terminates either normally or
33 // abnormally. Normal termination occurs when the message handler
34 // no longer has any live ports. Abnormal termination occurs when
35 // HandleMessage() indicates that an error has occurred during
36 // message procesing.
37 void Run(ThreadPool* pool, StartFunction function, StartData data);
38
39 // Runs this message handler on the thread pool and waits for the
40 // message handler to terminate.
41 //
42 // Before processing messages, the optional StartFunction is run.
43 //
44 // A message handler will run until it terminates either normally or
45 // abnormally. Normal termination occurs when the message handler
46 // no longer has any live ports. Abnormal termination occurs when
47 // HandleMessage() indicates that an error has occurred during
48 // message procesing.
49 void RunBlocking(ThreadPool* pool, StartFunction function, StartData data);
50
51 // Handles the next message for this message handler. Should only
52 // be used when not running the handler on the thread pool (via Run
53 // or RunBlocking).
54 //
55 // Returns true on success.
56 bool HandleNextMessage();
57
58 // Handles any OOB messages for this message handler. Can be used
59 // even if the message handler is running on the thread pool.
60 //
61 // Returns true on success.
62 bool HandleOOBMessages();
63
64 // A message handler tracks how many live ports it has.
65 bool HasLivePorts() const { return live_ports_ > 0; }
66
114 #if defined(DEBUG) 67 #if defined(DEBUG)
115 // Check that it is safe to access this message handler. 68 // Check that it is safe to access this message handler.
116 // 69 //
117 // For example, if this MessageHandler is an isolate, then it is 70 // For example, if this MessageHandler is an isolate, then it is
118 // only safe to access it when the MessageHandler is the current 71 // only safe to access it when the MessageHandler is the current
119 // isolate. 72 // isolate.
120 virtual void CheckAccess(); 73 virtual void CheckAccess();
121 #endif 74 #endif
122 75
76 // ------------ PortMap API ------------
77 // These functions should only be called from the PortMap.
78
79 // Posts a message on this handler's message queue.
123 void PostMessage(Message* message); 80 void PostMessage(Message* message);
81
82 // Notifies this handler that a port is being closed.
124 void ClosePort(Dart_Port port); 83 void ClosePort(Dart_Port port);
84
85 // Notifies this handler that all ports are being closed.
125 void CloseAllPorts(); 86 void CloseAllPorts();
126 87
127 // A message handler tracks how many live ports it has. 88 // Returns true if the handler is owned by the PortMap.
128 bool HasLivePorts() const { return live_ports_ > 0; } 89 //
90 // This is used to delete handlers when their last live port is closed.
91 virtual bool OwnedByPortMap() const { return false; }
92
129 void increment_live_ports() { 93 void increment_live_ports() {
130 #if defined(DEBUG) 94 #if defined(DEBUG)
131 CheckAccess(); 95 CheckAccess();
132 #endif 96 #endif
133 live_ports_++; 97 live_ports_++;
134 } 98 }
135 void decrement_live_ports() { 99 void decrement_live_ports() {
136 #if defined(DEBUG) 100 #if defined(DEBUG)
137 CheckAccess(); 101 CheckAccess();
138 #endif 102 #endif
139 live_ports_--; 103 live_ports_--;
140 } 104 }
141 105
142 // Returns true if the handler is owned by the PortMap. 106 protected:
107 // Custom message notification. Optionally provided by subclass.
108 virtual void MessageNotify(Message::Priority priority);
109
110 // Handles a single message. Provided by subclass.
143 // 111 //
144 // This is used to delete handlers when their last live port is closed. 112 // Returns true on success.
145 virtual bool OwnedByPortMap() const { return false; } 113 virtual bool HandleMessage(Message* message) = 0;
146 114
147 MessageQueue* queue() const { return queue_; } 115 // Gets the error string for this message handler, if any. The
116 // returned string should be allocated by malloc. Caller is
117 // responsible for freeing the string.
118 virtual char* GetErrorCString() { return NULL; }
148 119
149 private: 120 private:
121 friend class MessageHandlerTestPeer;
122 friend class MessageHandlerTask;
123
124 // Called by MessageHandlerTask to process our task queue.
125 void TaskCallback();
126
127 // Dequeue the next message. Prefer messages from the oob_queue_ to
128 // messages from the queue_.
129 Message* DequeueMessage(Message::Priority min_priority);
130
131 // Handles any pending messages.
132 bool HandleMessages();
133
134 void CheckTermination(MonitorLocker* ml, bool error);
135
136 Monitor monitor_;
137 MessageQueue* queue_;
138 MessageQueue* oob_queue_;
139 ThreadPool* pool_;
140 bool blocking_;
141 ThreadPool::Task* task_;
142 StartFunction start_function_;
143 StartData start_data_;
150 intptr_t live_ports_; 144 intptr_t live_ports_;
151 MessageQueue* queue_;
152 }; 145 };
153 146
154 } // namespace dart 147 } // namespace dart
155 148
156 #endif // VM_MESSAGE_H_ 149 #endif // VM_MESSAGE_HANDLER_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698