OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "platform/assert.h" | 5 #include "platform/assert.h" |
6 #include "vm/message_queue.h" | 6 #include "vm/message_queue.h" |
7 #include "vm/unit_test.h" | 7 #include "vm/unit_test.h" |
8 | 8 |
9 namespace dart { | 9 namespace dart { |
10 | 10 |
11 | 11 |
12 // Provide access to private members of MessageQueue for testing. | 12 // Provide access to private members of MessageQueue for testing. |
13 class MessageQueueTestPeer { | 13 class MessageQueueTestPeer { |
14 public: | 14 public: |
15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} | 15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} |
16 | 16 |
17 bool HasMessage() const { return queue_->head_ != NULL; } | 17 bool HasMessage() const { |
| 18 // We don't really need to grab the monitor during the unit test, |
| 19 // but it doesn't hurt. |
| 20 queue_->monitor_.Enter(); |
| 21 bool result = (queue_->head_[Message::kNormalPriority] != NULL || |
| 22 queue_->head_[Message::kOOBPriority] != NULL); |
| 23 queue_->monitor_.Exit(); |
| 24 return result; |
| 25 } |
18 | 26 |
19 private: | 27 private: |
20 MessageQueue* queue_; | 28 MessageQueue* queue_; |
21 }; | 29 }; |
22 | 30 |
23 | 31 |
24 static Dart_Message AllocMsg(const char* str) { | 32 static uint8_t* AllocMsg(const char* str) { |
25 return reinterpret_cast<Dart_Message>(strdup(str)); | 33 return reinterpret_cast<uint8_t*>(strdup(str)); |
26 } | 34 } |
27 | 35 |
28 | 36 |
29 TEST_CASE(MessageQueue_BasicOperations) { | 37 TEST_CASE(MessageQueue_BasicOperations) { |
30 MessageQueue queue; | 38 MessageQueue queue; |
31 MessageQueueTestPeer queue_peer(&queue); | 39 MessageQueueTestPeer queue_peer(&queue); |
32 EXPECT(!queue_peer.HasMessage()); | 40 EXPECT(!queue_peer.HasMessage()); |
33 | 41 |
34 Dart_Port port = 1; | 42 Dart_Port port = 1; |
35 | 43 |
36 // Add two messages. | 44 // Add two messages. |
37 PortMessage* msg1 = new PortMessage(port, 0, AllocMsg("msg1")); | 45 Message* msg1 = |
| 46 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority); |
38 queue.Enqueue(msg1); | 47 queue.Enqueue(msg1); |
39 EXPECT(queue_peer.HasMessage()); | 48 EXPECT(queue_peer.HasMessage()); |
40 | 49 |
41 PortMessage* msg2 = new PortMessage(port, 0, AllocMsg("msg2")); | 50 Message* msg2 = |
| 51 new Message(port, 0, AllocMsg("msg2"), Message::kNormalPriority); |
| 52 |
42 queue.Enqueue(msg2); | 53 queue.Enqueue(msg2); |
43 EXPECT(queue_peer.HasMessage()); | 54 EXPECT(queue_peer.HasMessage()); |
44 | 55 |
45 // Remove two messages. | 56 // Remove two messages. |
46 PortMessage* msg = queue.Dequeue(0); | 57 Message* msg = queue.Dequeue(0); |
47 EXPECT(msg != NULL); | 58 EXPECT(msg != NULL); |
48 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); | 59 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); |
49 EXPECT(queue_peer.HasMessage()); | 60 EXPECT(queue_peer.HasMessage()); |
50 | 61 |
51 msg = queue.Dequeue(0); | 62 msg = queue.Dequeue(0); |
52 EXPECT(msg != NULL); | 63 EXPECT(msg != NULL); |
53 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | 64 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); |
54 EXPECT(!queue_peer.HasMessage()); | 65 EXPECT(!queue_peer.HasMessage()); |
55 | 66 |
56 delete msg1; | 67 delete msg1; |
57 delete msg2; | 68 delete msg2; |
58 } | 69 } |
59 | 70 |
60 | 71 |
| 72 TEST_CASE(MessageQueue_Priorities) { |
| 73 MessageQueue queue; |
| 74 MessageQueueTestPeer queue_peer(&queue); |
| 75 EXPECT(!queue_peer.HasMessage()); |
| 76 |
| 77 Dart_Port port = 1; |
| 78 |
| 79 // Add two messages. |
| 80 Message* msg1 = |
| 81 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority); |
| 82 queue.Enqueue(msg1); |
| 83 EXPECT(queue_peer.HasMessage()); |
| 84 |
| 85 Message* msg2 = |
| 86 new Message(port, 0, AllocMsg("msg2"), Message::kOOBPriority); |
| 87 |
| 88 queue.Enqueue(msg2); |
| 89 EXPECT(queue_peer.HasMessage()); |
| 90 |
| 91 // The higher priority message is delivered first. |
| 92 Message* msg = queue.Dequeue(0); |
| 93 EXPECT(msg != NULL); |
| 94 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); |
| 95 EXPECT(queue_peer.HasMessage()); |
| 96 |
| 97 msg = queue.Dequeue(0); |
| 98 EXPECT(msg != NULL); |
| 99 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); |
| 100 EXPECT(!queue_peer.HasMessage()); |
| 101 |
| 102 delete msg1; |
| 103 delete msg2; |
| 104 } |
| 105 |
| 106 |
61 // A thread which receives an expected sequence of messages. | 107 // A thread which receives an expected sequence of messages. |
62 static Monitor* sync = NULL; | 108 static Monitor* sync = NULL; |
63 static MessageQueue* shared_queue = NULL; | 109 static MessageQueue* shared_queue = NULL; |
64 void MessageReceiver_start(uword unused) { | 110 void MessageReceiver_start(uword unused) { |
65 // We only need an isolate here because the MonitorLocker in the | 111 // We only need an isolate here because the MonitorLocker in the |
66 // MessageQueue expects it, we don't need to initialize the isolate | 112 // MessageQueue expects it, we don't need to initialize the isolate |
67 // as it does not run any dart code. | 113 // as it does not run any dart code. |
68 Dart::CreateIsolate(NULL); | 114 Dart::CreateIsolate(NULL); |
69 | 115 |
70 // Create a message queue and share it. | 116 // Create a message queue and share it. |
71 MessageQueue* queue = new MessageQueue(); | 117 MessageQueue* queue = new MessageQueue(); |
72 MessageQueueTestPeer peer(queue); | 118 MessageQueueTestPeer peer(queue); |
73 shared_queue = queue; | 119 shared_queue = queue; |
74 | 120 |
75 // Tell the other thread that the shared queue is ready. | 121 // Tell the other thread that the shared queue is ready. |
76 { | 122 { |
77 MonitorLocker ml(sync); | 123 MonitorLocker ml(sync); |
78 ml.Notify(); | 124 ml.Notify(); |
79 } | 125 } |
80 | 126 |
81 // Wait for the other thread to fill the queue a bit. | 127 // Wait for the other thread to fill the queue a bit. |
82 while (!peer.HasMessage()) { | 128 while (!peer.HasMessage()) { |
83 MonitorLocker ml(sync); | 129 MonitorLocker ml(sync); |
84 ml.Wait(5); | 130 ml.Wait(5); |
85 } | 131 } |
86 | 132 |
87 for (int i = 0; i < 3; i++) { | 133 for (int i = 0; i < 3; i++) { |
88 PortMessage* msg = queue->Dequeue(0); | 134 Message* msg = queue->Dequeue(0); |
89 EXPECT(msg != NULL); | 135 EXPECT(msg != NULL); |
90 EXPECT_EQ(i+10, msg->dest_port()); | 136 EXPECT_EQ(i + 10, msg->dest_port()); |
91 EXPECT_EQ(i+100, msg->reply_port()); | 137 EXPECT_EQ(i + 100, msg->reply_port()); |
92 EXPECT_EQ(i+1000, *(reinterpret_cast<int*>(msg->data()))); | 138 EXPECT_EQ(i + 1000, *(reinterpret_cast<int*>(msg->data()))); |
93 delete msg; | 139 delete msg; |
94 } | 140 } |
95 for (int i = 0; i < 3; i++) { | 141 for (int i = 0; i < 3; i++) { |
96 PortMessage* msg = queue->Dequeue(0); | 142 Message* msg = queue->Dequeue(0); |
97 EXPECT(msg != NULL); | 143 EXPECT(msg != NULL); |
98 EXPECT_EQ(i+20, msg->dest_port()); | 144 EXPECT_EQ(i + 20, msg->dest_port()); |
99 EXPECT_EQ(i+200, msg->reply_port()); | 145 EXPECT_EQ(i + 200, msg->reply_port()); |
100 EXPECT_EQ(i+2000, *(reinterpret_cast<int*>(msg->data()))); | 146 EXPECT_EQ(i + 2000, *(reinterpret_cast<int*>(msg->data()))); |
101 delete msg; | 147 delete msg; |
102 } | 148 } |
103 shared_queue = NULL; | 149 shared_queue = NULL; |
104 delete queue; | 150 delete queue; |
105 Dart::ShutdownIsolate(); | 151 Dart::ShutdownIsolate(); |
106 } | 152 } |
107 | 153 |
108 | 154 |
109 TEST_CASE(MessageQueue_WaitNotify) { | 155 TEST_CASE(MessageQueue_WaitNotify) { |
110 sync = new Monitor(); | 156 sync = new Monitor(); |
111 | 157 |
112 Thread* thread = new Thread(MessageReceiver_start, 0); | 158 Thread* thread = new Thread(MessageReceiver_start, 0); |
113 EXPECT(thread != NULL); | 159 EXPECT(thread != NULL); |
114 | 160 |
115 // Wait for the shared queue to be created. | 161 // Wait for the shared queue to be created. |
116 while (shared_queue == NULL) { | 162 while (shared_queue == NULL) { |
117 MonitorLocker ml(sync); | 163 MonitorLocker ml(sync); |
118 ml.Wait(5); | 164 ml.Wait(5); |
119 } | 165 } |
120 ASSERT(shared_queue != NULL); | 166 ASSERT(shared_queue != NULL); |
121 | 167 |
122 // Pile up three messages before the other thread runs. | 168 // Pile up three messages before the other thread runs. |
123 for (int i = 0; i < 3; i++) { | 169 for (int i = 0; i < 3; i++) { |
124 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); | 170 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); |
125 *data = i+1000; | 171 *data = i + 1000; |
126 PortMessage* msg = | 172 Message* msg = |
127 new PortMessage(i+10, i+100, reinterpret_cast<Dart_Message>(data)); | 173 new Message(i + 10, i + 100, reinterpret_cast<uint8_t*>(data), |
| 174 Message::kNormalPriority); |
128 shared_queue->Enqueue(msg); | 175 shared_queue->Enqueue(msg); |
129 } | 176 } |
130 | 177 |
131 // Wake the other thread and have it start consuming messages. | 178 // Wake the other thread and have it start consuming messages. |
132 { | 179 { |
133 MonitorLocker ml(sync); | 180 MonitorLocker ml(sync); |
134 ml.Notify(); | 181 ml.Notify(); |
135 } | 182 } |
136 | 183 |
137 // Add a few more messages after sleeping to allow the other thread | 184 // Add a few more messages after sleeping to allow the other thread |
138 // to potentially exercise the blocking code path in Dequeue. | 185 // to potentially exercise the blocking code path in Dequeue. |
139 OS::Sleep(5); | 186 OS::Sleep(5); |
140 for (int i = 0; i < 3; i++) { | 187 for (int i = 0; i < 3; i++) { |
141 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); | 188 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); |
142 *data = i+2000; | 189 *data = i + 2000; |
143 PortMessage* msg = | 190 Message* msg = |
144 new PortMessage(i+20, i+200, reinterpret_cast<Dart_Message>(data)); | 191 new Message(i + 20, i + 200, reinterpret_cast<uint8_t*>(data), |
| 192 Message::kNormalPriority); |
145 shared_queue->Enqueue(msg); | 193 shared_queue->Enqueue(msg); |
146 } | 194 } |
147 | 195 |
148 sync = NULL; | 196 sync = NULL; |
149 delete sync; | 197 delete sync; |
150 | 198 |
151 // Give the spawned thread enough time to properly exit. | 199 // Give the spawned thread enough time to properly exit. |
152 OS::Sleep(20); | 200 OS::Sleep(20); |
153 } | 201 } |
154 | 202 |
155 | 203 |
156 TEST_CASE(MessageQueue_FlushAll) { | 204 TEST_CASE(MessageQueue_FlushAll) { |
157 MessageQueue queue; | 205 MessageQueue queue; |
158 MessageQueueTestPeer queue_peer(&queue); | 206 MessageQueueTestPeer queue_peer(&queue); |
159 Dart_Port port1 = 1; | 207 Dart_Port port1 = 1; |
160 Dart_Port port2 = 2; | 208 Dart_Port port2 = 2; |
161 | 209 |
162 // Add two messages. | 210 // Add two messages. |
163 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); | 211 Message* msg1 = |
| 212 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); |
164 queue.Enqueue(msg1); | 213 queue.Enqueue(msg1); |
165 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); | 214 Message* msg2 = |
| 215 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority); |
166 queue.Enqueue(msg2); | 216 queue.Enqueue(msg2); |
167 | 217 |
168 EXPECT(queue_peer.HasMessage()); | 218 EXPECT(queue_peer.HasMessage()); |
169 queue.FlushAll(); | 219 queue.FlushAll(); |
170 EXPECT(!queue_peer.HasMessage()); | 220 EXPECT(!queue_peer.HasMessage()); |
171 | 221 |
172 // msg1 and msg2 already delete by FlushAll. | 222 // msg1 and msg2 already delete by FlushAll. |
173 } | 223 } |
174 | 224 |
175 | 225 |
176 TEST_CASE(MessageQueue_Flush) { | 226 TEST_CASE(MessageQueue_Flush) { |
177 MessageQueue queue; | 227 MessageQueue queue; |
178 MessageQueueTestPeer queue_peer(&queue); | 228 MessageQueueTestPeer queue_peer(&queue); |
179 Dart_Port port1 = 1; | 229 Dart_Port port1 = 1; |
180 Dart_Port port2 = 2; | 230 Dart_Port port2 = 2; |
181 | 231 |
182 // Add two messages on different ports. | 232 // Add two messages on different ports. |
183 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); | 233 Message* msg1 = |
| 234 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); |
184 queue.Enqueue(msg1); | 235 queue.Enqueue(msg1); |
185 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); | 236 Message* msg2 = |
| 237 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority); |
186 queue.Enqueue(msg2); | 238 queue.Enqueue(msg2); |
187 EXPECT(queue_peer.HasMessage()); | 239 EXPECT(queue_peer.HasMessage()); |
188 | 240 |
189 queue.Flush(port1); | 241 queue.Flush(port1); |
190 | 242 |
191 // One message is left in the queue. | 243 // One message is left in the queue. |
192 EXPECT(queue_peer.HasMessage()); | 244 EXPECT(queue_peer.HasMessage()); |
193 PortMessage* msg = queue.Dequeue(0); | 245 Message* msg = queue.Dequeue(0); |
194 EXPECT(msg != NULL); | 246 EXPECT(msg != NULL); |
195 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | 247 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); |
196 | 248 |
197 EXPECT(!queue_peer.HasMessage()); | 249 EXPECT(!queue_peer.HasMessage()); |
198 | 250 |
199 // msg1 is already deleted by Flush. | 251 // msg1 is already deleted by Flush. |
200 delete msg2; | 252 delete msg2; |
201 } | 253 } |
202 | 254 |
203 | 255 |
204 TEST_CASE(MessageQueue_Flush_MultipleMessages) { | 256 TEST_CASE(MessageQueue_Flush_MultipleMessages) { |
205 MessageQueue queue; | 257 MessageQueue queue; |
206 MessageQueueTestPeer queue_peer(&queue); | 258 MessageQueueTestPeer queue_peer(&queue); |
207 Dart_Port port1 = 1; | 259 Dart_Port port1 = 1; |
208 | 260 |
209 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); | 261 Message* msg1 = |
| 262 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); |
210 queue.Enqueue(msg1); | 263 queue.Enqueue(msg1); |
211 PortMessage* msg2 = new PortMessage(port1, 0, AllocMsg("msg2")); | 264 Message* msg2 = |
| 265 new Message(port1, 0, AllocMsg("msg2"), Message::kNormalPriority); |
212 queue.Enqueue(msg2); | 266 queue.Enqueue(msg2); |
213 EXPECT(queue_peer.HasMessage()); | 267 EXPECT(queue_peer.HasMessage()); |
214 | 268 |
215 queue.Flush(port1); | 269 queue.Flush(port1); |
216 | 270 |
217 // Queue is empty. | 271 // Queue is empty. |
218 EXPECT(!queue_peer.HasMessage()); | 272 EXPECT(!queue_peer.HasMessage()); |
219 // msg1 and msg2 are already deleted by Flush. | 273 // msg1 and msg2 are already deleted by Flush. |
220 } | 274 } |
221 | 275 |
222 | 276 |
223 TEST_CASE(MessageQueue_Flush_EmptyQueue) { | 277 TEST_CASE(MessageQueue_Flush_EmptyQueue) { |
224 MessageQueue queue; | 278 MessageQueue queue; |
225 MessageQueueTestPeer queue_peer(&queue); | 279 MessageQueueTestPeer queue_peer(&queue); |
226 Dart_Port port1 = 1; | 280 Dart_Port port1 = 1; |
227 | 281 |
228 EXPECT(!queue_peer.HasMessage()); | 282 EXPECT(!queue_peer.HasMessage()); |
229 queue.Flush(port1); | 283 queue.Flush(port1); |
230 | 284 |
231 // Queue is still empty. | 285 // Queue is still empty. |
232 EXPECT(!queue_peer.HasMessage()); | 286 EXPECT(!queue_peer.HasMessage()); |
233 } | 287 } |
234 | 288 |
235 | |
236 } // namespace dart | 289 } // namespace dart |
OLD | NEW |