| Index: tools/android/forwarder2/forwarder.cc
|
| diff --git a/tools/android/forwarder2/forwarder.cc b/tools/android/forwarder2/forwarder.cc
|
| index 2f25b9cb3ac223d0d12f72726e7ee6a1323918ee..d747dbe6558ae98623f5ea92b840c01b11bf8b6f 100644
|
| --- a/tools/android/forwarder2/forwarder.cc
|
| +++ b/tools/android/forwarder2/forwarder.cc
|
| @@ -12,6 +12,7 @@
|
| #include "base/posix/eintr_wrapper.h"
|
| #include "base/single_thread_task_runner.h"
|
| #include "base/threading/thread.h"
|
| +#include "tools/android/forwarder2/pipe_notifier.h"
|
| #include "tools/android/forwarder2/socket.h"
|
|
|
| namespace forwarder2 {
|
| @@ -215,88 +216,82 @@ class BufferedCopier {
|
| DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
|
| };
|
|
|
| -// Internal class that wraps a helper thread to forward traffic between
|
| -// |socket1| and |socket2|. After creating a new instance, call its Start()
|
| -// method to launch operations. Thread stops automatically if one of the socket
|
| -// disconnects, but ensures that all buffered writes to the other, still alive,
|
| -// socket, are written first. When this happens, the instance will delete itself
|
| -// automatically.
|
| -// Note that the instance will always be destroyed on the same thread that
|
| -// created it.
|
| -class Forwarder {
|
| - public:
|
| - // Create a new Forwarder instance. |socket1| and |socket2| are the two socket
|
| - // endpoints.
|
| - Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
|
| - : socket1_(socket1.Pass()),
|
| - socket2_(socket2.Pass()),
|
| - destructor_runner_(base::MessageLoopProxy::current()),
|
| - thread_("ForwarderThread") {}
|
| -
|
| - void Start() {
|
| - thread_.Start();
|
| - thread_.message_loop_proxy()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
|
| - }
|
| +} // namespace
|
|
|
| - private:
|
| - void ThreadHandler() {
|
| - fd_set read_fds;
|
| - fd_set write_fds;
|
| +Forwarder::Forwarder(scoped_ptr<Socket> socket1,
|
| + scoped_ptr<Socket> socket2,
|
| + PipeNotifier* deletion_notifier,
|
| + const ErrorCallback& error_callback)
|
| + : self_deleter_helper_(this, error_callback),
|
| + deletion_notifier_(deletion_notifier),
|
| + socket1_(socket1.Pass()),
|
| + socket2_(socket2.Pass()),
|
| + thread_("ForwarderThread") {
|
| + DCHECK(deletion_notifier_);
|
| +}
|
|
|
| - // Copy from socket1 to socket2
|
| - BufferedCopier buffer1(socket1_.get(), socket2_.get());
|
| +Forwarder::~Forwarder() {}
|
|
|
| - // Copy from socket2 to socket1
|
| - BufferedCopier buffer2(socket2_.get(), socket1_.get());
|
| +void Forwarder::Start() {
|
| + thread_.Start();
|
| + thread_.message_loop_proxy()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
|
| +}
|
|
|
| - buffer1.SetPeer(&buffer2);
|
| - buffer2.SetPeer(&buffer1);
|
| +void Forwarder::ThreadHandler() {
|
| + fd_set read_fds;
|
| + fd_set write_fds;
|
|
|
| - for (;;) {
|
| - FD_ZERO(&read_fds);
|
| - FD_ZERO(&write_fds);
|
| + // Copy from socket1 to socket2
|
| + BufferedCopier buffer1(socket1_.get(), socket2_.get());
|
| + // Copy from socket2 to socket1
|
| + BufferedCopier buffer2(socket2_.get(), socket1_.get());
|
|
|
| - int max_fd = -1;
|
| - buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
|
| - buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
|
| + buffer1.SetPeer(&buffer2);
|
| + buffer2.SetPeer(&buffer1);
|
|
|
| - if (max_fd < 0) {
|
| - // Both buffers are closed. Exit immediately.
|
| - break;
|
| - }
|
| + for (;;) {
|
| + FD_ZERO(&read_fds);
|
| + FD_ZERO(&write_fds);
|
|
|
| - if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
|
| - 0) {
|
| - PLOG(ERROR) << "select";
|
| - break;
|
| - }
|
| + int max_fd = -1;
|
| + buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
|
| + buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
|
|
|
| - buffer1.ProcessSelect(read_fds, write_fds);
|
| - buffer2.ProcessSelect(read_fds, write_fds);
|
| + if (max_fd < 0) {
|
| + // Both buffers are closed. Exit immediately.
|
| + break;
|
| }
|
|
|
| - // Note that the thread that |destruction_runner_| runs tasks on could be
|
| - // temporarily blocked on I/O (e.g. select()) therefore it is safer to close
|
| - // the sockets now rather than relying on the destructor.
|
| - socket1_.reset();
|
| - socket2_.reset();
|
| + const int deletion_fd = deletion_notifier_->receiver_fd();
|
| + if (deletion_fd >= 0) {
|
| + FD_SET(deletion_fd, &read_fds);
|
| + max_fd = std::max(max_fd, deletion_fd);
|
| + }
|
|
|
| - // Ensure the object is destroyed on the thread that created it.
|
| - destructor_runner_->DeleteSoon(FROM_HERE, this);
|
| - }
|
| + if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
|
| + 0) {
|
| + PLOG(ERROR) << "select";
|
| + break;
|
| + }
|
|
|
| - scoped_ptr<Socket> socket1_;
|
| - scoped_ptr<Socket> socket2_;
|
| - scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
|
| - base::Thread thread_;
|
| -};
|
| + buffer1.ProcessSelect(read_fds, write_fds);
|
| + buffer2.ProcessSelect(read_fds, write_fds);
|
|
|
| -} // namespace
|
| + if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
|
| + buffer1.Close();
|
| + buffer2.Close();
|
| + }
|
| + }
|
| +
|
| + // Note that the thread that the destructor will run on could be temporarily
|
| + // blocked on I/O (e.g. select()) therefore it is safer to close the sockets
|
| + // now rather than relying on the destructor.
|
| + socket1_.reset();
|
| + socket2_.reset();
|
|
|
| -void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) {
|
| - (new Forwarder(socket1.Pass(), socket2.Pass()))->Start();
|
| + self_deleter_helper_.MaybeSelfDeleteSoon();
|
| }
|
|
|
| } // namespace forwarder2
|
|
|