OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 #include "platform/assert.h" | |
6 #include "vm/message_queue.h" | |
7 #include "vm/unit_test.h" | |
8 | |
9 namespace dart { | |
10 | |
11 | |
12 // Provide access to private members of MessageQueue for testing. | |
13 class MessageQueueTestPeer { | |
14 public: | |
15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} | |
16 | |
17 bool HasMessage() const { | |
18 // We don't really need to grab the monitor during the unit test, | |
19 // but it doesn't hurt. | |
20 queue_->monitor_.Enter(); | |
21 bool result = (queue_->head_[Message::kNormalPriority] != NULL || | |
22 queue_->head_[Message::kOOBPriority] != NULL); | |
23 queue_->monitor_.Exit(); | |
24 return result; | |
25 } | |
26 | |
27 private: | |
28 MessageQueue* queue_; | |
29 }; | |
30 | |
31 | |
32 static uint8_t* AllocMsg(const char* str) { | |
33 return reinterpret_cast<uint8_t*>(strdup(str)); | |
34 } | |
35 | |
36 | |
37 TEST_CASE(MessageQueue_BasicOperations) { | |
38 MessageQueue queue; | |
39 MessageQueueTestPeer queue_peer(&queue); | |
40 EXPECT(!queue_peer.HasMessage()); | |
41 | |
42 Dart_Port port = 1; | |
43 | |
44 // Add two messages. | |
45 Message* msg1 = | |
46 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
47 queue.Enqueue(msg1); | |
48 EXPECT(queue_peer.HasMessage()); | |
49 | |
50 Message* msg2 = | |
51 new Message(port, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
52 | |
53 queue.Enqueue(msg2); | |
54 EXPECT(queue_peer.HasMessage()); | |
55 | |
56 // Remove two messages. | |
57 Message* msg = queue.Dequeue(0); | |
58 EXPECT(msg != NULL); | |
59 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); | |
60 EXPECT(queue_peer.HasMessage()); | |
61 | |
62 msg = queue.Dequeue(0); | |
63 EXPECT(msg != NULL); | |
64 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | |
65 EXPECT(!queue_peer.HasMessage()); | |
66 | |
67 delete msg1; | |
68 delete msg2; | |
69 } | |
70 | |
71 | |
72 TEST_CASE(MessageQueue_Priorities) { | |
73 MessageQueue queue; | |
74 MessageQueueTestPeer queue_peer(&queue); | |
75 EXPECT(!queue_peer.HasMessage()); | |
76 | |
77 Dart_Port port = 1; | |
78 | |
79 // Add two messages. | |
80 Message* msg1 = | |
81 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
82 queue.Enqueue(msg1); | |
83 EXPECT(queue_peer.HasMessage()); | |
84 | |
85 Message* msg2 = | |
86 new Message(port, 0, AllocMsg("msg2"), Message::kOOBPriority); | |
87 | |
88 queue.Enqueue(msg2); | |
89 EXPECT(queue_peer.HasMessage()); | |
90 | |
91 // The higher priority message is delivered first. | |
92 Message* msg = queue.Dequeue(0); | |
93 EXPECT(msg != NULL); | |
94 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | |
95 EXPECT(queue_peer.HasMessage()); | |
96 | |
97 msg = queue.Dequeue(0); | |
98 EXPECT(msg != NULL); | |
99 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); | |
100 EXPECT(!queue_peer.HasMessage()); | |
101 | |
102 delete msg1; | |
103 delete msg2; | |
104 } | |
105 | |
106 | |
107 // A thread which receives an expected sequence of messages. | |
108 static Monitor* sync = NULL; | |
109 static MessageQueue* shared_queue = NULL; | |
110 void MessageReceiver_start(uword unused) { | |
111 // We only need an isolate here because the MonitorLocker in the | |
112 // MessageQueue expects it, we don't need to initialize the isolate | |
113 // as it does not run any dart code. | |
114 Dart::CreateIsolate(NULL); | |
115 | |
116 // Create a message queue and share it. | |
117 MessageQueue* queue = new MessageQueue(); | |
118 MessageQueueTestPeer peer(queue); | |
119 shared_queue = queue; | |
120 | |
121 // Tell the other thread that the shared queue is ready. | |
122 { | |
123 MonitorLocker ml(sync); | |
124 ml.Notify(); | |
125 } | |
126 | |
127 // Wait for the other thread to fill the queue a bit. | |
128 while (!peer.HasMessage()) { | |
129 MonitorLocker ml(sync); | |
130 ml.Wait(5); | |
131 } | |
132 | |
133 for (int i = 0; i < 3; i++) { | |
134 Message* msg = queue->Dequeue(0); | |
135 EXPECT(msg != NULL); | |
136 EXPECT_EQ(i + 10, msg->dest_port()); | |
137 EXPECT_EQ(i + 100, msg->reply_port()); | |
138 EXPECT_EQ(i + 1000, *(reinterpret_cast<int*>(msg->data()))); | |
139 delete msg; | |
140 } | |
141 for (int i = 0; i < 3; i++) { | |
142 Message* msg = queue->Dequeue(0); | |
143 EXPECT(msg != NULL); | |
144 EXPECT_EQ(i + 20, msg->dest_port()); | |
145 EXPECT_EQ(i + 200, msg->reply_port()); | |
146 EXPECT_EQ(i + 2000, *(reinterpret_cast<int*>(msg->data()))); | |
147 delete msg; | |
148 } | |
149 shared_queue = NULL; | |
150 delete queue; | |
151 Dart::ShutdownIsolate(); | |
152 } | |
153 | |
154 | |
155 TEST_CASE(MessageQueue_WaitNotify) { | |
156 sync = new Monitor(); | |
157 | |
158 Thread* thread = new Thread(MessageReceiver_start, 0); | |
159 EXPECT(thread != NULL); | |
160 | |
161 // Wait for the shared queue to be created. | |
162 while (shared_queue == NULL) { | |
163 MonitorLocker ml(sync); | |
164 ml.Wait(5); | |
165 } | |
166 ASSERT(shared_queue != NULL); | |
167 | |
168 // Pile up three messages before the other thread runs. | |
169 for (int i = 0; i < 3; i++) { | |
170 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); | |
171 *data = i + 1000; | |
172 Message* msg = | |
173 new Message(i + 10, i + 100, reinterpret_cast<uint8_t*>(data), | |
174 Message::kNormalPriority); | |
175 shared_queue->Enqueue(msg); | |
176 } | |
177 | |
178 // Wake the other thread and have it start consuming messages. | |
179 { | |
180 MonitorLocker ml(sync); | |
181 ml.Notify(); | |
182 } | |
183 | |
184 // Add a few more messages after sleeping to allow the other thread | |
185 // to potentially exercise the blocking code path in Dequeue. | |
186 OS::Sleep(5); | |
187 for (int i = 0; i < 3; i++) { | |
188 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); | |
189 *data = i + 2000; | |
190 Message* msg = | |
191 new Message(i + 20, i + 200, reinterpret_cast<uint8_t*>(data), | |
192 Message::kNormalPriority); | |
193 shared_queue->Enqueue(msg); | |
194 } | |
195 | |
196 sync = NULL; | |
197 delete sync; | |
198 | |
199 // Give the spawned thread enough time to properly exit. | |
200 OS::Sleep(20); | |
201 } | |
202 | |
203 | |
204 TEST_CASE(MessageQueue_FlushAll) { | |
205 MessageQueue queue; | |
206 MessageQueueTestPeer queue_peer(&queue); | |
207 Dart_Port port1 = 1; | |
208 Dart_Port port2 = 2; | |
209 | |
210 // Add two messages. | |
211 Message* msg1 = | |
212 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
213 queue.Enqueue(msg1); | |
214 Message* msg2 = | |
215 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
216 queue.Enqueue(msg2); | |
217 | |
218 EXPECT(queue_peer.HasMessage()); | |
219 queue.FlushAll(); | |
220 EXPECT(!queue_peer.HasMessage()); | |
221 | |
222 // msg1 and msg2 already delete by FlushAll. | |
223 } | |
224 | |
225 | |
226 TEST_CASE(MessageQueue_Flush) { | |
227 MessageQueue queue; | |
228 MessageQueueTestPeer queue_peer(&queue); | |
229 Dart_Port port1 = 1; | |
230 Dart_Port port2 = 2; | |
231 | |
232 // Add two messages on different ports. | |
233 Message* msg1 = | |
234 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
235 queue.Enqueue(msg1); | |
236 Message* msg2 = | |
237 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
238 queue.Enqueue(msg2); | |
239 EXPECT(queue_peer.HasMessage()); | |
240 | |
241 queue.Flush(port1); | |
242 | |
243 // One message is left in the queue. | |
244 EXPECT(queue_peer.HasMessage()); | |
245 Message* msg = queue.Dequeue(0); | |
246 EXPECT(msg != NULL); | |
247 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); | |
248 | |
249 EXPECT(!queue_peer.HasMessage()); | |
250 | |
251 // msg1 is already deleted by Flush. | |
252 delete msg2; | |
253 } | |
254 | |
255 | |
256 TEST_CASE(MessageQueue_Flush_MultipleMessages) { | |
257 MessageQueue queue; | |
258 MessageQueueTestPeer queue_peer(&queue); | |
259 Dart_Port port1 = 1; | |
260 | |
261 Message* msg1 = | |
262 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority); | |
263 queue.Enqueue(msg1); | |
264 Message* msg2 = | |
265 new Message(port1, 0, AllocMsg("msg2"), Message::kNormalPriority); | |
266 queue.Enqueue(msg2); | |
267 EXPECT(queue_peer.HasMessage()); | |
268 | |
269 queue.Flush(port1); | |
270 | |
271 // Queue is empty. | |
272 EXPECT(!queue_peer.HasMessage()); | |
273 // msg1 and msg2 are already deleted by Flush. | |
274 } | |
275 | |
276 | |
277 TEST_CASE(MessageQueue_Flush_EmptyQueue) { | |
278 MessageQueue queue; | |
279 MessageQueueTestPeer queue_peer(&queue); | |
280 Dart_Port port1 = 1; | |
281 | |
282 EXPECT(!queue_peer.HasMessage()); | |
283 queue.Flush(port1); | |
284 | |
285 // Queue is still empty. | |
286 EXPECT(!queue_peer.HasMessage()); | |
287 } | |
288 | |
289 } // namespace dart | |
OLD | NEW |