Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(261)

Side by Side Diff: runtime/vm/message_queue_test.cc

Issue 9182001: OOB messages and general message refactor. (Closed) Base URL: http://dart.googlecode.com/svn/branches/bleeding_edge/dart/
Patch Set: '' Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/vm/message_queue.cc ('k') | runtime/vm/port.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "platform/assert.h" 5 #include "platform/assert.h"
6 #include "vm/message_queue.h" 6 #include "vm/message_queue.h"
7 #include "vm/unit_test.h" 7 #include "vm/unit_test.h"
8 8
9 namespace dart { 9 namespace dart {
10 10
11 11
12 // Provide access to private members of MessageQueue for testing. 12 // Provide access to private members of MessageQueue for testing.
13 class MessageQueueTestPeer { 13 class MessageQueueTestPeer {
14 public: 14 public:
15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {} 15 explicit MessageQueueTestPeer(MessageQueue* queue) : queue_(queue) {}
16 16
17 bool HasMessage() const { return queue_->head_ != NULL; } 17 bool HasMessage() const {
18 // We don't really need to grab the monitor during the unit test,
19 // but it doesn't hurt.
20 queue_->monitor_.Enter();
21 bool result = (queue_->head_[Message::kNormalPriority] != NULL ||
22 queue_->head_[Message::kOOBPriority] != NULL);
23 queue_->monitor_.Exit();
24 return result;
25 }
18 26
19 private: 27 private:
20 MessageQueue* queue_; 28 MessageQueue* queue_;
21 }; 29 };
22 30
23 31
24 static Dart_Message AllocMsg(const char* str) { 32 static uint8_t* AllocMsg(const char* str) {
25 return reinterpret_cast<Dart_Message>(strdup(str)); 33 return reinterpret_cast<uint8_t*>(strdup(str));
26 } 34 }
27 35
28 36
29 TEST_CASE(MessageQueue_BasicOperations) { 37 TEST_CASE(MessageQueue_BasicOperations) {
30 MessageQueue queue; 38 MessageQueue queue;
31 MessageQueueTestPeer queue_peer(&queue); 39 MessageQueueTestPeer queue_peer(&queue);
32 EXPECT(!queue_peer.HasMessage()); 40 EXPECT(!queue_peer.HasMessage());
33 41
34 Dart_Port port = 1; 42 Dart_Port port = 1;
35 43
36 // Add two messages. 44 // Add two messages.
37 PortMessage* msg1 = new PortMessage(port, 0, AllocMsg("msg1")); 45 Message* msg1 =
46 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority);
38 queue.Enqueue(msg1); 47 queue.Enqueue(msg1);
39 EXPECT(queue_peer.HasMessage()); 48 EXPECT(queue_peer.HasMessage());
40 49
41 PortMessage* msg2 = new PortMessage(port, 0, AllocMsg("msg2")); 50 Message* msg2 =
51 new Message(port, 0, AllocMsg("msg2"), Message::kNormalPriority);
52
42 queue.Enqueue(msg2); 53 queue.Enqueue(msg2);
43 EXPECT(queue_peer.HasMessage()); 54 EXPECT(queue_peer.HasMessage());
44 55
45 // Remove two messages. 56 // Remove two messages.
46 PortMessage* msg = queue.Dequeue(0); 57 Message* msg = queue.Dequeue(0);
47 EXPECT(msg != NULL); 58 EXPECT(msg != NULL);
48 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data())); 59 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data()));
49 EXPECT(queue_peer.HasMessage()); 60 EXPECT(queue_peer.HasMessage());
50 61
51 msg = queue.Dequeue(0); 62 msg = queue.Dequeue(0);
52 EXPECT(msg != NULL); 63 EXPECT(msg != NULL);
53 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); 64 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data()));
54 EXPECT(!queue_peer.HasMessage()); 65 EXPECT(!queue_peer.HasMessage());
55 66
56 delete msg1; 67 delete msg1;
57 delete msg2; 68 delete msg2;
58 } 69 }
59 70
60 71
72 TEST_CASE(MessageQueue_Priorities) {
73 MessageQueue queue;
74 MessageQueueTestPeer queue_peer(&queue);
75 EXPECT(!queue_peer.HasMessage());
76
77 Dart_Port port = 1;
78
79 // Add two messages.
80 Message* msg1 =
81 new Message(port, 0, AllocMsg("msg1"), Message::kNormalPriority);
82 queue.Enqueue(msg1);
83 EXPECT(queue_peer.HasMessage());
84
85 Message* msg2 =
86 new Message(port, 0, AllocMsg("msg2"), Message::kOOBPriority);
87
88 queue.Enqueue(msg2);
89 EXPECT(queue_peer.HasMessage());
90
91 // The higher priority message is delivered first.
92 Message* msg = queue.Dequeue(0);
93 EXPECT(msg != NULL);
94 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data()));
95 EXPECT(queue_peer.HasMessage());
96
97 msg = queue.Dequeue(0);
98 EXPECT(msg != NULL);
99 EXPECT_STREQ("msg1", reinterpret_cast<char*>(msg->data()));
100 EXPECT(!queue_peer.HasMessage());
101
102 delete msg1;
103 delete msg2;
104 }
105
106
61 // A thread which receives an expected sequence of messages. 107 // A thread which receives an expected sequence of messages.
62 static Monitor* sync = NULL; 108 static Monitor* sync = NULL;
63 static MessageQueue* shared_queue = NULL; 109 static MessageQueue* shared_queue = NULL;
64 void MessageReceiver_start(uword unused) { 110 void MessageReceiver_start(uword unused) {
65 // We only need an isolate here because the MonitorLocker in the 111 // We only need an isolate here because the MonitorLocker in the
66 // MessageQueue expects it, we don't need to initialize the isolate 112 // MessageQueue expects it, we don't need to initialize the isolate
67 // as it does not run any dart code. 113 // as it does not run any dart code.
68 Dart::CreateIsolate(NULL); 114 Dart::CreateIsolate(NULL);
69 115
70 // Create a message queue and share it. 116 // Create a message queue and share it.
71 MessageQueue* queue = new MessageQueue(); 117 MessageQueue* queue = new MessageQueue();
72 MessageQueueTestPeer peer(queue); 118 MessageQueueTestPeer peer(queue);
73 shared_queue = queue; 119 shared_queue = queue;
74 120
75 // Tell the other thread that the shared queue is ready. 121 // Tell the other thread that the shared queue is ready.
76 { 122 {
77 MonitorLocker ml(sync); 123 MonitorLocker ml(sync);
78 ml.Notify(); 124 ml.Notify();
79 } 125 }
80 126
81 // Wait for the other thread to fill the queue a bit. 127 // Wait for the other thread to fill the queue a bit.
82 while (!peer.HasMessage()) { 128 while (!peer.HasMessage()) {
83 MonitorLocker ml(sync); 129 MonitorLocker ml(sync);
84 ml.Wait(5); 130 ml.Wait(5);
85 } 131 }
86 132
87 for (int i = 0; i < 3; i++) { 133 for (int i = 0; i < 3; i++) {
88 PortMessage* msg = queue->Dequeue(0); 134 Message* msg = queue->Dequeue(0);
89 EXPECT(msg != NULL); 135 EXPECT(msg != NULL);
90 EXPECT_EQ(i+10, msg->dest_port()); 136 EXPECT_EQ(i + 10, msg->dest_port());
91 EXPECT_EQ(i+100, msg->reply_port()); 137 EXPECT_EQ(i + 100, msg->reply_port());
92 EXPECT_EQ(i+1000, *(reinterpret_cast<int*>(msg->data()))); 138 EXPECT_EQ(i + 1000, *(reinterpret_cast<int*>(msg->data())));
93 delete msg; 139 delete msg;
94 } 140 }
95 for (int i = 0; i < 3; i++) { 141 for (int i = 0; i < 3; i++) {
96 PortMessage* msg = queue->Dequeue(0); 142 Message* msg = queue->Dequeue(0);
97 EXPECT(msg != NULL); 143 EXPECT(msg != NULL);
98 EXPECT_EQ(i+20, msg->dest_port()); 144 EXPECT_EQ(i + 20, msg->dest_port());
99 EXPECT_EQ(i+200, msg->reply_port()); 145 EXPECT_EQ(i + 200, msg->reply_port());
100 EXPECT_EQ(i+2000, *(reinterpret_cast<int*>(msg->data()))); 146 EXPECT_EQ(i + 2000, *(reinterpret_cast<int*>(msg->data())));
101 delete msg; 147 delete msg;
102 } 148 }
103 shared_queue = NULL; 149 shared_queue = NULL;
104 delete queue; 150 delete queue;
105 Dart::ShutdownIsolate(); 151 Dart::ShutdownIsolate();
106 } 152 }
107 153
108 154
109 TEST_CASE(MessageQueue_WaitNotify) { 155 TEST_CASE(MessageQueue_WaitNotify) {
110 sync = new Monitor(); 156 sync = new Monitor();
111 157
112 Thread* thread = new Thread(MessageReceiver_start, 0); 158 Thread* thread = new Thread(MessageReceiver_start, 0);
113 EXPECT(thread != NULL); 159 EXPECT(thread != NULL);
114 160
115 // Wait for the shared queue to be created. 161 // Wait for the shared queue to be created.
116 while (shared_queue == NULL) { 162 while (shared_queue == NULL) {
117 MonitorLocker ml(sync); 163 MonitorLocker ml(sync);
118 ml.Wait(5); 164 ml.Wait(5);
119 } 165 }
120 ASSERT(shared_queue != NULL); 166 ASSERT(shared_queue != NULL);
121 167
122 // Pile up three messages before the other thread runs. 168 // Pile up three messages before the other thread runs.
123 for (int i = 0; i < 3; i++) { 169 for (int i = 0; i < 3; i++) {
124 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); 170 int* data = reinterpret_cast<int*>(malloc(sizeof(*data)));
125 *data = i+1000; 171 *data = i + 1000;
126 PortMessage* msg = 172 Message* msg =
127 new PortMessage(i+10, i+100, reinterpret_cast<Dart_Message>(data)); 173 new Message(i + 10, i + 100, reinterpret_cast<uint8_t*>(data),
174 Message::kNormalPriority);
128 shared_queue->Enqueue(msg); 175 shared_queue->Enqueue(msg);
129 } 176 }
130 177
131 // Wake the other thread and have it start consuming messages. 178 // Wake the other thread and have it start consuming messages.
132 { 179 {
133 MonitorLocker ml(sync); 180 MonitorLocker ml(sync);
134 ml.Notify(); 181 ml.Notify();
135 } 182 }
136 183
137 // Add a few more messages after sleeping to allow the other thread 184 // Add a few more messages after sleeping to allow the other thread
138 // to potentially exercise the blocking code path in Dequeue. 185 // to potentially exercise the blocking code path in Dequeue.
139 OS::Sleep(5); 186 OS::Sleep(5);
140 for (int i = 0; i < 3; i++) { 187 for (int i = 0; i < 3; i++) {
141 int* data = reinterpret_cast<int*>(malloc(sizeof(*data))); 188 int* data = reinterpret_cast<int*>(malloc(sizeof(*data)));
142 *data = i+2000; 189 *data = i + 2000;
143 PortMessage* msg = 190 Message* msg =
144 new PortMessage(i+20, i+200, reinterpret_cast<Dart_Message>(data)); 191 new Message(i + 20, i + 200, reinterpret_cast<uint8_t*>(data),
192 Message::kNormalPriority);
145 shared_queue->Enqueue(msg); 193 shared_queue->Enqueue(msg);
146 } 194 }
147 195
148 sync = NULL; 196 sync = NULL;
149 delete sync; 197 delete sync;
150 198
151 // Give the spawned thread enough time to properly exit. 199 // Give the spawned thread enough time to properly exit.
152 OS::Sleep(20); 200 OS::Sleep(20);
153 } 201 }
154 202
155 203
156 TEST_CASE(MessageQueue_FlushAll) { 204 TEST_CASE(MessageQueue_FlushAll) {
157 MessageQueue queue; 205 MessageQueue queue;
158 MessageQueueTestPeer queue_peer(&queue); 206 MessageQueueTestPeer queue_peer(&queue);
159 Dart_Port port1 = 1; 207 Dart_Port port1 = 1;
160 Dart_Port port2 = 2; 208 Dart_Port port2 = 2;
161 209
162 // Add two messages. 210 // Add two messages.
163 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); 211 Message* msg1 =
212 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority);
164 queue.Enqueue(msg1); 213 queue.Enqueue(msg1);
165 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); 214 Message* msg2 =
215 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority);
166 queue.Enqueue(msg2); 216 queue.Enqueue(msg2);
167 217
168 EXPECT(queue_peer.HasMessage()); 218 EXPECT(queue_peer.HasMessage());
169 queue.FlushAll(); 219 queue.FlushAll();
170 EXPECT(!queue_peer.HasMessage()); 220 EXPECT(!queue_peer.HasMessage());
171 221
172 // msg1 and msg2 already delete by FlushAll. 222 // msg1 and msg2 already delete by FlushAll.
173 } 223 }
174 224
175 225
176 TEST_CASE(MessageQueue_Flush) { 226 TEST_CASE(MessageQueue_Flush) {
177 MessageQueue queue; 227 MessageQueue queue;
178 MessageQueueTestPeer queue_peer(&queue); 228 MessageQueueTestPeer queue_peer(&queue);
179 Dart_Port port1 = 1; 229 Dart_Port port1 = 1;
180 Dart_Port port2 = 2; 230 Dart_Port port2 = 2;
181 231
182 // Add two messages on different ports. 232 // Add two messages on different ports.
183 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); 233 Message* msg1 =
234 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority);
184 queue.Enqueue(msg1); 235 queue.Enqueue(msg1);
185 PortMessage* msg2 = new PortMessage(port2, 0, AllocMsg("msg2")); 236 Message* msg2 =
237 new Message(port2, 0, AllocMsg("msg2"), Message::kNormalPriority);
186 queue.Enqueue(msg2); 238 queue.Enqueue(msg2);
187 EXPECT(queue_peer.HasMessage()); 239 EXPECT(queue_peer.HasMessage());
188 240
189 queue.Flush(port1); 241 queue.Flush(port1);
190 242
191 // One message is left in the queue. 243 // One message is left in the queue.
192 EXPECT(queue_peer.HasMessage()); 244 EXPECT(queue_peer.HasMessage());
193 PortMessage* msg = queue.Dequeue(0); 245 Message* msg = queue.Dequeue(0);
194 EXPECT(msg != NULL); 246 EXPECT(msg != NULL);
195 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data())); 247 EXPECT_STREQ("msg2", reinterpret_cast<char*>(msg->data()));
196 248
197 EXPECT(!queue_peer.HasMessage()); 249 EXPECT(!queue_peer.HasMessage());
198 250
199 // msg1 is already deleted by Flush. 251 // msg1 is already deleted by Flush.
200 delete msg2; 252 delete msg2;
201 } 253 }
202 254
203 255
204 TEST_CASE(MessageQueue_Flush_MultipleMessages) { 256 TEST_CASE(MessageQueue_Flush_MultipleMessages) {
205 MessageQueue queue; 257 MessageQueue queue;
206 MessageQueueTestPeer queue_peer(&queue); 258 MessageQueueTestPeer queue_peer(&queue);
207 Dart_Port port1 = 1; 259 Dart_Port port1 = 1;
208 260
209 PortMessage* msg1 = new PortMessage(port1, 0, AllocMsg("msg1")); 261 Message* msg1 =
262 new Message(port1, 0, AllocMsg("msg1"), Message::kNormalPriority);
210 queue.Enqueue(msg1); 263 queue.Enqueue(msg1);
211 PortMessage* msg2 = new PortMessage(port1, 0, AllocMsg("msg2")); 264 Message* msg2 =
265 new Message(port1, 0, AllocMsg("msg2"), Message::kNormalPriority);
212 queue.Enqueue(msg2); 266 queue.Enqueue(msg2);
213 EXPECT(queue_peer.HasMessage()); 267 EXPECT(queue_peer.HasMessage());
214 268
215 queue.Flush(port1); 269 queue.Flush(port1);
216 270
217 // Queue is empty. 271 // Queue is empty.
218 EXPECT(!queue_peer.HasMessage()); 272 EXPECT(!queue_peer.HasMessage());
219 // msg1 and msg2 are already deleted by Flush. 273 // msg1 and msg2 are already deleted by Flush.
220 } 274 }
221 275
222 276
223 TEST_CASE(MessageQueue_Flush_EmptyQueue) { 277 TEST_CASE(MessageQueue_Flush_EmptyQueue) {
224 MessageQueue queue; 278 MessageQueue queue;
225 MessageQueueTestPeer queue_peer(&queue); 279 MessageQueueTestPeer queue_peer(&queue);
226 Dart_Port port1 = 1; 280 Dart_Port port1 = 1;
227 281
228 EXPECT(!queue_peer.HasMessage()); 282 EXPECT(!queue_peer.HasMessage());
229 queue.Flush(port1); 283 queue.Flush(port1);
230 284
231 // Queue is still empty. 285 // Queue is still empty.
232 EXPECT(!queue_peer.HasMessage()); 286 EXPECT(!queue_peer.HasMessage());
233 } 287 }
234 288
235
236 } // namespace dart 289 } // namespace dart
OLDNEW
« no previous file with comments | « runtime/vm/message_queue.cc ('k') | runtime/vm/port.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698