Chromium Code Reviews| Index: third_party/libjingle/overrides/talk/base/messagequeue.cc |
| =================================================================== |
| --- third_party/libjingle/overrides/talk/base/messagequeue.cc (revision 0) |
| +++ third_party/libjingle/overrides/talk/base/messagequeue.cc (revision 0) |
| @@ -0,0 +1,386 @@ |
| +/* |
| + * libjingle |
| + * Copyright 2004--2005, Google Inc. |
| + * |
| + * Redistribution and use in source and binary forms, with or without |
| + * modification, are permitted provided that the following conditions are met: |
| + * |
| + * 1. Redistributions of source code must retain the above copyright notice, |
| + * this list of conditions and the following disclaimer. |
| + * 2. Redistributions in binary form must reproduce the above copyright notice, |
| + * this list of conditions and the following disclaimer in the documentation |
| + * and/or other materials provided with the distribution. |
| + * 3. The name of the author may not be used to endorse or promote products |
| + * derived from this software without specific prior written permission. |
| + * |
| + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
| + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF |
| + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO |
| + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; |
| + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, |
| + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR |
| + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF |
| + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| + */ |
| + |
| +#if defined(_MSC_VER) && _MSC_VER < 1300 |
| +#pragma warning(disable:4786) |
| +#endif |
| + |
| +#ifdef POSIX |
| +#include <sys/time.h> |
| +#endif |
| + |
| +#include "talk/base/common.h" |
| +#include "talk/base/logging.h" |
| +#include "talk/base/messagequeue.h" |
| +#include "talk/base/physicalsocketserver.h" |
| + |
| + |
| +namespace talk_base { |
| + |
| +const uint32 kMaxMsgLatency = 150; // 150 ms |
| + |
| +//------------------------------------------------------------------ |
| +// MessageQueueManager |
| + |
| +MessageQueueManager* MessageQueueManager::instance_; |
| + |
| +MessageQueueManager* MessageQueueManager::Instance() { |
| + // Note: This is not thread safe, but it is first called before threads are |
| + // spawned. |
| + if (!instance_) |
| + instance_ = new MessageQueueManager; |
| + return instance_; |
| +} |
| + |
| +MessageQueueManager::MessageQueueManager() { |
| +} |
| + |
| +MessageQueueManager::~MessageQueueManager() { |
| +} |
| + |
| +void MessageQueueManager::Add(MessageQueue *message_queue) { |
| + // MessageQueueManager methods should be non-reentrant, so we |
| + // ASSERT that is the case. If any of these ASSERT, please |
| + // contact bpm or jbeda. |
| + ASSERT(!crit_.CurrentThreadIsOwner()); |
| + CritScope cs(&crit_); |
| + message_queues_.push_back(message_queue); |
| +} |
| + |
| +void MessageQueueManager::Remove(MessageQueue *message_queue) { |
| + ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
| + // If this is the last MessageQueue, destroy the manager as well so that |
| + // we don't leak this object at program shutdown. As mentioned above, this is |
| + // not thread-safe, but this should only happen at program termination (when |
| + // the ThreadManager is destroyed, and threads are no longer active). |
| + bool destroy = false; |
| + { |
| + CritScope cs(&crit_); |
| + std::vector<MessageQueue *>::iterator iter; |
| + iter = std::find(message_queues_.begin(), message_queues_.end(), |
| + message_queue); |
| + if (iter != message_queues_.end()) { |
| + message_queues_.erase(iter); |
| + } |
| + destroy = message_queues_.empty(); |
| + } |
| + if (destroy) { |
| + instance_ = NULL; |
| + delete this; |
| + } |
| +} |
| + |
| +void MessageQueueManager::Clear(MessageHandler *handler) { |
| + ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
| + CritScope cs(&crit_); |
| + std::vector<MessageQueue *>::iterator iter; |
| + for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
| + (*iter)->Clear(handler); |
| +} |
| + |
| +//------------------------------------------------------------------ |
| +// MessageQueue |
| + |
| +MessageQueue::MessageQueue(SocketServer* ss) |
| + : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), |
| + dmsgq_next_num_(0) { |
| + if (!ss_) { |
| + // Currently, MessageQueue holds a socket server, and is the base class for |
| + // Thread. It seems like it makes more sense for Thread to hold the socket |
| + // server, and provide it to the MessageQueue, since the Thread controls |
| + // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| + // messagequeue_unittest to depend on network libraries... yuck. |
| + default_ss_.reset(new PhysicalSocketServer()); |
| + ss_ = default_ss_.get(); |
| + } else { |
| + // This is only for chrome where we can't release the SocketServer. |
| + default_ss_.reset(ss_); |
|
Ronghua Wu (Left Chromium)
2012/02/24 23:39:16
The Change is between L119-L121
|
| + } |
| + ss_->SetMessageQueue(this); |
| +} |
| + |
| +MessageQueue::~MessageQueue() { |
| + // The signal is done from here to ensure |
| + // that it always gets called when the queue |
| + // is going away. |
| + SignalQueueDestroyed(); |
| + if (active_) { |
| + MessageQueueManager::Instance()->Remove(this); |
| + Clear(NULL); |
| + } |
| + if (ss_) { |
| + ss_->SetMessageQueue(NULL); |
| + } |
| +} |
| + |
| +void MessageQueue::set_socketserver(SocketServer* ss) { |
| + ss_ = ss ? ss : default_ss_.get(); |
| + ss_->SetMessageQueue(this); |
| +} |
| + |
| +void MessageQueue::Quit() { |
| + fStop_ = true; |
| + ss_->WakeUp(); |
| +} |
| + |
| +bool MessageQueue::IsQuitting() { |
| + return fStop_; |
| +} |
| + |
| +void MessageQueue::Restart() { |
| + fStop_ = false; |
| +} |
| + |
| +bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| + if (fPeekKeep_) { |
| + *pmsg = msgPeek_; |
| + return true; |
| + } |
| + if (!Get(pmsg, cmsWait)) |
| + return false; |
| + msgPeek_ = *pmsg; |
| + fPeekKeep_ = true; |
| + return true; |
| +} |
| + |
| +bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { |
| + // Return and clear peek if present |
| + // Always return the peek if it exists so there is Peek/Get symmetry |
| + |
| + if (fPeekKeep_) { |
| + *pmsg = msgPeek_; |
| + fPeekKeep_ = false; |
| + return true; |
| + } |
| + |
| + // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch |
| + |
| + int cmsTotal = cmsWait; |
| + int cmsElapsed = 0; |
| + uint32 msStart = Time(); |
| + uint32 msCurrent = msStart; |
| + while (true) { |
| + // Check for sent messages |
| + |
| + ReceiveSends(); |
| + |
| + // Check queues |
| + |
| + int cmsDelayNext = kForever; |
| + { |
| + CritScope cs(&crit_); |
| + |
| + // Check for delayed messages that have been triggered |
| + // Calc the next trigger too |
| + |
| + while (!dmsgq_.empty()) { |
| + if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { |
| + cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); |
| + break; |
| + } |
| + msgq_.push_back(dmsgq_.top().msg_); |
| + dmsgq_.pop(); |
| + } |
| + |
| + // Check for posted events |
| + |
| + while (!msgq_.empty()) { |
| + *pmsg = msgq_.front(); |
| + if (pmsg->ts_sensitive) { |
| + long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); |
| + if (delay > 0) { |
| + LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " |
| + << (delay + kMaxMsgLatency) << "ms"; |
| + } |
| + } |
| + msgq_.pop_front(); |
| + if (MQID_DISPOSE == pmsg->message_id) { |
| + ASSERT(NULL == pmsg->phandler); |
| + delete pmsg->pdata; |
| + continue; |
| + } |
| + return true; |
| + } |
| + } |
| + |
| + if (fStop_) |
| + break; |
| + |
| + // Which is shorter, the delay wait or the asked wait? |
| + |
| + int cmsNext; |
| + if (cmsWait == kForever) { |
| + cmsNext = cmsDelayNext; |
| + } else { |
| + cmsNext = _max(0, cmsTotal - cmsElapsed); |
| + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| + cmsNext = cmsDelayNext; |
| + } |
| + |
| + // Wait and multiplex in the meantime |
| + if (!ss_->Wait(cmsNext, process_io)) |
| + return false; |
| + |
| + // If the specified timeout expired, return |
| + |
| + msCurrent = Time(); |
| + cmsElapsed = TimeDiff(msCurrent, msStart); |
| + if (cmsWait != kForever) { |
| + if (cmsElapsed >= cmsWait) |
| + return false; |
| + } |
| + } |
| + return false; |
| +} |
| + |
| +void MessageQueue::ReceiveSends() { |
| +} |
| + |
| +void MessageQueue::Post(MessageHandler *phandler, uint32 id, |
| + MessageData *pdata, bool time_sensitive) { |
| + if (fStop_) |
| + return; |
| + |
| + // Keep thread safe |
| + // Add the message to the end of the queue |
| + // Signal for the multiplexer to return |
| + |
| + CritScope cs(&crit_); |
| + EnsureActive(); |
| + Message msg; |
| + msg.phandler = phandler; |
| + msg.message_id = id; |
| + msg.pdata = pdata; |
| + if (time_sensitive) { |
| + msg.ts_sensitive = Time() + kMaxMsgLatency; |
| + } |
| + msgq_.push_back(msg); |
| + ss_->WakeUp(); |
| +} |
| + |
| +void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, |
| + MessageHandler *phandler, uint32 id, MessageData* pdata) { |
| + if (fStop_) |
| + return; |
| + |
| + // Keep thread safe |
| + // Add to the priority queue. Gets sorted soonest first. |
| + // Signal for the multiplexer to return. |
| + |
| + CritScope cs(&crit_); |
| + EnsureActive(); |
| + Message msg; |
| + msg.phandler = phandler; |
| + msg.message_id = id; |
| + msg.pdata = pdata; |
| + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
| + dmsgq_.push(dmsg); |
| + // If this message queue processes 1 message every millisecond for 50 days, |
| + // we will wrap this number. Even then, only messages with identical times |
| + // will be misordered, and then only briefly. This is probably ok. |
| + VERIFY(0 != ++dmsgq_next_num_); |
| + ss_->WakeUp(); |
| +} |
| + |
| +int MessageQueue::GetDelay() { |
| + CritScope cs(&crit_); |
| + |
| + if (!msgq_.empty()) |
| + return 0; |
| + |
| + if (!dmsgq_.empty()) { |
| + int delay = TimeUntil(dmsgq_.top().msTrigger_); |
| + if (delay < 0) |
| + delay = 0; |
| + return delay; |
| + } |
| + |
| + return kForever; |
| +} |
| + |
| +void MessageQueue::Clear(MessageHandler *phandler, uint32 id, |
| + MessageList* removed) { |
| + CritScope cs(&crit_); |
| + |
| + // Remove messages with phandler |
| + |
| + if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { |
| + if (removed) { |
| + removed->push_back(msgPeek_); |
| + } else { |
| + delete msgPeek_.pdata; |
| + } |
| + fPeekKeep_ = false; |
| + } |
| + |
| + // Remove from ordered message queue |
| + |
| + for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { |
| + if (it->Match(phandler, id)) { |
| + if (removed) { |
| + removed->push_back(*it); |
| + } else { |
| + delete it->pdata; |
| + } |
| + it = msgq_.erase(it); |
| + } else { |
| + ++it; |
| + } |
| + } |
| + |
| + // Remove from priority queue. Not directly iterable, so use this approach |
| + |
| + PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); |
| + for (PriorityQueue::container_type::iterator it = new_end; |
| + it != dmsgq_.container().end(); ++it) { |
| + if (it->msg_.Match(phandler, id)) { |
| + if (removed) { |
| + removed->push_back(it->msg_); |
| + } else { |
| + delete it->msg_.pdata; |
| + } |
| + } else { |
| + *new_end++ = *it; |
| + } |
| + } |
| + dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
| + dmsgq_.reheap(); |
| +} |
| + |
| +void MessageQueue::Dispatch(Message *pmsg) { |
| + pmsg->phandler->OnMessage(pmsg); |
| +} |
| + |
| +void MessageQueue::EnsureActive() { |
| + ASSERT(crit_.CurrentThreadIsOwner()); |
| + if (!active_) { |
| + active_ = true; |
| + MessageQueueManager::Instance()->Add(this); |
| + } |
| +} |
| + |
| +} // namespace talk_base |
| Property changes on: third_party\libjingle\overrides\talk\base\messagequeue.cc |
| ___________________________________________________________________ |
| Added: svn:eol-style |
| + LF |