OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "tools/android/forwarder2/forwarder.h" | 5 #include "tools/android/forwarder2/forwarder.h" |
6 | 6 |
7 #include <errno.h> | 7 #include "base/basictypes.h" |
8 #include <stdio.h> | 8 #include "base/bind.h" |
9 #include <stdlib.h> | |
10 #include <string.h> | |
11 | |
12 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/memory/ref_counted.h" |
13 #include "base/posix/eintr_wrapper.h" | 11 #include "base/posix/eintr_wrapper.h" |
14 #include "base/safe_strerror_posix.h" | 12 #include "base/single_thread_task_runner.h" |
15 #include "tools/android/forwarder2/socket.h" | 13 #include "tools/android/forwarder2/socket.h" |
16 | 14 |
17 namespace forwarder2 { | 15 namespace forwarder2 { |
18 | |
19 namespace { | 16 namespace { |
20 | 17 |
21 // Helper class to buffer reads and writes from one socket to another. | 18 // Helper class to buffer reads and writes from one socket to another. |
22 class BufferedCopier { | 19 class BufferedCopier { |
23 public: | 20 public: |
24 // Does NOT own the pointers. | 21 // Does NOT own the pointers. |
25 BufferedCopier(Socket* socket_from, | 22 BufferedCopier(Socket* socket_from, |
26 Socket* socket_to) | 23 Socket* socket_to) |
27 : socket_from_(socket_from), | 24 : socket_from_(socket_from), |
28 socket_to_(socket_to), | 25 socket_to_(socket_to), |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
80 | 77 |
81 // A big buffer to let our file-over-http bridge work more like real file. | 78 // A big buffer to let our file-over-http bridge work more like real file. |
82 static const int kBufferSize = 1024 * 128; | 79 static const int kBufferSize = 1024 * 128; |
83 int bytes_read_; | 80 int bytes_read_; |
84 int write_offset_; | 81 int write_offset_; |
85 char buffer_[kBufferSize]; | 82 char buffer_[kBufferSize]; |
86 | 83 |
87 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); | 84 DISALLOW_COPY_AND_ASSIGN(BufferedCopier); |
88 }; | 85 }; |
89 | 86 |
| 87 // Internal class that wraps a helper thread to forward traffic between |
| 88 // |socket1| and |socket2|. After creating a new instance, call its Start() |
| 89 // method to launch operations. Thread stops automatically if one of the socket |
| 90 // disconnects, but ensures that all buffered writes to the other, still alive, |
| 91 // socket, are written first. When this happens, the instance will delete itself |
| 92 // automatically. |
| 93 // Note that the instance will always be destroyed on the same thread that |
| 94 // created it. |
| 95 class Forwarder { |
| 96 public: |
| 97 Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) |
| 98 : socket1_(socket1.Pass()), |
| 99 socket2_(socket2.Pass()), |
| 100 destructor_runner_(base::MessageLoopProxy::current()), |
| 101 thread_("ForwarderThread") { |
| 102 } |
| 103 |
| 104 void Start() { |
| 105 thread_.Start(); |
| 106 thread_.message_loop_proxy()->PostTask( |
| 107 FROM_HERE, |
| 108 base::Bind(&Forwarder::ThreadHandler, base::Unretained(this))); |
| 109 } |
| 110 |
| 111 private: |
| 112 void ThreadHandler() { |
| 113 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; |
| 114 fd_set read_fds; |
| 115 fd_set write_fds; |
| 116 |
| 117 // Copy from socket1 to socket2 |
| 118 BufferedCopier buffer1(socket1_.get(), socket2_.get()); |
| 119 // Copy from socket2 to socket1 |
| 120 BufferedCopier buffer2(socket2_.get(), socket1_.get()); |
| 121 |
| 122 bool run = true; |
| 123 while (run) { |
| 124 FD_ZERO(&read_fds); |
| 125 FD_ZERO(&write_fds); |
| 126 |
| 127 buffer1.AddToReadSet(&read_fds); |
| 128 buffer2.AddToReadSet(&read_fds); |
| 129 buffer1.AddToWriteSet(&write_fds); |
| 130 buffer2.AddToWriteSet(&write_fds); |
| 131 |
| 132 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { |
| 133 PLOG(ERROR) << "select"; |
| 134 break; |
| 135 } |
| 136 // When a socket in the read set closes the connection, select() returns |
| 137 // with that socket descriptor set as "ready to read". When we call |
| 138 // TryRead() below, it will return false, but the while loop will continue |
| 139 // to run until all the write operations are finished, to make sure the |
| 140 // buffers are completely flushed out. |
| 141 |
| 142 // Keep running while we have some operation to do. |
| 143 run = buffer1.TryRead(read_fds); |
| 144 run = run || buffer2.TryRead(read_fds); |
| 145 run = run || buffer1.TryWrite(write_fds); |
| 146 run = run || buffer2.TryWrite(write_fds); |
| 147 } |
| 148 |
| 149 // Note that the thread that |destruction_runner_| runs tasks on could be |
| 150 // temporarily blocked on I/O (e.g. select()) therefore it is safer to close |
| 151 // the sockets now rather than relying on the destructor. |
| 152 socket1_.reset(); |
| 153 socket2_.reset(); |
| 154 |
| 155 // Note that base::Thread must be destroyed on the thread it was created on. |
| 156 destructor_runner_->DeleteSoon(FROM_HERE, this); |
| 157 } |
| 158 |
| 159 scoped_ptr<Socket> socket1_; |
| 160 scoped_ptr<Socket> socket2_; |
| 161 scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_; |
| 162 base::Thread thread_; |
| 163 }; |
| 164 |
90 } // namespace | 165 } // namespace |
91 | 166 |
92 Forwarder::Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) | 167 void StartForwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2) { |
93 : socket1_(socket1.Pass()), | 168 (new Forwarder(socket1.Pass(), socket2.Pass()))->Start(); |
94 socket2_(socket2.Pass()) { | |
95 DCHECK(socket1_.get()); | |
96 DCHECK(socket2_.get()); | |
97 } | 169 } |
98 | 170 |
99 Forwarder::~Forwarder() { | 171 } // namespace forwarder2 |
100 Detach(); | |
101 } | |
102 | |
103 void Forwarder::Run() { | |
104 const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1; | |
105 fd_set read_fds; | |
106 fd_set write_fds; | |
107 | |
108 // Copy from socket1 to socket2 | |
109 BufferedCopier buffer1(socket1_.get(), socket2_.get()); | |
110 | |
111 // Copy from socket2 to socket1 | |
112 BufferedCopier buffer2(socket2_.get(), socket1_.get()); | |
113 | |
114 bool run = true; | |
115 while (run) { | |
116 FD_ZERO(&read_fds); | |
117 FD_ZERO(&write_fds); | |
118 | |
119 buffer1.AddToReadSet(&read_fds); | |
120 buffer2.AddToReadSet(&read_fds); | |
121 buffer1.AddToWriteSet(&write_fds); | |
122 buffer2.AddToWriteSet(&write_fds); | |
123 | |
124 if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) { | |
125 LOG(ERROR) << "Select error: " << safe_strerror(errno); | |
126 break; | |
127 } | |
128 // When a socket in the read set closes the connection, select() returns | |
129 // with that socket descriptor set as "ready to read". When we call | |
130 // TryRead() below, it will return false, but the while loop will continue | |
131 // to run until all the write operations are finished, to make sure the | |
132 // buffers are completely flushed out. | |
133 | |
134 // Keep running while we have some operation to do. | |
135 run = buffer1.TryRead(read_fds); | |
136 run = run || buffer2.TryRead(read_fds); | |
137 run = run || buffer1.TryWrite(write_fds); | |
138 run = run || buffer2.TryWrite(write_fds); | |
139 } | |
140 | |
141 delete this; | |
142 } | |
143 | |
144 void Forwarder::Join() { | |
145 NOTREACHED() << "Can't Join a Forwarder thread."; | |
146 } | |
147 | |
148 } // namespace forwarder | |
OLD | NEW |