OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * libjingle |
| 3 * Copyright 2004--2005, Google Inc. |
| 4 * |
| 5 * Redistribution and use in source and binary forms, with or without |
| 6 * modification, are permitted provided that the following conditions are met: |
| 7 * |
| 8 * 1. Redistributions of source code must retain the above copyright notice, |
| 9 * this list of conditions and the following disclaimer. |
| 10 * 2. Redistributions in binary form must reproduce the above copyright notice, |
| 11 * this list of conditions and the following disclaimer in the documentation |
| 12 * and/or other materials provided with the distribution. |
| 13 * 3. The name of the author may not be used to endorse or promote products |
| 14 * derived from this software without specific prior written permission. |
| 15 * |
| 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
| 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF |
| 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO |
| 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; |
| 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, |
| 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR |
| 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF |
| 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 26 */ |
| 27 |
| 28 #if defined(_MSC_VER) && _MSC_VER < 1300 |
| 29 #pragma warning(disable:4786) |
| 30 #endif |
| 31 |
| 32 #ifdef POSIX |
| 33 #include <sys/time.h> |
| 34 #endif |
| 35 |
| 36 #include "talk/base/common.h" |
| 37 #include "talk/base/logging.h" |
| 38 #include "talk/base/messagequeue.h" |
| 39 #include "talk/base/physicalsocketserver.h" |
| 40 |
| 41 |
| 42 namespace talk_base { |
| 43 |
| 44 const uint32 kMaxMsgLatency = 150; // 150 ms |
| 45 |
| 46 //------------------------------------------------------------------ |
| 47 // MessageQueueManager |
| 48 |
| 49 MessageQueueManager* MessageQueueManager::instance_; |
| 50 |
| 51 MessageQueueManager* MessageQueueManager::Instance() { |
| 52 // Note: This is not thread safe, but it is first called before threads are |
| 53 // spawned. |
| 54 if (!instance_) |
| 55 instance_ = new MessageQueueManager; |
| 56 return instance_; |
| 57 } |
| 58 |
| 59 MessageQueueManager::MessageQueueManager() { |
| 60 } |
| 61 |
| 62 MessageQueueManager::~MessageQueueManager() { |
| 63 } |
| 64 |
| 65 void MessageQueueManager::Add(MessageQueue *message_queue) { |
| 66 // MessageQueueManager methods should be non-reentrant, so we |
| 67 // ASSERT that is the case. If any of these ASSERT, please |
| 68 // contact bpm or jbeda. |
| 69 ASSERT(!crit_.CurrentThreadIsOwner()); |
| 70 CritScope cs(&crit_); |
| 71 message_queues_.push_back(message_queue); |
| 72 } |
| 73 |
| 74 void MessageQueueManager::Remove(MessageQueue *message_queue) { |
| 75 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
| 76 // If this is the last MessageQueue, destroy the manager as well so that |
| 77 // we don't leak this object at program shutdown. As mentioned above, this is |
| 78 // not thread-safe, but this should only happen at program termination (when |
| 79 // the ThreadManager is destroyed, and threads are no longer active). |
| 80 bool destroy = false; |
| 81 { |
| 82 CritScope cs(&crit_); |
| 83 std::vector<MessageQueue *>::iterator iter; |
| 84 iter = std::find(message_queues_.begin(), message_queues_.end(), |
| 85 message_queue); |
| 86 if (iter != message_queues_.end()) { |
| 87 message_queues_.erase(iter); |
| 88 } |
| 89 destroy = message_queues_.empty(); |
| 90 } |
| 91 if (destroy) { |
| 92 instance_ = NULL; |
| 93 delete this; |
| 94 } |
| 95 } |
| 96 |
| 97 void MessageQueueManager::Clear(MessageHandler *handler) { |
| 98 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
| 99 CritScope cs(&crit_); |
| 100 std::vector<MessageQueue *>::iterator iter; |
| 101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
| 102 (*iter)->Clear(handler); |
| 103 } |
| 104 |
| 105 //------------------------------------------------------------------ |
| 106 // MessageQueue |
| 107 |
| 108 MessageQueue::MessageQueue(SocketServer* ss) |
| 109 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), |
| 110 dmsgq_next_num_(0) { |
| 111 if (!ss_) { |
| 112 // Currently, MessageQueue holds a socket server, and is the base class for |
| 113 // Thread. It seems like it makes more sense for Thread to hold the socket |
| 114 // server, and provide it to the MessageQueue, since the Thread controls |
| 115 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| 116 // messagequeue_unittest to depend on network libraries... yuck. |
| 117 default_ss_.reset(new PhysicalSocketServer()); |
| 118 ss_ = default_ss_.get(); |
| 119 } else { |
| 120 // This is only for chrome where we can't release the SocketServer. |
| 121 default_ss_.reset(ss_); |
| 122 } |
| 123 ss_->SetMessageQueue(this); |
| 124 } |
| 125 |
| 126 MessageQueue::~MessageQueue() { |
| 127 // The signal is done from here to ensure |
| 128 // that it always gets called when the queue |
| 129 // is going away. |
| 130 SignalQueueDestroyed(); |
| 131 if (active_) { |
| 132 MessageQueueManager::Instance()->Remove(this); |
| 133 Clear(NULL); |
| 134 } |
| 135 if (ss_) { |
| 136 ss_->SetMessageQueue(NULL); |
| 137 } |
| 138 } |
| 139 |
| 140 void MessageQueue::set_socketserver(SocketServer* ss) { |
| 141 ss_ = ss ? ss : default_ss_.get(); |
| 142 ss_->SetMessageQueue(this); |
| 143 } |
| 144 |
| 145 void MessageQueue::Quit() { |
| 146 fStop_ = true; |
| 147 ss_->WakeUp(); |
| 148 } |
| 149 |
| 150 bool MessageQueue::IsQuitting() { |
| 151 return fStop_; |
| 152 } |
| 153 |
| 154 void MessageQueue::Restart() { |
| 155 fStop_ = false; |
| 156 } |
| 157 |
| 158 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| 159 if (fPeekKeep_) { |
| 160 *pmsg = msgPeek_; |
| 161 return true; |
| 162 } |
| 163 if (!Get(pmsg, cmsWait)) |
| 164 return false; |
| 165 msgPeek_ = *pmsg; |
| 166 fPeekKeep_ = true; |
| 167 return true; |
| 168 } |
| 169 |
| 170 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { |
| 171 // Return and clear peek if present |
| 172 // Always return the peek if it exists so there is Peek/Get symmetry |
| 173 |
| 174 if (fPeekKeep_) { |
| 175 *pmsg = msgPeek_; |
| 176 fPeekKeep_ = false; |
| 177 return true; |
| 178 } |
| 179 |
| 180 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch |
| 181 |
| 182 int cmsTotal = cmsWait; |
| 183 int cmsElapsed = 0; |
| 184 uint32 msStart = Time(); |
| 185 uint32 msCurrent = msStart; |
| 186 while (true) { |
| 187 // Check for sent messages |
| 188 |
| 189 ReceiveSends(); |
| 190 |
| 191 // Check queues |
| 192 |
| 193 int cmsDelayNext = kForever; |
| 194 { |
| 195 CritScope cs(&crit_); |
| 196 |
| 197 // Check for delayed messages that have been triggered |
| 198 // Calc the next trigger too |
| 199 |
| 200 while (!dmsgq_.empty()) { |
| 201 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { |
| 202 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); |
| 203 break; |
| 204 } |
| 205 msgq_.push_back(dmsgq_.top().msg_); |
| 206 dmsgq_.pop(); |
| 207 } |
| 208 |
| 209 // Check for posted events |
| 210 |
| 211 while (!msgq_.empty()) { |
| 212 *pmsg = msgq_.front(); |
| 213 if (pmsg->ts_sensitive) { |
| 214 long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); |
| 215 if (delay > 0) { |
| 216 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " |
| 217 << (delay + kMaxMsgLatency) << "ms"; |
| 218 } |
| 219 } |
| 220 msgq_.pop_front(); |
| 221 if (MQID_DISPOSE == pmsg->message_id) { |
| 222 ASSERT(NULL == pmsg->phandler); |
| 223 delete pmsg->pdata; |
| 224 continue; |
| 225 } |
| 226 return true; |
| 227 } |
| 228 } |
| 229 |
| 230 if (fStop_) |
| 231 break; |
| 232 |
| 233 // Which is shorter, the delay wait or the asked wait? |
| 234 |
| 235 int cmsNext; |
| 236 if (cmsWait == kForever) { |
| 237 cmsNext = cmsDelayNext; |
| 238 } else { |
| 239 cmsNext = _max(0, cmsTotal - cmsElapsed); |
| 240 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| 241 cmsNext = cmsDelayNext; |
| 242 } |
| 243 |
| 244 // Wait and multiplex in the meantime |
| 245 if (!ss_->Wait(cmsNext, process_io)) |
| 246 return false; |
| 247 |
| 248 // If the specified timeout expired, return |
| 249 |
| 250 msCurrent = Time(); |
| 251 cmsElapsed = TimeDiff(msCurrent, msStart); |
| 252 if (cmsWait != kForever) { |
| 253 if (cmsElapsed >= cmsWait) |
| 254 return false; |
| 255 } |
| 256 } |
| 257 return false; |
| 258 } |
| 259 |
| 260 void MessageQueue::ReceiveSends() { |
| 261 } |
| 262 |
| 263 void MessageQueue::Post(MessageHandler *phandler, uint32 id, |
| 264 MessageData *pdata, bool time_sensitive) { |
| 265 if (fStop_) |
| 266 return; |
| 267 |
| 268 // Keep thread safe |
| 269 // Add the message to the end of the queue |
| 270 // Signal for the multiplexer to return |
| 271 |
| 272 CritScope cs(&crit_); |
| 273 EnsureActive(); |
| 274 Message msg; |
| 275 msg.phandler = phandler; |
| 276 msg.message_id = id; |
| 277 msg.pdata = pdata; |
| 278 if (time_sensitive) { |
| 279 msg.ts_sensitive = Time() + kMaxMsgLatency; |
| 280 } |
| 281 msgq_.push_back(msg); |
| 282 ss_->WakeUp(); |
| 283 } |
| 284 |
| 285 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, |
| 286 MessageHandler *phandler, uint32 id, MessageData* pdata) { |
| 287 if (fStop_) |
| 288 return; |
| 289 |
| 290 // Keep thread safe |
| 291 // Add to the priority queue. Gets sorted soonest first. |
| 292 // Signal for the multiplexer to return. |
| 293 |
| 294 CritScope cs(&crit_); |
| 295 EnsureActive(); |
| 296 Message msg; |
| 297 msg.phandler = phandler; |
| 298 msg.message_id = id; |
| 299 msg.pdata = pdata; |
| 300 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
| 301 dmsgq_.push(dmsg); |
| 302 // If this message queue processes 1 message every millisecond for 50 days, |
| 303 // we will wrap this number. Even then, only messages with identical times |
| 304 // will be misordered, and then only briefly. This is probably ok. |
| 305 VERIFY(0 != ++dmsgq_next_num_); |
| 306 ss_->WakeUp(); |
| 307 } |
| 308 |
| 309 int MessageQueue::GetDelay() { |
| 310 CritScope cs(&crit_); |
| 311 |
| 312 if (!msgq_.empty()) |
| 313 return 0; |
| 314 |
| 315 if (!dmsgq_.empty()) { |
| 316 int delay = TimeUntil(dmsgq_.top().msTrigger_); |
| 317 if (delay < 0) |
| 318 delay = 0; |
| 319 return delay; |
| 320 } |
| 321 |
| 322 return kForever; |
| 323 } |
| 324 |
| 325 void MessageQueue::Clear(MessageHandler *phandler, uint32 id, |
| 326 MessageList* removed) { |
| 327 CritScope cs(&crit_); |
| 328 |
| 329 // Remove messages with phandler |
| 330 |
| 331 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { |
| 332 if (removed) { |
| 333 removed->push_back(msgPeek_); |
| 334 } else { |
| 335 delete msgPeek_.pdata; |
| 336 } |
| 337 fPeekKeep_ = false; |
| 338 } |
| 339 |
| 340 // Remove from ordered message queue |
| 341 |
| 342 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { |
| 343 if (it->Match(phandler, id)) { |
| 344 if (removed) { |
| 345 removed->push_back(*it); |
| 346 } else { |
| 347 delete it->pdata; |
| 348 } |
| 349 it = msgq_.erase(it); |
| 350 } else { |
| 351 ++it; |
| 352 } |
| 353 } |
| 354 |
| 355 // Remove from priority queue. Not directly iterable, so use this approach |
| 356 |
| 357 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); |
| 358 for (PriorityQueue::container_type::iterator it = new_end; |
| 359 it != dmsgq_.container().end(); ++it) { |
| 360 if (it->msg_.Match(phandler, id)) { |
| 361 if (removed) { |
| 362 removed->push_back(it->msg_); |
| 363 } else { |
| 364 delete it->msg_.pdata; |
| 365 } |
| 366 } else { |
| 367 *new_end++ = *it; |
| 368 } |
| 369 } |
| 370 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
| 371 dmsgq_.reheap(); |
| 372 } |
| 373 |
| 374 void MessageQueue::Dispatch(Message *pmsg) { |
| 375 pmsg->phandler->OnMessage(pmsg); |
| 376 } |
| 377 |
| 378 void MessageQueue::EnsureActive() { |
| 379 ASSERT(crit_.CurrentThreadIsOwner()); |
| 380 if (!active_) { |
| 381 active_ = true; |
| 382 MessageQueueManager::Instance()->Add(this); |
| 383 } |
| 384 } |
| 385 |
| 386 } // namespace talk_base |
OLD | NEW |