Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(99)

Side by Side Diff: remoting/protocol/buffered_socket_writer.cc

Issue 10836030: Add unittests for BufferedSocketWriter and fix some bugs in that code. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/buffered_socket_writer.h" 5 #include "remoting/protocol/buffered_socket_writer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
9 #include "base/stl_util.h" 10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h"
10 #include "net/base/net_errors.h" 12 #include "net/base/net_errors.h"
11 13
12 namespace remoting { 14 namespace remoting {
13 namespace protocol { 15 namespace protocol {
14 16
15 class BufferedSocketWriterBase::PendingPacket { 17 struct BufferedSocketWriterBase::PendingPacket {
16 public:
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, 18 PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
18 const base::Closure& done_task) 19 const base::Closure& done_task)
19 : data_(data), 20 : data(data),
20 done_task_(done_task) { 21 done_task(done_task) {
21 }
22 ~PendingPacket() {
23 if (!done_task_.is_null())
24 done_task_.Run();
25 } 22 }
26 23
27 net::IOBufferWithSize* data() { 24 scoped_refptr<net::IOBufferWithSize> data;
28 return data_; 25 base::Closure done_task;
29 }
30
31 private:
32 scoped_refptr<net::IOBufferWithSize> data_;
33 base::Closure done_task_;
34
35 DISALLOW_COPY_AND_ASSIGN(PendingPacket);
36 }; 26 };
37 27
38 BufferedSocketWriterBase::BufferedSocketWriterBase() 28 BufferedSocketWriterBase::BufferedSocketWriterBase()
39 : buffer_size_(0), 29 : buffer_size_(0),
40 socket_(NULL), 30 socket_(NULL),
41 write_pending_(false), 31 write_pending_(false),
42 closed_(false) { 32 closed_(false),
33 destroyed_flag_(NULL) {
43 } 34 }
44 35
45 void BufferedSocketWriterBase::Init(net::Socket* socket, 36 void BufferedSocketWriterBase::Init(net::Socket* socket,
46 const WriteFailedCallback& callback) { 37 const WriteFailedCallback& callback) {
47 DCHECK(CalledOnValidThread()); 38 DCHECK(CalledOnValidThread());
48 DCHECK(socket); 39 DCHECK(socket);
49 socket_ = socket; 40 socket_ = socket;
50 write_failed_callback_ = callback; 41 write_failed_callback_ = callback;
51 } 42 }
52 43
53 bool BufferedSocketWriterBase::Write( 44 bool BufferedSocketWriterBase::Write(
54 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { 45 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
55 DCHECK(CalledOnValidThread()); 46 DCHECK(CalledOnValidThread());
56 DCHECK(socket_); 47 DCHECK(socket_);
48 DCHECK(data.get());
57 49
58 // Don't write after Close(). 50 // Don't write after Close().
59 if (closed_) 51 if (closed_)
60 return false; 52 return false;
61 53
62 queue_.push_back(new PendingPacket(data, done_task)); 54 queue_.push_back(new PendingPacket(data, done_task));
63 buffer_size_ += data->size(); 55 buffer_size_ += data->size();
64 56
65 DoWrite(); 57 DoWrite();
66 return true; 58 return true;
(...skipping 17 matching lines...) Expand all
84 GetNextPacket(&current_packet, &current_packet_size); 76 GetNextPacket(&current_packet, &current_packet_size);
85 77
86 // Return if the queue is empty. 78 // Return if the queue is empty.
87 if (!current_packet) 79 if (!current_packet)
88 return; 80 return;
89 81
90 int result = socket_->Write( 82 int result = socket_->Write(
91 current_packet, current_packet_size, 83 current_packet, current_packet_size,
92 base::Bind(&BufferedSocketWriterBase::OnWritten, 84 base::Bind(&BufferedSocketWriterBase::OnWritten,
93 base::Unretained(this))); 85 base::Unretained(this)));
94 if (result >= 0) { 86 bool write_again = false;
95 AdvanceBufferPosition(result); 87 HandleWriteResult(result, &write_again);
88 if (!write_again)
89 return;
90 }
91 }
92
93 void BufferedSocketWriterBase::HandleWriteResult(int result,
94 bool* write_again) {
95 *write_again = false;
96 if (result < 0) {
97 if (result == net::ERR_IO_PENDING) {
98 write_pending_ = true;
96 } else { 99 } else {
97 if (result == net::ERR_IO_PENDING) { 100 HandleError(result);
98 write_pending_ = true; 101 if (!write_failed_callback_.is_null())
99 } else { 102 write_failed_callback_.Run(result);
100 HandleError(result); 103 }
101 if (!write_failed_callback_.is_null()) 104 return;
102 write_failed_callback_.Run(result); 105 }
103 } 106
107 base::Closure done_task = AdvanceBufferPosition(result);
108 if (!done_task.is_null()) {
109 bool destroyed = false;
110 destroyed_flag_ = &destroyed;
111 done_task.Run();
112 if (destroyed) {
113 // Stop doing anything if we've been destroyed by the callback.
104 return; 114 return;
105 } 115 }
116 destroyed_flag_ = NULL;
106 } 117 }
118
119 *write_again = true;
107 } 120 }
108 121
109 void BufferedSocketWriterBase::OnWritten(int result) { 122 void BufferedSocketWriterBase::OnWritten(int result) {
110 DCHECK(CalledOnValidThread()); 123 DCHECK(CalledOnValidThread());
124 DCHECK(write_pending_);
111 write_pending_ = false; 125 write_pending_ = false;
112 126
113 if (result < 0) { 127 bool write_again;
114 HandleError(result); 128 HandleWriteResult(result, &write_again);
115 if (!write_failed_callback_.is_null()) 129 if (write_again)
116 write_failed_callback_.Run(result); 130 DoWrite();
117 return;
118 }
119
120 AdvanceBufferPosition(result);
121
122 DoWrite();
123 } 131 }
124 132
125 void BufferedSocketWriterBase::HandleError(int result) { 133 void BufferedSocketWriterBase::HandleError(int result) {
126 DCHECK(CalledOnValidThread()); 134 DCHECK(CalledOnValidThread());
127 135
128 closed_ = true; 136 closed_ = true;
129 137
130 STLDeleteElements(&queue_); 138 STLDeleteElements(&queue_);
131 139
132 // Notify subclass that an error is received. 140 // Notify subclass that an error is received.
133 OnError(result); 141 OnError(result);
134 } 142 }
135 143
136 int BufferedSocketWriterBase::GetBufferSize() { 144 int BufferedSocketWriterBase::GetBufferSize() {
137 return buffer_size_; 145 return buffer_size_;
138 } 146 }
139 147
140 int BufferedSocketWriterBase::GetBufferChunks() { 148 int BufferedSocketWriterBase::GetBufferChunks() {
141 return queue_.size(); 149 return queue_.size();
142 } 150 }
143 151
144 void BufferedSocketWriterBase::Close() { 152 void BufferedSocketWriterBase::Close() {
145 DCHECK(CalledOnValidThread()); 153 DCHECK(CalledOnValidThread());
146 closed_ = true; 154 closed_ = true;
147 } 155 }
148 156
149 BufferedSocketWriterBase::~BufferedSocketWriterBase() {} 157 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
158 if (destroyed_flag_)
159 *destroyed_flag_ = true;
150 160
151 void BufferedSocketWriterBase::PopQueue() { 161 STLDeleteElements(&queue_);
152 // This also calls |done_task|. 162 }
163
164 base::Closure BufferedSocketWriterBase::PopQueue() {
165 base::Closure result = queue_.front()->done_task;
153 delete queue_.front(); 166 delete queue_.front();
154 queue_.pop_front(); 167 queue_.pop_front();
168 return result;
155 } 169 }
156 170
157 BufferedSocketWriter::BufferedSocketWriter() { 171 BufferedSocketWriter::BufferedSocketWriter() {
158 } 172 }
159 173
160 void BufferedSocketWriter::GetNextPacket( 174 void BufferedSocketWriter::GetNextPacket(
161 net::IOBuffer** buffer, int* size) { 175 net::IOBuffer** buffer, int* size) {
162 if (!current_buf_) { 176 if (!current_buf_) {
163 if (queue_.empty()) { 177 if (queue_.empty()) {
164 *buffer = NULL; 178 *buffer = NULL;
165 return; // Nothing to write. 179 return; // Nothing to write.
166 } 180 }
167 current_buf_ = new net::DrainableIOBuffer( 181 current_buf_ = new net::DrainableIOBuffer(
168 queue_.front()->data(), queue_.front()->data()->size()); 182 queue_.front()->data, queue_.front()->data->size());
169 } 183 }
170 184
171 *buffer = current_buf_; 185 *buffer = current_buf_;
172 *size = current_buf_->BytesRemaining(); 186 *size = current_buf_->BytesRemaining();
173 } 187 }
174 188
175 void BufferedSocketWriter::AdvanceBufferPosition(int written) { 189 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
176 buffer_size_ -= written; 190 buffer_size_ -= written;
177 current_buf_->DidConsume(written); 191 current_buf_->DidConsume(written);
178 192
179 if (current_buf_->BytesRemaining() == 0) { 193 if (current_buf_->BytesRemaining() == 0) {
180 PopQueue();
181 current_buf_ = NULL; 194 current_buf_ = NULL;
195 return PopQueue();
182 } 196 }
197 return base::Closure();
183 } 198 }
184 199
185 void BufferedSocketWriter::OnError(int result) { 200 void BufferedSocketWriter::OnError(int result) {
186 current_buf_ = NULL; 201 current_buf_ = NULL;
187 } 202 }
188 203
189 BufferedSocketWriter::~BufferedSocketWriter() { 204 BufferedSocketWriter::~BufferedSocketWriter() {
190 STLDeleteElements(&queue_);
191 } 205 }
192 206
193 BufferedDatagramWriter::BufferedDatagramWriter() { 207 BufferedDatagramWriter::BufferedDatagramWriter() {
194 } 208 }
195 209
196 void BufferedDatagramWriter::GetNextPacket( 210 void BufferedDatagramWriter::GetNextPacket(
197 net::IOBuffer** buffer, int* size) { 211 net::IOBuffer** buffer, int* size) {
198 if (queue_.empty()) { 212 if (queue_.empty()) {
199 *buffer = NULL; 213 *buffer = NULL;
200 return; // Nothing to write. 214 return; // Nothing to write.
201 } 215 }
202 *buffer = queue_.front()->data(); 216 *buffer = queue_.front()->data;
203 *size = queue_.front()->data()->size(); 217 *size = queue_.front()->data->size();
204 } 218 }
205 219
206 void BufferedDatagramWriter::AdvanceBufferPosition(int written) { 220 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
207 DCHECK_EQ(written, queue_.front()->data()->size()); 221 DCHECK_EQ(written, queue_.front()->data->size());
208 buffer_size_ -= queue_.front()->data()->size(); 222 buffer_size_ -= queue_.front()->data->size();
209 PopQueue(); 223 return PopQueue();
210 } 224 }
211 225
212 void BufferedDatagramWriter::OnError(int result) { 226 void BufferedDatagramWriter::OnError(int result) {
213 // Nothing to do here. 227 // Nothing to do here.
214 } 228 }
215 229
216 BufferedDatagramWriter::~BufferedDatagramWriter() {} 230 BufferedDatagramWriter::~BufferedDatagramWriter() {
231 }
217 232
218 } // namespace protocol 233 } // namespace protocol
219 } // namespace remoting 234 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/buffered_socket_writer.h ('k') | remoting/protocol/buffered_socket_writer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698