Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: runtime/vm/message.cc

Issue 9924015: Use the ThreadPool for all isolates and native ports. Previously, (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/vm/message.h ('k') | runtime/vm/message_handler.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "vm/message.h" 5 #include "vm/message.h"
6 6
7 namespace dart { 7 namespace dart {
8 8
9 DECLARE_FLAG(bool, trace_isolates); 9 DECLARE_FLAG(bool, trace_isolates);
10 10
11
12 MessageHandler::MessageHandler()
13 : live_ports_(0),
14 queue_(new MessageQueue()) {
15 ASSERT(queue_ != NULL);
16 }
17
18
19 MessageHandler::~MessageHandler() {
20 delete queue_;
21 }
22
23
24 const char* MessageHandler::name() const {
25 return "<unnamed>";
26 }
27
28
29 #if defined(DEBUG)
30 void MessageHandler::CheckAccess() {
31 // By default there is no checking.
32 }
33 #endif
34
35
36 void MessageHandler::MessageNotify(Message::Priority priority) {
37 // By default, there is no custom message notification.
38 }
39
40
41 void MessageHandler::PostMessage(Message* message) {
42 if (FLAG_trace_isolates) {
43 const char* source_name = "<native code>";
44 Isolate* source_isolate = Isolate::Current();
45 if (source_isolate) {
46 source_name = source_isolate->name();
47 }
48 OS::Print("[>] Posting message:\n"
49 "\tsource: %s\n"
50 "\treply_port: %lld\n"
51 "\tdest: %s\n"
52 "\tdest_port: %lld\n",
53 source_name, message->reply_port(), name(), message->dest_port());
54 }
55
56 Message::Priority priority = message->priority();
57 queue()->Enqueue(message);
58 message = NULL; // Do not access message. May have been deleted.
59
60 // Invoke any custom message notification.
61 MessageNotify(priority);
62 }
63
64
65 void MessageHandler::ClosePort(Dart_Port port) {
66 queue()->Flush(port);
67 }
68
69
70 void MessageHandler::CloseAllPorts() {
71 queue()->FlushAll();
72 }
73
74
75 MessageQueue::MessageQueue() { 11 MessageQueue::MessageQueue() {
76 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 12 head_ = NULL;
77 head_[p] = NULL; 13 tail_ = NULL;
78 tail_[p] = NULL;
79 }
80 } 14 }
81 15
82 16
83 MessageQueue::~MessageQueue() { 17 MessageQueue::~MessageQueue() {
84 // Ensure that all pending messages have been released. 18 // Ensure that all pending messages have been released.
85 #if defined(DEBUG) 19 #if defined(DEBUG)
86 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 20 ASSERT(head_ == NULL);
87 ASSERT(head_[p] == NULL);
88 }
89 #endif 21 #endif
90 } 22 }
91 23
92 24
93 void MessageQueue::Enqueue(Message* msg) { 25 void MessageQueue::Enqueue(Message* msg) {
94 MonitorLocker ml(&monitor_);
95 Message::Priority p = msg->priority();
96 // Make sure messages are not reused. 26 // Make sure messages are not reused.
97 ASSERT(msg->next_ == NULL); 27 ASSERT(msg->next_ == NULL);
98 if (head_[p] == NULL) { 28 if (head_ == NULL) {
99 // Only element in the queue. 29 // Only element in the queue.
100 head_[p] = msg; 30 ASSERT(tail_ == NULL);
101 tail_[p] = msg; 31 head_ = msg;
102 32 tail_ = msg;
103 // We only need to notify if the queue was empty.
104 monitor_.Notify();
105 } else { 33 } else {
106 ASSERT(tail_[p] != NULL); 34 ASSERT(tail_ != NULL);
107 // Append at the tail. 35 // Append at the tail.
108 tail_[p]->next_ = msg; 36 tail_->next_ = msg;
109 tail_[p] = msg; 37 tail_ = msg;
110 } 38 }
111 } 39 }
112 40
113 41
114 Message* MessageQueue::DequeueNoWait() { 42 Message* MessageQueue::Dequeue() {
115 MonitorLocker ml(&monitor_); 43 Message* result = head_;
116 return DequeueNoWaitHoldsLock(Message::kFirstPriority); 44 if (result != NULL) {
117 } 45 head_ = result->next_;
118 46 // The following update to tail_ is not strictly needed.
119 47 if (head_ == NULL) {
120 Message* MessageQueue::DequeueNoWaitWithPriority( 48 tail_ = NULL;
121 Message::Priority min_priority) { 49 }
122 MonitorLocker ml(&monitor_);
123 return DequeueNoWaitHoldsLock(min_priority);
124 }
125
126
127 Message* MessageQueue::DequeueNoWaitHoldsLock(Message::Priority min_priority) {
128 // Look for the highest priority available message.
129 for (int p = Message::kNumPriorities-1; p >= min_priority; p--) {
130 Message* result = head_[p];
131 if (result != NULL) {
132 head_[p] = result->next_;
133 // The following update to tail_ is not strictly needed.
134 if (head_[p] == NULL) {
135 tail_[p] = NULL;
136 }
137 #if defined(DEBUG) 50 #if defined(DEBUG)
138 result->next_ = result; // Make sure to trigger ASSERT in Enqueue. 51 result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
139 #endif // DEBUG 52 #endif // DEBUG
140 return result; 53 return result;
141 }
142 } 54 }
143 return NULL; 55 return NULL;
144 } 56 }
145 57
146 58
147 Message* MessageQueue::Dequeue(int64_t millis) { 59 void MessageQueue::Flush(Dart_Port port) {
148 ASSERT(millis >= 0); 60 Message* cur = head_;
149 MonitorLocker ml(&monitor_); 61 Message* prev = NULL;
150 Message* result = DequeueNoWaitHoldsLock(Message::kFirstPriority); 62 while (cur != NULL) {
151 if (result == NULL) { 63 Message* next = cur->next_;
152 // No message available at any priority. 64 // If the message matches, then remove it from the queue and delete it.
153 monitor_.Wait(millis); 65 if (cur->dest_port() == port) {
154 result = DequeueNoWaitHoldsLock(Message::kFirstPriority); 66 if (prev != NULL) {
67 prev->next_ = next;
68 } else {
69 head_ = next;
70 }
71 delete cur;
72 } else {
73 // Move prev forward.
74 prev = cur;
75 }
76 // Advance to the next message in the queue.
77 cur = next;
155 } 78 }
156 return result; 79 tail_ = prev;
157 }
158
159
160 void MessageQueue::Flush(Dart_Port port) {
161 MonitorLocker ml(&monitor_);
162 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
163 Message* cur = head_[p];
164 Message* prev = NULL;
165 while (cur != NULL) {
166 Message* next = cur->next_;
167 // If the message matches, then remove it from the queue and delete it.
168 if (cur->dest_port() == port) {
169 if (prev != NULL) {
170 prev->next_ = next;
171 } else {
172 head_[p] = next;
173 }
174 delete cur;
175 } else {
176 // Move prev forward.
177 prev = cur;
178 }
179 // Advance to the next message in the queue.
180 cur = next;
181 }
182 tail_[p] = prev;
183 }
184 } 80 }
185 81
186 82
187 void MessageQueue::FlushAll() { 83 void MessageQueue::FlushAll() {
188 MonitorLocker ml(&monitor_); 84 Message* cur = head_;
189 for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { 85 head_ = NULL;
190 Message* cur = head_[p]; 86 tail_ = NULL;
191 head_[p] = NULL; 87 while (cur != NULL) {
192 tail_[p] = NULL; 88 Message* next = cur->next_;
193 while (cur != NULL) { 89 delete cur;
194 Message* next = cur->next_; 90 cur = next;
195 delete cur;
196 cur = next;
197 }
198 } 91 }
199 } 92 }
200 93
201 94
202 } // namespace dart 95 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/message.h ('k') | runtime/vm/message_handler.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698