Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(776)

Side by Side Diff: third_party/libjingle/overrides/talk/base/messagequeue.cc

Issue 9455070: Remove the dependency to ws2_32.dll from talk_base::ThreadManager and talk_base::Thread. (Closed) Base URL: https://src.chromium.org/svn/trunk/src/
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 /*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #if defined(_MSC_VER) && _MSC_VER < 1300
29 #pragma warning(disable:4786)
30 #endif
31
32 #ifdef POSIX
33 #include <sys/time.h>
34 #endif
35
36 #include "talk/base/common.h"
37 #include "talk/base/logging.h"
38 #include "talk/base/messagequeue.h"
39 #include "talk/base/physicalsocketserver.h"
40
41
42 namespace talk_base {
43
44 const uint32 kMaxMsgLatency = 150; // 150 ms
45
46 //------------------------------------------------------------------
47 // MessageQueueManager
48
49 MessageQueueManager* MessageQueueManager::instance_;
50
51 MessageQueueManager* MessageQueueManager::Instance() {
52 // Note: This is not thread safe, but it is first called before threads are
53 // spawned.
54 if (!instance_)
55 instance_ = new MessageQueueManager;
56 return instance_;
57 }
58
59 MessageQueueManager::MessageQueueManager() {
60 }
61
62 MessageQueueManager::~MessageQueueManager() {
63 }
64
65 void MessageQueueManager::Add(MessageQueue *message_queue) {
66 // MessageQueueManager methods should be non-reentrant, so we
67 // ASSERT that is the case. If any of these ASSERT, please
68 // contact bpm or jbeda.
69 ASSERT(!crit_.CurrentThreadIsOwner());
70 CritScope cs(&crit_);
71 message_queues_.push_back(message_queue);
72 }
73
74 void MessageQueueManager::Remove(MessageQueue *message_queue) {
75 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
76 // If this is the last MessageQueue, destroy the manager as well so that
77 // we don't leak this object at program shutdown. As mentioned above, this is
78 // not thread-safe, but this should only happen at program termination (when
79 // the ThreadManager is destroyed, and threads are no longer active).
80 bool destroy = false;
81 {
82 CritScope cs(&crit_);
83 std::vector<MessageQueue *>::iterator iter;
84 iter = std::find(message_queues_.begin(), message_queues_.end(),
85 message_queue);
86 if (iter != message_queues_.end()) {
87 message_queues_.erase(iter);
88 }
89 destroy = message_queues_.empty();
90 }
91 if (destroy) {
92 instance_ = NULL;
93 delete this;
94 }
95 }
96
97 void MessageQueueManager::Clear(MessageHandler *handler) {
98 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
99 CritScope cs(&crit_);
100 std::vector<MessageQueue *>::iterator iter;
101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
102 (*iter)->Clear(handler);
103 }
104
105 //------------------------------------------------------------------
106 // MessageQueue
107
108 MessageQueue::MessageQueue(SocketServer* ss)
109 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
110 dmsgq_next_num_(0) {
111 if (!ss_) {
112 // Currently, MessageQueue holds a socket server, and is the base class for
113 // Thread. It seems like it makes more sense for Thread to hold the socket
114 // server, and provide it to the MessageQueue, since the Thread controls
115 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
116 // messagequeue_unittest to depend on network libraries... yuck.
117 default_ss_.reset(new PhysicalSocketServer());
118 ss_ = default_ss_.get();
119 } else {
120 // This is only for chrome where we can't release the SocketServer.
121 default_ss_.reset(ss_);
Ronghua Wu (Left Chromium) 2012/02/24 23:39:16 The Change is between L119-L121
122 }
123 ss_->SetMessageQueue(this);
124 }
125
126 MessageQueue::~MessageQueue() {
127 // The signal is done from here to ensure
128 // that it always gets called when the queue
129 // is going away.
130 SignalQueueDestroyed();
131 if (active_) {
132 MessageQueueManager::Instance()->Remove(this);
133 Clear(NULL);
134 }
135 if (ss_) {
136 ss_->SetMessageQueue(NULL);
137 }
138 }
139
140 void MessageQueue::set_socketserver(SocketServer* ss) {
141 ss_ = ss ? ss : default_ss_.get();
142 ss_->SetMessageQueue(this);
143 }
144
145 void MessageQueue::Quit() {
146 fStop_ = true;
147 ss_->WakeUp();
148 }
149
150 bool MessageQueue::IsQuitting() {
151 return fStop_;
152 }
153
154 void MessageQueue::Restart() {
155 fStop_ = false;
156 }
157
158 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
159 if (fPeekKeep_) {
160 *pmsg = msgPeek_;
161 return true;
162 }
163 if (!Get(pmsg, cmsWait))
164 return false;
165 msgPeek_ = *pmsg;
166 fPeekKeep_ = true;
167 return true;
168 }
169
170 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
171 // Return and clear peek if present
172 // Always return the peek if it exists so there is Peek/Get symmetry
173
174 if (fPeekKeep_) {
175 *pmsg = msgPeek_;
176 fPeekKeep_ = false;
177 return true;
178 }
179
180 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
181
182 int cmsTotal = cmsWait;
183 int cmsElapsed = 0;
184 uint32 msStart = Time();
185 uint32 msCurrent = msStart;
186 while (true) {
187 // Check for sent messages
188
189 ReceiveSends();
190
191 // Check queues
192
193 int cmsDelayNext = kForever;
194 {
195 CritScope cs(&crit_);
196
197 // Check for delayed messages that have been triggered
198 // Calc the next trigger too
199
200 while (!dmsgq_.empty()) {
201 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
202 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
203 break;
204 }
205 msgq_.push_back(dmsgq_.top().msg_);
206 dmsgq_.pop();
207 }
208
209 // Check for posted events
210
211 while (!msgq_.empty()) {
212 *pmsg = msgq_.front();
213 if (pmsg->ts_sensitive) {
214 long delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
215 if (delay > 0) {
216 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
217 << (delay + kMaxMsgLatency) << "ms";
218 }
219 }
220 msgq_.pop_front();
221 if (MQID_DISPOSE == pmsg->message_id) {
222 ASSERT(NULL == pmsg->phandler);
223 delete pmsg->pdata;
224 continue;
225 }
226 return true;
227 }
228 }
229
230 if (fStop_)
231 break;
232
233 // Which is shorter, the delay wait or the asked wait?
234
235 int cmsNext;
236 if (cmsWait == kForever) {
237 cmsNext = cmsDelayNext;
238 } else {
239 cmsNext = _max(0, cmsTotal - cmsElapsed);
240 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
241 cmsNext = cmsDelayNext;
242 }
243
244 // Wait and multiplex in the meantime
245 if (!ss_->Wait(cmsNext, process_io))
246 return false;
247
248 // If the specified timeout expired, return
249
250 msCurrent = Time();
251 cmsElapsed = TimeDiff(msCurrent, msStart);
252 if (cmsWait != kForever) {
253 if (cmsElapsed >= cmsWait)
254 return false;
255 }
256 }
257 return false;
258 }
259
260 void MessageQueue::ReceiveSends() {
261 }
262
263 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
264 MessageData *pdata, bool time_sensitive) {
265 if (fStop_)
266 return;
267
268 // Keep thread safe
269 // Add the message to the end of the queue
270 // Signal for the multiplexer to return
271
272 CritScope cs(&crit_);
273 EnsureActive();
274 Message msg;
275 msg.phandler = phandler;
276 msg.message_id = id;
277 msg.pdata = pdata;
278 if (time_sensitive) {
279 msg.ts_sensitive = Time() + kMaxMsgLatency;
280 }
281 msgq_.push_back(msg);
282 ss_->WakeUp();
283 }
284
285 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
286 MessageHandler *phandler, uint32 id, MessageData* pdata) {
287 if (fStop_)
288 return;
289
290 // Keep thread safe
291 // Add to the priority queue. Gets sorted soonest first.
292 // Signal for the multiplexer to return.
293
294 CritScope cs(&crit_);
295 EnsureActive();
296 Message msg;
297 msg.phandler = phandler;
298 msg.message_id = id;
299 msg.pdata = pdata;
300 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
301 dmsgq_.push(dmsg);
302 // If this message queue processes 1 message every millisecond for 50 days,
303 // we will wrap this number. Even then, only messages with identical times
304 // will be misordered, and then only briefly. This is probably ok.
305 VERIFY(0 != ++dmsgq_next_num_);
306 ss_->WakeUp();
307 }
308
309 int MessageQueue::GetDelay() {
310 CritScope cs(&crit_);
311
312 if (!msgq_.empty())
313 return 0;
314
315 if (!dmsgq_.empty()) {
316 int delay = TimeUntil(dmsgq_.top().msTrigger_);
317 if (delay < 0)
318 delay = 0;
319 return delay;
320 }
321
322 return kForever;
323 }
324
325 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
326 MessageList* removed) {
327 CritScope cs(&crit_);
328
329 // Remove messages with phandler
330
331 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
332 if (removed) {
333 removed->push_back(msgPeek_);
334 } else {
335 delete msgPeek_.pdata;
336 }
337 fPeekKeep_ = false;
338 }
339
340 // Remove from ordered message queue
341
342 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
343 if (it->Match(phandler, id)) {
344 if (removed) {
345 removed->push_back(*it);
346 } else {
347 delete it->pdata;
348 }
349 it = msgq_.erase(it);
350 } else {
351 ++it;
352 }
353 }
354
355 // Remove from priority queue. Not directly iterable, so use this approach
356
357 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
358 for (PriorityQueue::container_type::iterator it = new_end;
359 it != dmsgq_.container().end(); ++it) {
360 if (it->msg_.Match(phandler, id)) {
361 if (removed) {
362 removed->push_back(it->msg_);
363 } else {
364 delete it->msg_.pdata;
365 }
366 } else {
367 *new_end++ = *it;
368 }
369 }
370 dmsgq_.container().erase(new_end, dmsgq_.container().end());
371 dmsgq_.reheap();
372 }
373
374 void MessageQueue::Dispatch(Message *pmsg) {
375 pmsg->phandler->OnMessage(pmsg);
376 }
377
378 void MessageQueue::EnsureActive() {
379 ASSERT(crit_.CurrentThreadIsOwner());
380 if (!active_) {
381 active_ = true;
382 MessageQueueManager::Instance()->Add(this);
383 }
384 }
385
386 } // namespace talk_base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698