| Index: runtime/vm/message_queue.cc
|
| ===================================================================
|
| --- runtime/vm/message_queue.cc (revision 3557)
|
| +++ runtime/vm/message_queue.cc (working copy)
|
| @@ -6,93 +6,127 @@
|
|
|
| namespace dart {
|
|
|
| +MessageQueue::MessageQueue() {
|
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| + head_[p] = NULL;
|
| + tail_[p] = NULL;
|
| + }
|
| +}
|
| +
|
| +
|
| MessageQueue::~MessageQueue() {
|
| // Ensure that all pending messages have been released.
|
| - ASSERT(head_ == NULL);
|
| +#if defined(DEBUG)
|
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| + ASSERT(head_[p] == NULL);
|
| + }
|
| +#endif
|
| }
|
|
|
|
|
| -void MessageQueue::Enqueue(PortMessage* msg) {
|
| - // TODO(turnidge): Can't use MonitorLocker here because
|
| - // MonitorLocker is a StackResource, which requires a current
|
| - // isolate. Should MonitorLocker really be a StackResource?
|
| +void MessageQueue::Enqueue(Message* msg) {
|
| + // TODO(turnidge): Add a scoped locker for monitors which is not a
|
| + // stack resource. This would probably be useful in the platform
|
| + // headers.
|
| monitor_.Enter();
|
| +
|
| + Message::Priority p = msg->priority();
|
| // Make sure messages are not reused.
|
| ASSERT(msg->next_ == NULL);
|
| - if (head_ == NULL) {
|
| + if (head_[p] == NULL) {
|
| // Only element in the queue.
|
| - head_ = msg;
|
| - tail_ = msg;
|
| + head_[p] = msg;
|
| + tail_[p] = msg;
|
|
|
| // We only need to notify if the queue was empty.
|
| monitor_.Notify();
|
| } else {
|
| - ASSERT(tail_ != NULL);
|
| + ASSERT(tail_[p] != NULL);
|
| // Append at the tail.
|
| - tail_->next_ = msg;
|
| - tail_ = msg;
|
| + tail_[p]->next_ = msg;
|
| + tail_[p] = msg;
|
| }
|
|
|
| monitor_.Exit();
|
| }
|
|
|
| +Message* MessageQueue::DequeueNoWait() {
|
| + MonitorLocker ml(&monitor_);
|
| + return DequeueNoWaitHoldsLock();
|
| +}
|
|
|
| -PortMessage* MessageQueue::Dequeue(int64_t millis) {
|
| +Message* MessageQueue::DequeueNoWaitHoldsLock() {
|
| + // Look for the highest priority available message.
|
| + for (int p = Message::kNumPriorities-1; p >= Message::kFirstPriority; p--) {
|
| + Message* result = head_[p];
|
| + if (result != NULL) {
|
| + head_[p] = result->next_;
|
| + // The following update to tail_ is not strictly needed.
|
| + if (head_[p] == NULL) {
|
| + tail_[p] = NULL;
|
| + }
|
| +#if defined(DEBUG)
|
| + result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
|
| +#endif // DEBUG
|
| + return result;
|
| + }
|
| + }
|
| + return NULL;
|
| +}
|
| +
|
| +
|
| +Message* MessageQueue::Dequeue(int64_t millis) {
|
| ASSERT(millis >= 0);
|
| MonitorLocker ml(&monitor_);
|
| - PortMessage* result = head_;
|
| +
|
| + Message* result = DequeueNoWaitHoldsLock();
|
| if (result == NULL) {
|
| + // No message available at any priority.
|
| ml.Wait(millis);
|
| - result = head_;
|
| + result = DequeueNoWaitHoldsLock();
|
| }
|
| - if (result != NULL) {
|
| - head_ = result->next_;
|
| - // The following update to tail_ is not strictly needed.
|
| - if (head_ == NULL) {
|
| - tail_ = NULL;
|
| - }
|
| -#if defined(DEBUG)
|
| - result->next_ = result; // Make sure to trigger ASSERT in Enqueue.
|
| -#endif // DEBUG
|
| - }
|
| return result;
|
| }
|
|
|
|
|
| void MessageQueue::Flush(Dart_Port port) {
|
| MonitorLocker ml(&monitor_);
|
| - PortMessage* cur = head_;
|
| - PortMessage* prev = NULL;
|
| - while (cur != NULL) {
|
| - PortMessage* next = cur->next_;
|
| - // If the message matches, then remove it from the queue and delete it.
|
| - if (cur->dest_port() == port) {
|
| - if (prev != NULL) {
|
| - prev->next_ = next;
|
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| + Message* cur = head_[p];
|
| + Message* prev = NULL;
|
| + while (cur != NULL) {
|
| + Message* next = cur->next_;
|
| + // If the message matches, then remove it from the queue and delete it.
|
| + if (cur->dest_port() == port) {
|
| + if (prev != NULL) {
|
| + prev->next_ = next;
|
| + } else {
|
| + head_[p] = next;
|
| + }
|
| + delete cur;
|
| } else {
|
| - head_ = next;
|
| + // Move prev forward.
|
| + prev = cur;
|
| }
|
| - delete cur;
|
| - } else {
|
| - // Move prev forward.
|
| - prev = cur;
|
| + // Advance to the next message in the queue.
|
| + cur = next;
|
| }
|
| - // Advance to the next message in the queue.
|
| - cur = next;
|
| + tail_[p] = prev;
|
| }
|
| - tail_ = prev;
|
| }
|
|
|
|
|
| void MessageQueue::FlushAll() {
|
| MonitorLocker ml(&monitor_);
|
| - PortMessage* cur = head_;
|
| - head_ = NULL;
|
| - tail_ = NULL;
|
| - while (cur != NULL) {
|
| - PortMessage* next = cur->next_;
|
| - delete cur;
|
| - cur = next;
|
| + for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
|
| + Message* cur = head_[p];
|
| + head_[p] = NULL;
|
| + tail_[p] = NULL;
|
| + while (cur != NULL) {
|
| + Message* next = cur->next_;
|
| + delete cur;
|
| + cur = next;
|
| + }
|
| }
|
| }
|
|
|
|
|