Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(159)

Side by Side Diff: tools/android/forwarder2/forwarder.cc

Issue 60033002: Make HostController/DeviceListener deletion fully synchronous. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/forwarder.gyp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "tools/android/forwarder2/forwarder.h" 5 #include "tools/android/forwarder2/forwarder.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/bind.h" 8 #include "base/bind.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
11 #include "base/message_loop/message_loop_proxy.h" 11 #include "base/message_loop/message_loop_proxy.h"
12 #include "base/posix/eintr_wrapper.h" 12 #include "base/posix/eintr_wrapper.h"
13 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
14 #include "base/threading/thread.h" 14 #include "base/threading/thread.h"
15 #include "tools/android/forwarder2/pipe_notifier.h"
15 #include "tools/android/forwarder2/socket.h" 16 #include "tools/android/forwarder2/socket.h"
16 17
17 namespace forwarder2 { 18 namespace forwarder2 {
18 namespace { 19 namespace {
19 20
20 // Helper class to buffer reads and writes from one socket to another. 21 // Helper class to buffer reads and writes from one socket to another.
21 // Each implements a small buffer connected two one input socket, and 22 // Each implements a small buffer connected two one input socket, and
22 // one output socket. 23 // one output socket.
23 // 24 //
24 // socket_from_ ---> [BufferedCopier] ---> socket_to_ 25 // socket_from_ ---> [BufferedCopier] ---> socket_to_
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
208 static const int kBufferSize = 1024 * 128; 209 static const int kBufferSize = 1024 * 128;
209 int bytes_read_; 210 int bytes_read_;
210 int write_offset_; 211 int write_offset_;
211 BufferedCopier* peer_; 212 BufferedCopier* peer_;
212 State state_; 213 State state_;
213 char buffer_[kBufferSize]; 214 char buffer_[kBufferSize];
214 215
215 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); 216 DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
216 }; 217 };
217 218
218 // Internal class that wraps a helper thread to forward traffic between
219 // |socket1| and |socket2|. After creating a new instance, call its Start()
220 // method to launch operations. Thread stops automatically if one of the socket
221 // disconnects, but ensures that all buffered writes to the other, still alive,
222 // socket, are written first. When this happens, the instance will delete itself
223 // automatically.
224 // Note that the instance will always be destroyed on the same thread that
225 // created it.
226 class Forwarder {
227 public:
228 // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
229 // endpoints.
230 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
231 : socket1_(socket1.Pass()),
232 socket2_(socket2.Pass()),
233 destructor_runner_(base::MessageLoopProxy::current()),
234 thread_("ForwarderThread") {}
235
236 void Start() {
237 thread_.Start();
238 thread_.message_loop_proxy()->PostTask(
239 FROM_HERE,
240 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
241 }
242
243 private:
244 void ThreadHandler() {
245 fd_set read_fds;
246 fd_set write_fds;
247
248 // Copy from socket1 to socket2
249 BufferedCopier buffer1(socket1_.get(), socket2_.get());
250
251 // Copy from socket2 to socket1
252 BufferedCopier buffer2(socket2_.get(), socket1_.get());
253
254 buffer1.SetPeer(&buffer2);
255 buffer2.SetPeer(&buffer1);
256
257 for (;;) {
258 FD_ZERO(&read_fds);
259 FD_ZERO(&write_fds);
260
261 int max_fd = -1;
262 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
263 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
264
265 if (max_fd < 0) {
266 // Both buffers are closed. Exit immediately.
267 break;
268 }
269
270 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
271 0) {
272 PLOG(ERROR) << "select";
273 break;
274 }
275
276 buffer1.ProcessSelect(read_fds, write_fds);
277 buffer2.ProcessSelect(read_fds, write_fds);
278 }
279
280 // Note that the thread that |destruction_runner_| runs tasks on could be
281 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close
282 // the sockets now rather than relying on the destructor.
283 socket1_.reset();
284 socket2_.reset();
285
286 // Ensure the object is destroyed on the thread that created it.
287 destructor_runner_->DeleteSoon(FROM_HERE, this);
288 }
289
290 scoped_ptr<Socket> socket1_;
291 scoped_ptr<Socket> socket2_;
292 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
293 base::Thread thread_;
294 };
295
296 } // namespace 219 } // namespace
297 220
298 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { 221 Forwarder::Forwarder(scoped_ptr<Socket> socket1,
299 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); 222 scoped_ptr<Socket> socket2,
223 PipeNotifier* deletion_notifier,
224 const ErrorCallback& error_callback)
225 : self_deleter_helper_(this, error_callback),
226 deletion_notifier_(deletion_notifier),
227 socket1_(socket1.Pass()),
228 socket2_(socket2.Pass()),
229 thread_("ForwarderThread") {
230 DCHECK(deletion_notifier_);
231 }
232
233 Forwarder::~Forwarder() {}
234
235 void Forwarder::Start() {
236 thread_.Start();
237 thread_.message_loop_proxy()->PostTask(
238 FROM_HERE,
239 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
240 }
241
242 void Forwarder::ThreadHandler() {
243 fd_set read_fds;
244 fd_set write_fds;
245
246 // Copy from socket1 to socket2
247 BufferedCopier buffer1(socket1_.get(), socket2_.get());
248 // Copy from socket2 to socket1
249 BufferedCopier buffer2(socket2_.get(), socket1_.get());
250
251 buffer1.SetPeer(&buffer2);
252 buffer2.SetPeer(&buffer1);
253
254 for (;;) {
255 FD_ZERO(&read_fds);
256 FD_ZERO(&write_fds);
257
258 int max_fd = -1;
259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
261
262 if (max_fd < 0) {
263 // Both buffers are closed. Exit immediately.
264 break;
265 }
266
267 const int deletion_fd = deletion_notifier_->receiver_fd();
268 if (deletion_fd >= 0) {
269 FD_SET(deletion_fd, &read_fds);
270 max_fd = std::max(max_fd, deletion_fd);
271 }
272
273 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
274 0) {
275 PLOG(ERROR) << "select";
276 break;
277 }
278
279 buffer1.ProcessSelect(read_fds, write_fds);
280 buffer2.ProcessSelect(read_fds, write_fds);
281
282 if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
283 buffer1.Close();
284 buffer2.Close();
285 }
286 }
287
288 // Note that the thread that the destructor will run on could be temporarily
289 // blocked on I/O (e.g. select()) therefore it is safer to close the sockets
290 // now rather than relying on the destructor.
291 socket1_.reset();
292 socket2_.reset();
293
294 self_deleter_helper_.MaybeSelfDeleteSoon();
300 } 295 }
301 296
302 } // namespace forwarder2 297 } // namespace forwarder2
OLDNEW
« no previous file with comments | « tools/android/forwarder2/forwarder.h ('k') | tools/android/forwarder2/forwarder.gyp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698