Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: runtime/vm/message_handler.cc

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/vm/message_handler.h ('k') | runtime/vm/message_handler_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/message.h" 5 #include "vm/message_handler.h"
6 #include "vm/dart.h"
6 7
7 namespace dart { 8 namespace dart {
8 9
9 DECLARE_FLAG(bool, trace_isolates); 10 DECLARE_FLAG(bool, trace_isolates);
10 11
11 12
13 class MessageHandlerTask : public ThreadPool::Task {
14 public:
15 explicit MessageHandlerTask(MessageHandler* handler)
16 : handler_(handler) {
17 ASSERT(handler != NULL);
18 }
19
20 void Run() {
21 handler_->TaskCallback();
22 }
23
24 private:
25 MessageHandler* handler_;
26
27 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
28 };
29
30
12 MessageHandler::MessageHandler() 31 MessageHandler::MessageHandler()
13 : live_ports_(0), 32 : queue_(new MessageQueue()),
14 queue_(new MessageQueue()) { 33 oob_queue_(new MessageQueue()),
34 live_ports_(0),
35 pool_(NULL),
36 task_(NULL),
37 start_callback_(NULL),
38 end_callback_(NULL),
39 callback_data_(NULL) {
15 ASSERT(queue_ != NULL); 40 ASSERT(queue_ != NULL);
41 ASSERT(oob_queue_ != NULL);
16 } 42 }
17 43
18 44
19 MessageHandler::~MessageHandler() { 45 MessageHandler::~MessageHandler() {
20 delete queue_; 46 delete queue_;
47 delete oob_queue_;
21 } 48 }
22 49
23 50
24 const char* MessageHandler::name() const { 51 const char* MessageHandler::name() const {
25 return "<unnamed>"; 52 return "<unnamed>";
26 } 53 }
27 54
28 55
29 #if defined(DEBUG) 56 #if defined(DEBUG)
30 void MessageHandler::CheckAccess() { 57 void MessageHandler::CheckAccess() {
31 // By default there is no checking. 58 // By default there is no checking.
32 } 59 }
33 #endif 60 #endif
34 61
35 62
36 void MessageHandler::MessageNotify(Message::Priority priority) { 63 void MessageHandler::MessageNotify(Message::Priority priority) {
37 // By default, there is no custom message notification. 64 // By default, there is no custom message notification.
38 } 65 }
39 66
40 67
68 void MessageHandler::Run(ThreadPool* pool,
69 StartCallback start_callback,
70 EndCallback end_callback,
71 CallbackData data) {
72 MonitorLocker ml(&monitor_);
73 if (FLAG_trace_isolates) {
74 OS::Print("[+] Starting message handler:\n"
75 "\thandler: %s\n",
76 name());
77 }
78 ASSERT(pool_ == NULL);
79 pool_ = pool;
80 start_callback_ = start_callback;
81 end_callback_ = end_callback;
82 callback_data_ = data;
83 task_ = new MessageHandlerTask(this);
84 pool_->Run(task_);
85 }
86
87
41 void MessageHandler::PostMessage(Message* message) { 88 void MessageHandler::PostMessage(Message* message) {
89 MonitorLocker ml(&monitor_);
42 if (FLAG_trace_isolates) { 90 if (FLAG_trace_isolates) {
43 const char* source_name = "<native code>"; 91 const char* source_name = "<native code>";
44 Isolate* source_isolate = Isolate::Current(); 92 Isolate* source_isolate = Isolate::Current();
45 if (source_isolate) { 93 if (source_isolate) {
46 source_name = source_isolate->name(); 94 source_name = source_isolate->name();
47 } 95 }
48 OS::Print("[>] Posting message:\n" 96 OS::Print("[>] Posting message:\n"
49 "\tsource: %s\n" 97 "\tsource: %s\n"
50 "\treply_port: %lld\n" 98 "\treply_port: %lld\n"
51 "\tdest: %s\n" 99 "\tdest: %s\n"
52 "\tdest_port: %lld\n", 100 "\tdest_port: %lld\n",
53 source_name, message->reply_port(), name(), message->dest_port()); 101 source_name, message->reply_port(), name(), message->dest_port());
54 } 102 }
55 103
56 Message::Priority priority = message->priority(); 104 Message::Priority saved_priority = message->priority();
57 queue()->Enqueue(message); 105 if (message->IsOOB()) {
106 oob_queue_->Enqueue(message);
107 } else {
108 queue_->Enqueue(message);
109 }
58 message = NULL; // Do not access message. May have been deleted. 110 message = NULL; // Do not access message. May have been deleted.
59 111
112 if (pool_ != NULL && task_ == NULL) {
113 task_ = new MessageHandlerTask(this);
114 pool_->Run(task_);
115 }
116
60 // Invoke any custom message notification. 117 // Invoke any custom message notification.
61 MessageNotify(priority); 118 MessageNotify(saved_priority);
62 } 119 }
63 120
64 121
65 void MessageHandler::ClosePort(Dart_Port port) { 122 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {
66 queue()->Flush(port); 123 // TODO(turnidge): Add assert that monitor_ is held here.
124 Message* message = oob_queue_->Dequeue();
125 if (message == NULL && min_priority < Message::kOOBPriority) {
126 message = queue_->Dequeue();
127 }
128 return message;
67 } 129 }
68 130
69 131
70 void MessageHandler::CloseAllPorts() { 132 bool MessageHandler::HandleMessages(bool allow_normal_messages,
71 queue()->FlushAll(); 133 bool allow_multiple_normal_messages) {
72 } 134 // TODO(turnidge): Add assert that monitor_ is held here.
135 bool result = true;
136 Message::Priority min_priority = (allow_normal_messages
137 ? Message::kNormalPriority
138 : Message::kOOBPriority);
139 Message* message = DequeueMessage(min_priority);
140 while (message) {
141 if (FLAG_trace_isolates) {
142 OS::Print("[<] Handling message:\n"
143 "\thandler: %s\n"
144 "\tport: %lld\n",
145 name(), message->dest_port());
146 }
73 147
148 // Release the monitor_ temporarily while we handle the message.
149 // The monitor was acquired in MessageHandler::TaskCallback().
150 monitor_.Exit();
151 Message::Priority saved_priority = message->priority();
152 result = HandleMessage(message);
153 // ASSERT(Isolate::Current() == NULL);
154 monitor_.Enter();
74 155
75 MessageQueue::MessageQueue() { 156 if (!result) {
76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 157 // If we hit an error, we're done processing messages.
77 head_[p] = NULL; 158 break;
78 tail_[p] = NULL;
79 }
80 }
81
82
83 MessageQueue::~MessageQueue() {
84 // Ensure that all pending messages have been released.
85 #if defined(DEBUG)
86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
87 ASSERT(head_[p] == NULL);
88 }
89 #endif
90 }
91
92
93 void MessageQueue::Enqueue(Message* msg) {
94 MonitorLocker ml(&monitor_);
95 Message::Priority p = msg->priority();
96 // Make sure messages are not reused.
97 ASSERT(msg->next_ == NULL);
98 if (head_[p] == NULL) {
99 // Only element in the queue.
100 head_[p] = msg;
101 tail_[p] = msg;
102
103 // We only need to notify if the queue was empty.
104 monitor_.Notify();
105 } else {
106 ASSERT(tail_[p] != NULL);
107 // Append at the tail.
108 tail_[p]->next_ = msg;
109 tail_[p] = msg;
110 }
111 }
112
113
114 Message* MessageQueue::DequeueNoWait() {
115 MonitorLocker ml(&monitor_);
116 return DequeueNoWaitHoldsLock(Message::kFirstPriority);
117 }
118
119
120 Message* MessageQueue::DequeueNoWaitWithPriority(
121 Message::Priority min_priority) {
122 MonitorLocker ml(&monitor_);
123 return DequeueNoWaitHoldsLock(min_priority);
124 }
125
126
127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) {
128 // Look for the highest priority available message.
129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) {
130 Message* result = head_[p];
131 if (result != NULL) {
132 head_[p] = result->next_;
133 // The following update to tail_ is not strictly needed.
134 if (head_[p] == NULL) {
135 tail_[p] = NULL;
136 }
137 #if defined(DEBUG)
138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
139 #endif // DEBUG
140 return result;
141 } 159 }
142 } 160 if (!allow_multiple_normal_messages &&
143 return NULL; 161 saved_priority == Message::kNormalPriority) {
144 } 162 // Some callers want to process only one normal message and then quit.
145 163 break;
146 164 }
147 Message* MessageQueue::Dequeue(int64_t millis) { 165 message = DequeueMessage(min_priority);
148 ASSERT(millis >= 0);
149 MonitorLocker ml(&monitor_);
150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
151 if (result == NULL) {
152 // No message available at any priority.
153 monitor_.Wait(millis);
154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority);
155 } 166 }
156 return result; 167 return result;
157 } 168 }
158 169
159 170
160 void MessageQueue::Flush(Dart_Port port) { 171 bool MessageHandler::HandleNextMessage() {
172 // We can only call HandleNextMessage when this handler is not
173 // assigned to a thread pool.
161 MonitorLocker ml(&monitor_); 174 MonitorLocker ml(&monitor_);
162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 175 ASSERT(pool_ == NULL);
163 Message* cur = head_[p]; 176 #if defined(DEBUG)
164 Message* prev = NULL; 177 CheckAccess();
165 while (cur != NULL) { 178 #endif
166 Message* next = cur->next_; 179 return HandleMessages(true, false);
167 // If the message matches, then remove it from the queue and delete it. 180 }
168 if (cur->dest_port() == port) { 181
169 if (prev != NULL) { 182
170 prev->next_ = next; 183 bool MessageHandler::HandleOOBMessages() {
171 } else { 184 MonitorLocker ml(&monitor_);
172 head_[p] = next; 185 #if defined(DEBUG)
173 } 186 CheckAccess();
174 delete cur; 187 #endif
175 } else { 188 return HandleMessages(false, false);
176 // Move prev forward. 189 }
177 prev = cur; 190
191
192 void MessageHandler::TaskCallback() {
193 ASSERT(Isolate::Current() == NULL);
194 bool ok = true;
195 bool run_end_callback = false;
196 {
197 MonitorLocker ml(&monitor_);
198 // Initialize the message handler by running its start function,
199 // if we have one. For an isolate, this will run the isolate's
200 // main() function.
201 if (start_callback_) {
202 monitor_.Exit();
203 ok = start_callback_(callback_data_);
204 ASSERT(Isolate::Current() == NULL);
205 start_callback_ = NULL;
206 monitor_.Enter();
207 }
208
209 // Handle any pending messages for this message handler.
210 if (ok) {
211 ok = HandleMessages(true, true);
212 }
213 task_ = NULL; // No task in queue.
214
215 if (!ok || !HasLivePorts()) {
216 if (FLAG_trace_isolates) {
217 OS::Print("[-] Stopping message handler (%s):\n"
218 "\thandler: %s\n",
219 (ok ? "no live ports" : "error"),
220 name());
178 } 221 }
179 // Advance to the next message in the queue. 222 pool_ = NULL;
180 cur = next; 223 run_end_callback = true;
181 } 224 }
182 tail_[p] = prev; 225 }
226 if (run_end_callback && end_callback_ != NULL) {
227 end_callback_(callback_data_);
228 // The handler may have been deleted after this point.
183 } 229 }
184 } 230 }
185 231
186 232
187 void MessageQueue::FlushAll() { 233 void MessageHandler::ClosePort(Dart_Port port) {
188 MonitorLocker ml(&monitor_); 234 MonitorLocker ml(&monitor_);
189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 235 if (FLAG_trace_isolates) {
190 Message* cur = head_[p]; 236 OS::Print("[-] Closing port:\n"
191 head_[p] = NULL; 237 "\thandler: %s\n"
192 tail_[p] = NULL; 238 "\tport: %d\n",
193 while (cur != NULL) { 239 name(), port);
194 Message* next = cur->next_;
195 delete cur;
196 cur = next;
197 }
198 } 240 }
241 queue_->Flush(port);
242 oob_queue_->Flush(port);
199 } 243 }
200 244
201 245
246 void MessageHandler::CloseAllPorts() {
247 MonitorLocker ml(&monitor_);
248 if (FLAG_trace_isolates) {
249 OS::Print("[-] Closing all ports:\n"
250 "\thandler: %s\n",
251 name());
252 }
253 queue_->FlushAll();
254 oob_queue_->FlushAll();
255 }
256
257
258 void MessageHandler::increment_live_ports() {
259 MonitorLocker ml(&monitor_);
260 #if defined(DEBUG)
261 CheckAccess();
262 #endif
263 live_ports_++;
264 }
265
266
267 void MessageHandler::decrement_live_ports() {
268 MonitorLocker ml(&monitor_);
269 #if defined(DEBUG)
270 CheckAccess();
271 #endif
272 live_ports_--;
273 }
274
202 } // namespace dart 275 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/message_handler.h ('k') | runtime/vm/message_handler_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698