Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 14651009: Land Recent QUIC changes (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix integer constant is too large for 'unsigned long' type Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "net/quic/quic_session.h" 7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h"
8 9
9 using base::StringPiece; 10 using base::StringPiece;
11 using std::min;
10 12
11 namespace net { 13 namespace net {
12 14
13 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
14 QuicSession* session) 16 QuicSession* session)
15 : sequencer_(this), 17 : sequencer_(this),
16 id_(id), 18 id_(id),
17 session_(session), 19 session_(session),
18 visitor_(NULL), 20 visitor_(NULL),
19 stream_bytes_read_(0), 21 stream_bytes_read_(0),
20 stream_bytes_written_(0), 22 stream_bytes_written_(0),
23 headers_complete_(false),
24 headers_id_(0),
21 stream_error_(QUIC_STREAM_NO_ERROR), 25 stream_error_(QUIC_STREAM_NO_ERROR),
22 connection_error_(QUIC_NO_ERROR), 26 connection_error_(QUIC_NO_ERROR),
23 read_side_closed_(false), 27 read_side_closed_(false),
24 write_side_closed_(false), 28 write_side_closed_(false),
25 fin_buffered_(false), 29 fin_buffered_(false),
26 fin_sent_(false) { 30 fin_sent_(false) {
27 } 31 }
28 32
29 ReliableQuicStream::~ReliableQuicStream() { 33 ReliableQuicStream::~ReliableQuicStream() {
30 } 34 }
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 CloseWriteSide(); 91 CloseWriteSide();
88 } 92 }
89 CloseReadSide(); 93 CloseReadSide();
90 } 94 }
91 95
92 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
93 stream_error_ = error; 97 stream_error_ = error;
94 session()->SendRstStream(id(), error); 98 session()->SendRstStream(id(), error);
95 } 99 }
96 100
101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) {
102 if (headers_complete_ && decompressed_headers_.empty()) {
103 return sequencer_.Readv(iov, iov_len);
104 }
105 size_t bytes_consumed = 0;
106 int iov_index = 0;
107 while (iov_index < iov_len &&
108 decompressed_headers_.length() > bytes_consumed) {
109 int bytes_to_read = min(iov[iov_index].iov_len,
110 decompressed_headers_.length() - bytes_consumed);
111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
112 memcpy(iov_ptr,
113 decompressed_headers_.data() + bytes_consumed, bytes_to_read);
114 bytes_consumed += bytes_to_read;
115 ++iov_index;
116 }
117 decompressed_headers_.erase(0, bytes_consumed);
118 return bytes_consumed;
119 }
120
121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) {
122 if (headers_complete_ && decompressed_headers_.empty()) {
123 return sequencer_.GetReadableRegions(iov, iov_len);
124 }
125 if (iov_len == 0) {
126 return 0;
127 }
128 iov[0].iov_base = static_cast<void*>(
129 const_cast<char*>(decompressed_headers_.data()));
130 iov[0].iov_len = decompressed_headers_.length();
131 return 1;
132 }
133
97 bool ReliableQuicStream::IsHalfClosed() const { 134 bool ReliableQuicStream::IsHalfClosed() const {
135 if (!headers_complete_ || !decompressed_headers_.empty()) {
136 return false;
137 }
98 return sequencer_.IsHalfClosed(); 138 return sequencer_.IsHalfClosed();
99 } 139 }
100 140
101 bool ReliableQuicStream::IsClosed() const { 141 bool ReliableQuicStream::IsClosed() const {
102 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed());
103 } 143 }
104 144
105 bool ReliableQuicStream::HasBytesToRead() const { 145 bool ReliableQuicStream::HasBytesToRead() const {
106 return sequencer_.HasBytesToRead(); 146 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
107 } 147 }
108 148
109 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 149 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
110 return session_->peer_address(); 150 return session_->peer_address();
111 } 151 }
112 152
113 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 153 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
114 return WriteOrBuffer(data, fin); 154 return WriteOrBuffer(data, fin);
115 } 155 }
116 156
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 } 221 }
182 DLOG(INFO) << "Done reading from stream " << id(); 222 DLOG(INFO) << "Done reading from stream " << id();
183 223
184 read_side_closed_ = true; 224 read_side_closed_ = true;
185 if (write_side_closed_) { 225 if (write_side_closed_) {
186 DLOG(INFO) << "Closing stream: " << id(); 226 DLOG(INFO) << "Closing stream: " << id();
187 session_->CloseStream(id()); 227 session_->CloseStream(id());
188 } 228 }
189 } 229 }
190 230
231 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
232 if (id() == kCryptoStreamId) {
233 // The crypto stream does not use compression.
234 return ProcessData(data, data_len);
235 }
236 uint32 total_bytes_consumed = 0;
237 if (headers_id_ == 0u) {
238 // The headers ID has not yet been read. Strip it from the beginning of
239 // the data stream.
240 DCHECK_GT(4u, headers_id_buffer_.length());
241 size_t missing_size = 4 - headers_id_buffer_.length();
242 if (data_len < missing_size) {
243 StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
244 return data_len;
245 }
246 total_bytes_consumed += missing_size;
247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
248 DCHECK_EQ(4u, headers_id_buffer_.length());
249 memcpy(&headers_id_, headers_id_buffer_.data(), 4);
250 headers_id_buffer_.clear();
251 data += missing_size;
252 data_len -= missing_size;
253 }
254 DCHECK_NE(0u, headers_id_);
255
256 // Once the headers are finished, we simply pass the data through.
257 if (headers_complete_ && decompressed_headers_.empty()) {
258 DVLOG(1) << "Delegating procesing to ProcessData";
259 return total_bytes_consumed + ProcessData(data, data_len);
260 }
261
262 QuicHeaderId current_header_id =
263 session_->decompressor()->current_header_id();
264 // Ensure that this header id looks sane.
265 if (headers_id_ < current_header_id ||
266 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
267 DVLOG(1) << "Invalud headers for stream: " << id()
268 << " header_id: " << headers_id_;
269 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
270 }
271
272 // If we are head-of-line blocked on decompression, then back up.
273 if (current_header_id != headers_id_) {
274 session_->MarkDecompressionBlocked(headers_id_, id());
275 DVLOG(1) << "Unable to decmpress header data for stream: " << id()
276 << " header_id: " << headers_id_;
277 return total_bytes_consumed;
278 }
279
280 // Decompressed data will be delivered to decompressed_headers_.
281 size_t bytes_consumed = session_->decompressor()->DecompressData(
282 StringPiece(data, data_len), this);
283 total_bytes_consumed += bytes_consumed;
284
285 // Headers are complete if the decompressor has moved on to the
286 // next stream.
287 headers_complete_ =
288 session_->decompressor()->current_header_id() != headers_id_;
289
290 if (!decompressed_headers_.empty()) {
291 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
292 decompressed_headers_.length());
293 if (bytes_processed == decompressed_headers_.length()) {
294 decompressed_headers_.clear();
295 } else {
296 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
297 }
298 }
299
300 // We have processed all of the decompressed data but we might
301 // have some more raw data to process.
302 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
303 total_bytes_consumed += ProcessData(data + bytes_consumed,
304 data_len - bytes_consumed);
305 }
306
307 // The sequencer will push any additional buffered frames if this data
308 // has been completely consumed.
309 return total_bytes_consumed;
310 }
311
312 uint32 ReliableQuicStream::ProcessHeaderData() {
313 if (decompressed_headers_.empty()) {
314 return 0;
315 }
316
317 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
318 decompressed_headers_.length());
319 if (bytes_processed == decompressed_headers_.length()) {
320 decompressed_headers_.clear();
321 } else {
322 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
323 }
324 return bytes_processed;
325 }
326
327 void ReliableQuicStream::OnDecompressorAvailable() {
328 DCHECK_EQ(headers_id_,
329 session_->decompressor()->current_header_id());
330 DCHECK(!headers_complete_);
331 DCHECK_EQ(0u, decompressed_headers_.length());
332
333 size_t total_bytes_consumed = 0;
334 struct iovec iovecs[5];
335 while (!headers_complete_) {
336 size_t num_iovecs =
337 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));
338
339 if (num_iovecs == 0) {
340 return;
341 }
342 for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) {
343 total_bytes_consumed += session_->decompressor()->DecompressData(
344 StringPiece(static_cast<char*>(iovecs[i].iov_base),
345 iovecs[i].iov_len), this);
346
347 headers_complete_ =
348 session_->decompressor()->current_header_id() != headers_id_;
349 }
350 }
351
352 // Either the headers are complete, or the all data as been consumed.
353 sequencer_.MarkConsumed(total_bytes_consumed);
354
355 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
356
357 if (headers_complete_ && decompressed_headers_.empty()) {
358 sequencer_.FlushBufferedFrames();
359 }
360 }
361
362 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
363 data.AppendToString(&decompressed_headers_);
364 return true;
365 }
366
191 void ReliableQuicStream::CloseWriteSide() { 367 void ReliableQuicStream::CloseWriteSide() {
192 if (write_side_closed_) { 368 if (write_side_closed_) {
193 return; 369 return;
194 } 370 }
195 DLOG(INFO) << "Done writing to stream " << id(); 371 DLOG(INFO) << "Done writing to stream " << id();
196 372
197 write_side_closed_ = true; 373 write_side_closed_ = true;
198 if (read_side_closed_) { 374 if (read_side_closed_) {
199 DLOG(INFO) << "Closing stream: " << id(); 375 DLOG(INFO) << "Closing stream: " << id();
200 session_->CloseStream(id()); 376 session_->CloseStream(id());
201 } 377 }
202 } 378 }
203 379
204 void ReliableQuicStream::OnClose() { 380 void ReliableQuicStream::OnClose() {
205 CloseReadSide(); 381 CloseReadSide();
206 CloseWriteSide(); 382 CloseWriteSide();
207 383
208 if (visitor_) { 384 if (visitor_) {
209 Visitor* visitor = visitor_; 385 Visitor* visitor = visitor_;
210 // Calling Visitor::OnClose() may result the destruction of the visitor, 386 // Calling Visitor::OnClose() may result the destruction of the visitor,
211 // so we need to ensure we don't call it again. 387 // so we need to ensure we don't call it again.
212 visitor_ = NULL; 388 visitor_ = NULL;
213 visitor->OnClose(this); 389 visitor->OnClose(this);
214 } 390 }
215 } 391 }
216 392
217 } // namespace net 393 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698