Index: tools/android/forwarder2/forwarder.cc |
diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc |
index 2f25b9cb3ac223d0d12f72726e7ee6a1323918ee..d747dbe6558ae98623f5ea92b840c01b11bf8b6f 100644 |
--- a/tools/android/forwarder2/forwarder.cc |
+++ b/tools/android/forwarder2/forwarder.cc |
@@ -12,6 +12,7 @@ |
#include "base/posix/eintr_wrapper.h" |
#include "base/single_thread_task_runner.h" |
#include "base/threading/thread.h" |
+#include "tools/android/forwarder2/pipe_notifier.h" |
#include "tools/android/forwarder2/socket.h" |
namespace forwarder2 { |
@@ -215,88 +216,82 @@ class BufferedCopier { |
DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
}; |
-// Internal class that wraps a helper thread to forward traffic between |
-// |socket1| and |socket2|. After creating a new instance, call its Start() |
-// method to launch operations. Thread stops automatically if one of the socket |
-// disconnects, but ensures that all buffered writes to the other, still alive, |
-// socket, are written first. When this happens, the instance will delete itself |
-// automatically. |
-// Note that the instance will always be destroyed on the same thread that |
-// created it. |
-class Forwarder { |
- public: |
- // Create a new Forwarder instance. |socket1| and |socket2| are the two socket |
- // endpoints. |
- Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) |
- : socket1_(socket1.Pass()), |
- socket2_(socket2.Pass()), |
- destructor_runner_(base::MessageLoopProxy::current()), |
- thread_("ForwarderThread") {} |
- |
- void Start() { |
- thread_.Start(); |
- thread_.message_loop_proxy()->PostTask( |
- FROM_HERE, |
- base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); |
- } |
+} // namespace |
- private: |
- void ThreadHandler() { |
- fd_set read_fds; |
- fd_set write_fds; |
+Forwarder::Forwarder(scoped_ptr<Socket> socket1, |
+ scoped_ptr<Socket> socket2, |
+ PipeNotifier* deletion_notifier, |
+ const ErrorCallback& error_callback) |
+ : self_deleter_helper_(this, error_callback), |
+ deletion_notifier_(deletion_notifier), |
+ socket1_(socket1.Pass()), |
+ socket2_(socket2.Pass()), |
+ thread_("ForwarderThread") { |
+ DCHECK(deletion_notifier_); |
+} |
- // Copy from socket1 to socket2 |
- BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
+Forwarder::~Forwarder() {} |
- // Copy from socket2 to socket1 |
- BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
+void Forwarder::Start() { |
+ thread_.Start(); |
+ thread_.message_loop_proxy()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); |
+} |
- buffer1.SetPeer(&buffer2); |
- buffer2.SetPeer(&buffer1); |
+void Forwarder::ThreadHandler() { |
+ fd_set read_fds; |
+ fd_set write_fds; |
- for (;;) { |
- FD_ZERO(&read_fds); |
- FD_ZERO(&write_fds); |
+ // Copy from socket1 to socket2 |
+ BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
+ // Copy from socket2 to socket1 |
+ BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
- int max_fd = -1; |
- buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); |
- buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); |
+ buffer1.SetPeer(&buffer2); |
+ buffer2.SetPeer(&buffer1); |
- if (max_fd < 0) { |
- // Both buffers are closed. Exit immediately. |
- break; |
- } |
+ for (;;) { |
+ FD_ZERO(&read_fds); |
+ FD_ZERO(&write_fds); |
- if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= |
- 0) { |
- PLOG(ERROR) << "select"; |
- break; |
- } |
+ int max_fd = -1; |
+ buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd); |
+ buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd); |
- buffer1.ProcessSelect(read_fds, write_fds); |
- buffer2.ProcessSelect(read_fds, write_fds); |
+ if (max_fd < 0) { |
+ // Both buffers are closed. Exit immediately. |
+ break; |
} |
- // Note that the thread that |destruction_runner_| runs tasks on could be |
- // temporarily blocked on I/O (e.g. select()) therefore it is safer to close |
- // the sockets now rather than relying on the destructor. |
- socket1_.reset(); |
- socket2_.reset(); |
+ const int deletion_fd = deletion_notifier_->receiver_fd(); |
+ if (deletion_fd >= 0) { |
+ FD_SET(deletion_fd, &read_fds); |
+ max_fd = std::max(max_fd, deletion_fd); |
+ } |
- // Ensure the object is destroyed on the thread that created it. |
- destructor_runner_->DeleteSoon(FROM_HERE, this); |
- } |
+ if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <= |
+ 0) { |
+ PLOG(ERROR) << "select"; |
+ break; |
+ } |
- scoped_ptr<Socket> socket1_; |
- scoped_ptr<Socket> socket2_; |
- scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; |
- base::Thread thread_; |
-}; |
+ buffer1.ProcessSelect(read_fds, write_fds); |
+ buffer2.ProcessSelect(read_fds, write_fds); |
-} // namespace |
+ if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) { |
+ buffer1.Close(); |
+ buffer2.Close(); |
+ } |
+ } |
+ |
+ // Note that the thread that the destructor will run on could be temporarily |
+ // blocked on I/O (e.g. select()) therefore it is safer to close the sockets |
+ // now rather than relying on the destructor. |
+ socket1_.reset(); |
+ socket2_.reset(); |
-void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { |
- (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); |
+ self_deleter_helper_.MaybeSelfDeleteSoon(); |
} |
} // namespace forwarder2 |