Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: jingle/glue/thread_wrapper.cc

Issue 10823224: Update JingleThreadWrapper to allow it to be created using task runner. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « jingle/glue/thread_wrapper.h ('k') | jingle/glue/thread_wrapper_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "jingle/glue/thread_wrapper.h" 5 #include "jingle/glue/thread_wrapper.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/bind_helpers.h" 8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h" 9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h" 10 #include "base/threading/thread_local.h"
(...skipping 11 matching lines...) Expand all
22 22
23 JingleThreadWrapper* sending_thread; 23 JingleThreadWrapper* sending_thread;
24 talk_base::Message message; 24 talk_base::Message message;
25 base::WaitableEvent done_event; 25 base::WaitableEvent done_event;
26 }; 26 };
27 27
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> > 28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; 29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
30 30
31 // static 31 // static
32 void JingleThreadWrapper::EnsureForCurrentThread() { 32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33 if (JingleThreadWrapper::current() == NULL) { 33 if (JingleThreadWrapper::current() == NULL) {
34 g_jingle_thread_wrapper.Get().Set( 34 MessageLoop* message_loop = MessageLoop::current();
35 new JingleThreadWrapper(MessageLoop::current())); 35 g_jingle_thread_wrapper.Get().Set(new JingleThreadWrapper(
36 message_loop->message_loop_proxy()));
37 message_loop->AddDestructionObserver(current());
36 } 38 }
37 39
38 DCHECK_EQ(talk_base::Thread::Current(), current()); 40 DCHECK_EQ(talk_base::Thread::Current(), current());
39 } 41 }
40 42
41 // static 43 // static
42 JingleThreadWrapper* JingleThreadWrapper::current() { 44 JingleThreadWrapper* JingleThreadWrapper::current() {
43 return g_jingle_thread_wrapper.Get().Get(); 45 return g_jingle_thread_wrapper.Get().Get();
44 } 46 }
45 47
46 JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop) 48 JingleThreadWrapper::JingleThreadWrapper(
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
47 : talk_base::Thread(new talk_base::NullSocketServer()), 50 : talk_base::Thread(new talk_base::NullSocketServer()),
48 message_loop_(message_loop), 51 task_runner_(task_runner),
49 send_allowed_(false), 52 send_allowed_(false),
50 last_task_id_(0), 53 last_task_id_(0),
51 pending_send_event_(true, false) { 54 pending_send_event_(true, false),
52 DCHECK_EQ(message_loop_, MessageLoop::current()); 55 weak_ptr_factory_(this),
53 56 weak_ptr_(weak_ptr_factory_.GetWeakPtr()) {
54 talk_base::ThreadManager::Instance()->UnwrapCurrentThread(); 57 DCHECK(task_runner->BelongsToCurrentThread());
55 talk_base::ThreadManager::Instance()->SetCurrentThread(this); 58 DCHECK(!talk_base::Thread::Current());
56 talk_base::MessageQueueManager::Instance()->Add(this); 59 talk_base::MessageQueueManager::Instance()->Add(this);
57 message_loop_->AddDestructionObserver(this);
58
59 WrapCurrent(); 60 WrapCurrent();
60 } 61 }
61 62
62 JingleThreadWrapper::~JingleThreadWrapper() { 63 JingleThreadWrapper::~JingleThreadWrapper() {
63 Clear(NULL, talk_base::MQID_ANY, NULL); 64 Clear(NULL, talk_base::MQID_ANY, NULL);
64 } 65 }
65 66
66 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { 67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
67 DCHECK_EQ(talk_base::Thread::Current(), current()); 68 DCHECK_EQ(talk_base::Thread::Current(), current());
68 UnwrapCurrent(); 69 UnwrapCurrent();
69 g_jingle_thread_wrapper.Get().Set(NULL); 70 g_jingle_thread_wrapper.Get().Set(NULL);
70 talk_base::ThreadManager::Instance()->SetCurrentThread(NULL); 71 talk_base::ThreadManager::Instance()->SetCurrentThread(NULL);
71 talk_base::MessageQueueManager::Instance()->Remove(this); 72 talk_base::MessageQueueManager::Instance()->Remove(this);
72 message_loop_->RemoveDestructionObserver(this);
73 talk_base::SocketServer* ss = socketserver(); 73 talk_base::SocketServer* ss = socketserver();
74 delete this; 74 delete this;
75 delete ss; 75 delete ss;
76 } 76 }
77 77
78 void JingleThreadWrapper::Post( 78 void JingleThreadWrapper::Post(
79 talk_base::MessageHandler* handler, uint32 message_id, 79 talk_base::MessageHandler* handler, uint32 message_id,
80 talk_base::MessageData* data, bool time_sensitive) { 80 talk_base::MessageData* data, bool time_sensitive) {
81 PostTaskInternal(0, handler, message_id, data); 81 PostTaskInternal(0, handler, message_id, data);
82 } 82 }
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
155 155
156 PendingSend pending_send(message); 156 PendingSend pending_send(message);
157 { 157 {
158 base::AutoLock auto_lock(lock_); 158 base::AutoLock auto_lock(lock_);
159 pending_send_messages_.push_back(&pending_send); 159 pending_send_messages_.push_back(&pending_send);
160 } 160 }
161 161
162 // Need to signal |pending_send_event_| here in case the thread is 162 // Need to signal |pending_send_event_| here in case the thread is
163 // sending message to another thread. 163 // sending message to another thread.
164 pending_send_event_.Signal(); 164 pending_send_event_.Signal();
165 message_loop_->PostTask(FROM_HERE, 165 task_runner_->PostTask(FROM_HERE,
166 base::Bind(&JingleThreadWrapper::ProcessPendingSends, 166 base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167 base::Unretained(this))); 167 weak_ptr_));
168 168
169 169
170 while (!pending_send.done_event.IsSignaled()) { 170 while (!pending_send.done_event.IsSignaled()) {
171 base::WaitableEvent* events[] = {&pending_send.done_event, 171 base::WaitableEvent* events[] = {&pending_send.done_event,
172 &current_thread->pending_send_event_}; 172 &current_thread->pending_send_event_};
173 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events)); 173 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174 DCHECK(event == 0 || event == 1); 174 DCHECK(event == 0 || event == 1);
175 175
176 if (event == 1) 176 if (event == 1)
177 current_thread->ProcessPendingSends(); 177 current_thread->ProcessPendingSends();
(...skipping 29 matching lines...) Expand all
207 message.phandler = handler; 207 message.phandler = handler;
208 message.message_id = message_id; 208 message.message_id = message_id;
209 message.pdata = data; 209 message.pdata = data;
210 { 210 {
211 base::AutoLock auto_lock(lock_); 211 base::AutoLock auto_lock(lock_);
212 task_id = ++last_task_id_; 212 task_id = ++last_task_id_;
213 messages_.insert(std::pair<int, talk_base::Message>(task_id, message)); 213 messages_.insert(std::pair<int, talk_base::Message>(task_id, message));
214 } 214 }
215 215
216 if (delay_ms <= 0) { 216 if (delay_ms <= 0) {
217 message_loop_->PostTask(FROM_HERE, 217 task_runner_->PostTask(FROM_HERE,
218 base::Bind(&JingleThreadWrapper::RunTask, 218 base::Bind(&JingleThreadWrapper::RunTask,
219 base::Unretained(this), task_id)); 219 weak_ptr_, task_id));
220 } else { 220 } else {
221 message_loop_->PostDelayedTask(FROM_HERE, 221 task_runner_->PostDelayedTask(FROM_HERE,
222 base::Bind(&JingleThreadWrapper::RunTask, 222 base::Bind(&JingleThreadWrapper::RunTask,
223 base::Unretained(this), task_id), 223 weak_ptr_, task_id),
224 base::TimeDelta::FromMilliseconds(delay_ms)); 224 base::TimeDelta::FromMilliseconds(delay_ms));
225 } 225 }
226 } 226 }
227 227
228 void JingleThreadWrapper::RunTask(int task_id) { 228 void JingleThreadWrapper::RunTask(int task_id) {
229 bool have_message = false; 229 bool have_message = false;
230 talk_base::Message message; 230 talk_base::Message message;
231 { 231 {
232 base::AutoLock auto_lock(lock_); 232 base::AutoLock auto_lock(lock_);
233 MessagesQueue::iterator it = messages_.find(task_id); 233 MessagesQueue::iterator it = messages_.find(task_id);
234 if (it != messages_.end()) { 234 if (it != messages_.end()) {
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
293 293
294 void JingleThreadWrapper::Stop() { 294 void JingleThreadWrapper::Stop() {
295 NOTREACHED(); 295 NOTREACHED();
296 } 296 }
297 297
298 void JingleThreadWrapper::Run() { 298 void JingleThreadWrapper::Run() {
299 NOTREACHED(); 299 NOTREACHED();
300 } 300 }
301 301
302 } // namespace jingle_glue 302 } // namespace jingle_glue
OLDNEW
« no previous file with comments | « jingle/glue/thread_wrapper.h ('k') | jingle/glue/thread_wrapper_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698