OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "tools/android/forwarder2/forwarder.h" | 5 #include "tools/android/forwarder2/forwarder.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/bind.h" | 8 #include "base/bind.h" |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
11 #include "base/message_loop/message_loop_proxy.h" | 11 #include "base/message_loop/message_loop_proxy.h" |
12 #include "base/posix/eintr_wrapper.h" | 12 #include "base/posix/eintr_wrapper.h" |
13 #include "base/single_thread_task_runner.h" | 13 #include "base/single_thread_task_runner.h" |
14 #include "base/threading/thread.h" | 14 #include "base/threading/thread.h" |
| 15 #include "tools/android/forwarder2/pipe_notifier.h" |
15 #include "tools/android/forwarder2/socket.h" | 16 #include "tools/android/forwarder2/socket.h" |
16 | 17 |
17 namespace forwarder2 { | 18 namespace forwarder2 { |
18 namespace { | 19 namespace { |
19 | 20 |
20 // Helper class to buffer reads and writes from one socket to another. | 21 // Helper class to buffer reads and writes from one socket to another. |
21 // Each implements a small buffer connected two one input socket, and | 22 // Each implements a small buffer connected two one input socket, and |
22 // one output socket. | 23 // one output socket. |
23 // | 24 // |
24 // socket_from_ ---> [BufferedCopier] ---> socket_to_ | 25 // socket_from_ ---> [BufferedCopier] ---> socket_to_ |
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
208 static const int kBufferSize = 1024 * 128; | 209 static const int kBufferSize = 1024 * 128; |
209 int bytes_read_; | 210 int bytes_read_; |
210 int write_offset_; | 211 int write_offset_; |
211 BufferedCopier* peer_; | 212 BufferedCopier* peer_; |
212 State state_; | 213 State state_; |
213 char buffer_[kBufferSize]; | 214 char buffer_[kBufferSize]; |
214 | 215 |
215 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); | 216 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
216 }; | 217 }; |
217 | 218 |
218 // Internal class that wraps a helper thread to forward traffic between | |
219 // |socket1| and |socket2|. After creating a new instance, call its Start() | |
220 // method to launch operations. Thread stops automatically if one of the socket | |
221 // disconnects, but ensures that all buffered writes to the other, still alive, | |
222 // socket, are written first. When this happens, the instance will delete itself | |
223 // automatically. | |
224 // Note that the instance will always be destroyed on the same thread that | |
225 // created it. | |
226 class Forwarder { | |
227 public: | |
228 // Create a new Forwarder instance. |socket1| and |socket2| are the two socket | |
229 // endpoints. | |
230 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) | |
231 : socket1_(socket1.Pass()), | |
232 socket2_(socket2.Pass()), | |
233 destructor_runner_(base::MessageLoopProxy::current()), | |
234 thread_("ForwarderThread") {} | |
235 | |
236 void Start() { | |
237 thread_.Start(); | |
238 thread_.message_loop_proxy()->PostTask( | |
239 FROM_HERE, | |
240 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); | |
241 } | |
242 | |
243 private: | |
244 void ThreadHandler() { | |
245 fd_set read_fds; | |
246 fd_set write_fds; | |
247 | |
248 // Copy from socket1 to socket2 | |
249 BufferedCopier buffer1(socket1_.get(), socket2_.get()); | |
250 | |
251 // Copy from socket2 to socket1 | |
252 BufferedCopier buffer2(socket2_.get(), socket1_.get()); | |
253 | |
254 buffer1.SetPeer(&buffer2); | |
255 buffer2.SetPeer(&buffer1); | |
256 | |
257 for (;;) { | |
258 FD_ZERO(&read_fds); | |
259 FD_ZERO(&write_fds); | |
260 | |
261 int max_fd = -1; | |
262 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); | |
263 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); | |
264 | |
265 if (max_fd < 0) { | |
266 // Both buffers are closed. Exit immediately. | |
267 break; | |
268 } | |
269 | |
270 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= | |
271 0) { | |
272 PLOG(ERROR) << "select"; | |
273 break; | |
274 } | |
275 | |
276 buffer1.ProcessSelect(read_fds, write_fds); | |
277 buffer2.ProcessSelect(read_fds, write_fds); | |
278 } | |
279 | |
280 // Note that the thread that |destruction_runner_| runs tasks on could be | |
281 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close | |
282 // the sockets now rather than relying on the destructor. | |
283 socket1_.reset(); | |
284 socket2_.reset(); | |
285 | |
286 // Ensure the object is destroyed on the thread that created it. | |
287 destructor_runner_->DeleteSoon(FROM_HERE, this); | |
288 } | |
289 | |
290 scoped_ptr<Socket> socket1_; | |
291 scoped_ptr<Socket> socket2_; | |
292 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; | |
293 base::Thread thread_; | |
294 }; | |
295 | |
296 } // namespace | 219 } // namespace |
297 | 220 |
298 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { | 221 Forwarder::Forwarder(scoped_ptr<Socket> socket1, |
299 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); | 222 scoped_ptr<Socket> socket2, |
| 223 PipeNotifier* deletion_notifier, |
| 224 const ErrorCallback& error_callback) |
| 225 : self_deleter_helper_(this, error_callback), |
| 226 deletion_notifier_(deletion_notifier), |
| 227 socket1_(socket1.Pass()), |
| 228 socket2_(socket2.Pass()), |
| 229 thread_("ForwarderThread") { |
| 230 DCHECK(deletion_notifier_); |
| 231 } |
| 232 |
| 233 Forwarder::~Forwarder() {} |
| 234 |
| 235 void Forwarder::Start() { |
| 236 thread_.Start(); |
| 237 thread_.message_loop_proxy()->PostTask( |
| 238 FROM_HERE, |
| 239 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); |
| 240 } |
| 241 |
| 242 void Forwarder::ThreadHandler() { |
| 243 fd_set read_fds; |
| 244 fd_set write_fds; |
| 245 |
| 246 // Copy from socket1 to socket2 |
| 247 BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
| 248 // Copy from socket2 to socket1 |
| 249 BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
| 250 |
| 251 buffer1.SetPeer(&buffer2); |
| 252 buffer2.SetPeer(&buffer1); |
| 253 |
| 254 for (;;) { |
| 255 FD_ZERO(&read_fds); |
| 256 FD_ZERO(&write_fds); |
| 257 |
| 258 int max_fd = -1; |
| 259 buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); |
| 260 buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); |
| 261 |
| 262 if (max_fd < 0) { |
| 263 // Both buffers are closed. Exit immediately. |
| 264 break; |
| 265 } |
| 266 |
| 267 const int deletion_fd = deletion_notifier_->receiver_fd(); |
| 268 if (deletion_fd >= 0) { |
| 269 FD_SET(deletion_fd, &read_fds); |
| 270 max_fd = std::max(max_fd, deletion_fd); |
| 271 } |
| 272 |
| 273 if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= |
| 274 0) { |
| 275 PLOG(ERROR) << "select"; |
| 276 break; |
| 277 } |
| 278 |
| 279 buffer1.ProcessSelect(read_fds, write_fds); |
| 280 buffer2.ProcessSelect(read_fds, write_fds); |
| 281 |
| 282 if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) { |
| 283 buffer1.Close(); |
| 284 buffer2.Close(); |
| 285 } |
| 286 } |
| 287 |
| 288 // Note that the thread that the destructor will run on could be temporarily |
| 289 // blocked on I/O (e.g. select()) therefore it is safer to close the sockets |
| 290 // now rather than relying on the destructor. |
| 291 socket1_.reset(); |
| 292 socket2_.reset(); |
| 293 |
| 294 self_deleter_helper_.MaybeSelfDeleteSoon(); |
300 } | 295 } |
301 | 296 |
302 } // namespace forwarder2 | 297 } // namespace forwarder2 |
OLD | NEW |