OLD | NEW |
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "vm/message_handler.h" | 5 #include "vm/message_handler.h" |
6 | 6 |
7 #include "vm/dart.h" | 7 #include "vm/dart.h" |
8 #include "vm/lockers.h" | 8 #include "vm/lockers.h" |
9 #include "vm/os.h" | 9 #include "vm/os.h" |
10 #include "vm/port.h" | 10 #include "vm/port.h" |
11 #include "vm/thread_interrupter.h" | 11 #include "vm/thread_interrupter.h" |
12 | 12 |
13 | 13 |
14 namespace dart { | 14 namespace dart { |
15 | 15 |
16 DECLARE_FLAG(bool, trace_isolates); | 16 DECLARE_FLAG(bool, trace_isolates); |
17 DECLARE_FLAG(bool, trace_service_pause_events); | 17 DECLARE_FLAG(bool, trace_service_pause_events); |
18 | 18 |
19 class MessageHandlerTask : public ThreadPool::Task { | 19 class MessageHandlerTask : public ThreadPool::Task { |
20 public: | 20 public: |
21 explicit MessageHandlerTask(MessageHandler* handler) | 21 explicit MessageHandlerTask(MessageHandler* handler) |
22 : handler_(handler) { | 22 : handler_(handler) { |
23 ASSERT(handler != NULL); | 23 ASSERT(handler != NULL); |
24 } | 24 } |
25 | 25 |
26 virtual void Run() { | 26 virtual void Run() { |
| 27 ASSERT(handler_ != NULL); |
27 handler_->TaskCallback(); | 28 handler_->TaskCallback(); |
28 } | 29 } |
29 | 30 |
30 private: | 31 private: |
31 MessageHandler* handler_; | 32 MessageHandler* handler_; |
32 | 33 |
33 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); | 34 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask); |
34 }; | 35 }; |
35 | 36 |
36 | 37 |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
75 | 76 |
76 void MessageHandler::MessageNotify(Message::Priority priority) { | 77 void MessageHandler::MessageNotify(Message::Priority priority) { |
77 // By default, there is no custom message notification. | 78 // By default, there is no custom message notification. |
78 } | 79 } |
79 | 80 |
80 | 81 |
81 void MessageHandler::Run(ThreadPool* pool, | 82 void MessageHandler::Run(ThreadPool* pool, |
82 StartCallback start_callback, | 83 StartCallback start_callback, |
83 EndCallback end_callback, | 84 EndCallback end_callback, |
84 CallbackData data) { | 85 CallbackData data) { |
| 86 bool task_running; |
85 MonitorLocker ml(&monitor_); | 87 MonitorLocker ml(&monitor_); |
86 if (FLAG_trace_isolates) { | 88 if (FLAG_trace_isolates) { |
87 OS::Print("[+] Starting message handler:\n" | 89 OS::Print("[+] Starting message handler:\n" |
88 "\thandler: %s\n", | 90 "\thandler: %s\n", |
89 name()); | 91 name()); |
90 } | 92 } |
91 ASSERT(pool_ == NULL); | 93 ASSERT(pool_ == NULL); |
92 pool_ = pool; | 94 pool_ = pool; |
93 start_callback_ = start_callback; | 95 start_callback_ = start_callback; |
94 end_callback_ = end_callback; | 96 end_callback_ = end_callback; |
95 callback_data_ = data; | 97 callback_data_ = data; |
96 task_ = new MessageHandlerTask(this); | 98 task_ = new MessageHandlerTask(this); |
97 pool_->Run(task_); | 99 task_running = pool_->Run(task_); |
| 100 ASSERT(task_running); |
98 } | 101 } |
99 | 102 |
100 | 103 |
101 void MessageHandler::PostMessage(Message* message, bool before_events) { | 104 void MessageHandler::PostMessage(Message* message, bool before_events) { |
102 Message::Priority saved_priority; | 105 Message::Priority saved_priority; |
| 106 bool task_running = true; |
103 { | 107 { |
104 MonitorLocker ml(&monitor_); | 108 MonitorLocker ml(&monitor_); |
105 if (FLAG_trace_isolates) { | 109 if (FLAG_trace_isolates) { |
106 const char* source_name = "<native code>"; | 110 const char* source_name = "<native code>"; |
107 Isolate* source_isolate = Isolate::Current(); | 111 Isolate* source_isolate = Isolate::Current(); |
108 if (source_isolate) { | 112 if (source_isolate) { |
109 source_name = source_isolate->name(); | 113 source_name = source_isolate->name(); |
110 } | 114 } |
111 OS::Print("[>] Posting message:\n" | 115 OS::Print("[>] Posting message:\n" |
112 "\tlen: %" Pd "\n" | 116 "\tlen: %" Pd "\n" |
113 "\tsource: %s\n" | 117 "\tsource: %s\n" |
114 "\tdest: %s\n" | 118 "\tdest: %s\n" |
115 "\tdest_port: %" Pd64 "\n", | 119 "\tdest_port: %" Pd64 "\n", |
116 message->len(), source_name, name(), message->dest_port()); | 120 message->len(), source_name, name(), message->dest_port()); |
117 } | 121 } |
118 | 122 |
119 saved_priority = message->priority(); | 123 saved_priority = message->priority(); |
120 if (message->IsOOB()) { | 124 if (message->IsOOB()) { |
121 oob_queue_->Enqueue(message, before_events); | 125 oob_queue_->Enqueue(message, before_events); |
122 } else { | 126 } else { |
123 queue_->Enqueue(message, before_events); | 127 queue_->Enqueue(message, before_events); |
124 } | 128 } |
125 message = NULL; // Do not access message. May have been deleted. | 129 message = NULL; // Do not access message. May have been deleted. |
126 | 130 |
127 if (pool_ != NULL && task_ == NULL) { | 131 if ((pool_ != NULL) && (task_ == NULL)) { |
128 task_ = new MessageHandlerTask(this); | 132 task_ = new MessageHandlerTask(this); |
129 pool_->Run(task_); | 133 task_running = pool_->Run(task_); |
130 } | 134 } |
131 } | 135 } |
| 136 ASSERT(task_running); |
| 137 |
132 // Invoke any custom message notification. | 138 // Invoke any custom message notification. |
133 MessageNotify(saved_priority); | 139 MessageNotify(saved_priority); |
134 } | 140 } |
135 | 141 |
136 | 142 |
137 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { | 143 Message* MessageHandler::DequeueMessage(Message::Priority min_priority) { |
138 // TODO(turnidge): Add assert that monitor_ is held here. | 144 // TODO(turnidge): Add assert that monitor_ is held here. |
139 Message* message = oob_queue_->Dequeue(); | 145 Message* message = oob_queue_->Dequeue(); |
140 if ((message == NULL) && (min_priority < Message::kOOBPriority)) { | 146 if ((message == NULL) && (min_priority < Message::kOOBPriority)) { |
141 message = queue_->Dequeue(); | 147 message = queue_->Dequeue(); |
142 } | 148 } |
143 return message; | 149 return message; |
144 } | 150 } |
145 | 151 |
146 | 152 |
147 bool MessageHandler::HandleMessages(bool allow_normal_messages, | 153 bool MessageHandler::HandleMessages(bool allow_normal_messages, |
148 bool allow_multiple_normal_messages) { | 154 bool allow_multiple_normal_messages) { |
149 // If isolate() returns NULL StartIsolateScope does nothing. | 155 // If isolate() returns NULL StartIsolateScope does nothing. |
150 StartIsolateScope start_isolate(isolate()); | 156 StartIsolateScope start_isolate(isolate()); |
151 | 157 |
152 // ThreadInterrupter may have gone to sleep waiting while waiting for | 158 // ThreadInterrupter may have gone to sleep while waiting for |
153 // an isolate to start handling messages. | 159 // an isolate to start handling messages. |
154 ThreadInterrupter::WakeUp(); | 160 ThreadInterrupter::WakeUp(); |
155 | 161 |
156 // TODO(turnidge): Add assert that monitor_ is held here. | 162 // TODO(turnidge): Add assert that monitor_ is held here. |
157 bool result = true; | 163 bool result = true; |
158 Message::Priority min_priority = (allow_normal_messages && !paused()) ? | 164 Message::Priority min_priority = (allow_normal_messages && !paused()) ? |
159 Message::kNormalPriority : Message::kOOBPriority; | 165 Message::kNormalPriority : Message::kOOBPriority; |
160 Message* message = DequeueMessage(min_priority); | 166 Message* message = DequeueMessage(min_priority); |
161 while (message != NULL) { | 167 while (message != NULL) { |
162 intptr_t message_len = message->len(); | 168 intptr_t message_len = message->len(); |
(...skipping 239 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
402 | 408 |
403 | 409 |
404 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { | 410 void MessageHandler::AcquireQueues(AcquiredQueues* acquired_queues) { |
405 ASSERT(acquired_queues != NULL); | 411 ASSERT(acquired_queues != NULL); |
406 // No double dipping. | 412 // No double dipping. |
407 ASSERT(acquired_queues->handler_ == NULL); | 413 ASSERT(acquired_queues->handler_ == NULL); |
408 acquired_queues->Reset(this); | 414 acquired_queues->Reset(this); |
409 } | 415 } |
410 | 416 |
411 } // namespace dart | 417 } // namespace dart |
OLD | NEW |