Index: runtime/vm/message_queue.cc |
=================================================================== |
--- runtime/vm/message_queue.cc (revision 3557) |
+++ runtime/vm/message_queue.cc (working copy) |
@@ -6,93 +6,127 @@ |
namespace dart { |
+MessageQueue::MessageQueue() { |
+ for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
+ head_[p] = NULL; |
+ tail_[p] = NULL; |
+ } |
+} |
+ |
+ |
MessageQueue::~MessageQueue() { |
// Ensure that all pending messages have been released. |
- ASSERT(head_ == NULL); |
+#if defined(DEBUG) |
+ for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
+ ASSERT(head_[p] == NULL); |
+ } |
+#endif |
} |
-void MessageQueue::Enqueue(PortMessage* msg) { |
- // TODO(turnidge): Can't use MonitorLocker here because |
- // MonitorLocker is a StackResource, which requires a current |
- // isolate. Should MonitorLocker really be a StackResource? |
+void MessageQueue::Enqueue(Message* msg) { |
+ // TODO(turnidge): Add a scoped locker for monitors which is not a |
+ // stack resource. This would probably be useful in the platform |
+ // headers. |
monitor_.Enter(); |
+ |
+ Message::Priority p = msg->priority(); |
// Make sure messages are not reused. |
ASSERT(msg->next_ == NULL); |
- if (head_ == NULL) { |
+ if (head_[p] == NULL) { |
// Only element in the queue. |
- head_ = msg; |
- tail_ = msg; |
+ head_[p] = msg; |
+ tail_[p] = msg; |
// We only need to notify if the queue was empty. |
monitor_.Notify(); |
} else { |
- ASSERT(tail_ != NULL); |
+ ASSERT(tail_[p] != NULL); |
// Append at the tail. |
- tail_->next_ = msg; |
- tail_ = msg; |
+ tail_[p]->next_ = msg; |
+ tail_[p] = msg; |
} |
monitor_.Exit(); |
} |
+Message* MessageQueue::DequeueNoWait() { |
+ MonitorLocker ml(&monitor_); |
+ return DequeueNoWaitHoldsLock(); |
+} |
-PortMessage* MessageQueue::Dequeue(int64_t millis) { |
+Message* MessageQueue::DequeueNoWaitHoldsLock() { |
+ // Look for the highest priority available message. |
+ for (int p = Message::kNumPriorities-1; p >= Message::kFirstPriority; p--) { |
+ Message* result = head_[p]; |
+ if (result != NULL) { |
+ head_[p] = result->next_; |
+ // The following update to tail_ is not strictly needed. |
+ if (head_[p] == NULL) { |
+ tail_[p] = NULL; |
+ } |
+#if defined(DEBUG) |
+ result->next_ = result; // Make sure to trigger ASSERT in Enqueue. |
+#endif // DEBUG |
+ return result; |
+ } |
+ } |
+ return NULL; |
+} |
+ |
+ |
+Message* MessageQueue::Dequeue(int64_t millis) { |
ASSERT(millis >= 0); |
MonitorLocker ml(&monitor_); |
- PortMessage* result = head_; |
+ |
+ Message* result = DequeueNoWaitHoldsLock(); |
if (result == NULL) { |
+ // No message available at any priority. |
ml.Wait(millis); |
- result = head_; |
+ result = DequeueNoWaitHoldsLock(); |
} |
- if (result != NULL) { |
- head_ = result->next_; |
- // The following update to tail_ is not strictly needed. |
- if (head_ == NULL) { |
- tail_ = NULL; |
- } |
-#if defined(DEBUG) |
- result->next_ = result; // Make sure to trigger ASSERT in Enqueue. |
-#endif // DEBUG |
- } |
return result; |
} |
void MessageQueue::Flush(Dart_Port port) { |
MonitorLocker ml(&monitor_); |
- PortMessage* cur = head_; |
- PortMessage* prev = NULL; |
- while (cur != NULL) { |
- PortMessage* next = cur->next_; |
- // If the message matches, then remove it from the queue and delete it. |
- if (cur->dest_port() == port) { |
- if (prev != NULL) { |
- prev->next_ = next; |
+ for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
+ Message* cur = head_[p]; |
+ Message* prev = NULL; |
+ while (cur != NULL) { |
+ Message* next = cur->next_; |
+ // If the message matches, then remove it from the queue and delete it. |
+ if (cur->dest_port() == port) { |
+ if (prev != NULL) { |
+ prev->next_ = next; |
+ } else { |
+ head_[p] = next; |
+ } |
+ delete cur; |
} else { |
- head_ = next; |
+ // Move prev forward. |
+ prev = cur; |
} |
- delete cur; |
- } else { |
- // Move prev forward. |
- prev = cur; |
+ // Advance to the next message in the queue. |
+ cur = next; |
} |
- // Advance to the next message in the queue. |
- cur = next; |
+ tail_[p] = prev; |
} |
- tail_ = prev; |
} |
void MessageQueue::FlushAll() { |
MonitorLocker ml(&monitor_); |
- PortMessage* cur = head_; |
- head_ = NULL; |
- tail_ = NULL; |
- while (cur != NULL) { |
- PortMessage* next = cur->next_; |
- delete cur; |
- cur = next; |
+ for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) { |
+ Message* cur = head_[p]; |
+ head_[p] = NULL; |
+ tail_[p] = NULL; |
+ while (cur != NULL) { |
+ Message* next = cur->next_; |
+ delete cur; |
+ cur = next; |
+ } |
} |
} |