Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(176)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 14816006: Land Recent QUIC changes (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added missing NET_PRIVATE_EXPORT to QuicWallTime Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "net/quic/quic_session.h" 7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h" 8 #include "net/quic/quic_spdy_decompressor.h"
9 9
10 using base::StringPiece; 10 using base::StringPiece;
11 using std::min; 11 using std::min;
12 12
13 namespace net { 13 namespace net {
14 14
15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
16 QuicSession* session) 16 QuicSession* session)
17 : sequencer_(this), 17 : sequencer_(this),
18 id_(id), 18 id_(id),
19 session_(session), 19 session_(session),
20 visitor_(NULL), 20 visitor_(NULL),
21 stream_bytes_read_(0), 21 stream_bytes_read_(0),
22 stream_bytes_written_(0), 22 stream_bytes_written_(0),
23 headers_complete_(false), 23 headers_decompressed_(false),
24 headers_id_(0), 24 headers_id_(0),
25 stream_error_(QUIC_STREAM_NO_ERROR), 25 stream_error_(QUIC_STREAM_NO_ERROR),
26 connection_error_(QUIC_NO_ERROR), 26 connection_error_(QUIC_NO_ERROR),
27 read_side_closed_(false), 27 read_side_closed_(false),
28 write_side_closed_(false), 28 write_side_closed_(false),
29 fin_buffered_(false), 29 fin_buffered_(false),
30 fin_sent_(false) { 30 fin_sent_(false) {
31 } 31 }
32 32
33 ReliableQuicStream::~ReliableQuicStream() { 33 ReliableQuicStream::~ReliableQuicStream() {
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 } 92 }
93 CloseReadSide(); 93 CloseReadSide();
94 } 94 }
95 95
96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
97 stream_error_ = error; 97 stream_error_ = error;
98 session()->SendRstStream(id(), error); 98 session()->SendRstStream(id(), error);
99 } 99 }
100 100
101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { 101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) {
102 if (headers_complete_ && decompressed_headers_.empty()) { 102 if (headers_decompressed_ && decompressed_headers_.empty()) {
103 return sequencer_.Readv(iov, iov_len); 103 return sequencer_.Readv(iov, iov_len);
104 } 104 }
105 size_t bytes_consumed = 0; 105 size_t bytes_consumed = 0;
106 int iov_index = 0; 106 int iov_index = 0;
107 while (iov_index < iov_len && 107 while (iov_index < iov_len &&
108 decompressed_headers_.length() > bytes_consumed) { 108 decompressed_headers_.length() > bytes_consumed) {
109 int bytes_to_read = min(iov[iov_index].iov_len, 109 int bytes_to_read = min(iov[iov_index].iov_len,
110 decompressed_headers_.length() - bytes_consumed); 110 decompressed_headers_.length() - bytes_consumed);
111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
112 memcpy(iov_ptr, 112 memcpy(iov_ptr,
113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 113 decompressed_headers_.data() + bytes_consumed, bytes_to_read);
114 bytes_consumed += bytes_to_read; 114 bytes_consumed += bytes_to_read;
115 ++iov_index; 115 ++iov_index;
116 } 116 }
117 decompressed_headers_.erase(0, bytes_consumed); 117 decompressed_headers_.erase(0, bytes_consumed);
118 return bytes_consumed; 118 return bytes_consumed;
119 } 119 }
120 120
121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { 121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) {
122 if (headers_complete_ && decompressed_headers_.empty()) { 122 if (headers_decompressed_ && decompressed_headers_.empty()) {
123 return sequencer_.GetReadableRegions(iov, iov_len); 123 return sequencer_.GetReadableRegions(iov, iov_len);
124 } 124 }
125 if (iov_len == 0) { 125 if (iov_len == 0) {
126 return 0; 126 return 0;
127 } 127 }
128 iov[0].iov_base = static_cast<void*>( 128 iov[0].iov_base = static_cast<void*>(
129 const_cast<char*>(decompressed_headers_.data())); 129 const_cast<char*>(decompressed_headers_.data()));
130 iov[0].iov_len = decompressed_headers_.length(); 130 iov[0].iov_len = decompressed_headers_.length();
131 return 1; 131 return 1;
132 } 132 }
133 133
134 bool ReliableQuicStream::IsHalfClosed() const { 134 bool ReliableQuicStream::IsHalfClosed() const {
135 if (!headers_complete_ || !decompressed_headers_.empty()) { 135 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
136 return false; 136 return false;
137 } 137 }
138 return sequencer_.IsHalfClosed(); 138 return sequencer_.IsHalfClosed();
139 } 139 }
140 140
141 bool ReliableQuicStream::IsClosed() const { 141 bool ReliableQuicStream::IsClosed() const {
142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed());
143 } 143 }
144 144
145 bool ReliableQuicStream::HasBytesToRead() const { 145 bool ReliableQuicStream::HasBytesToRead() const {
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
248 DCHECK_EQ(4u, headers_id_buffer_.length()); 248 DCHECK_EQ(4u, headers_id_buffer_.length());
249 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 249 memcpy(&headers_id_, headers_id_buffer_.data(), 4);
250 headers_id_buffer_.clear(); 250 headers_id_buffer_.clear();
251 data += missing_size; 251 data += missing_size;
252 data_len -= missing_size; 252 data_len -= missing_size;
253 } 253 }
254 DCHECK_NE(0u, headers_id_); 254 DCHECK_NE(0u, headers_id_);
255 255
256 // Once the headers are finished, we simply pass the data through. 256 // Once the headers are finished, we simply pass the data through.
257 if (headers_complete_ && decompressed_headers_.empty()) { 257 if (headers_decompressed_) {
258 DVLOG(1) << "Delegating procesing to ProcessData"; 258 // Some buffered header data remains.
259 return total_bytes_consumed + ProcessData(data, data_len); 259 if (!decompressed_headers_.empty()) {
260 ProcessHeaderData();
261 }
262 if (decompressed_headers_.empty()) {
263 DVLOG(1) << "Delegating procesing to ProcessData";
264 total_bytes_consumed += ProcessData(data, data_len);
265 }
266 return total_bytes_consumed;
260 } 267 }
261 268
262 QuicHeaderId current_header_id = 269 QuicHeaderId current_header_id =
263 session_->decompressor()->current_header_id(); 270 session_->decompressor()->current_header_id();
264 // Ensure that this header id looks sane. 271 // Ensure that this header id looks sane.
265 if (headers_id_ < current_header_id || 272 if (headers_id_ < current_header_id ||
266 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 273 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
267 DVLOG(1) << "Invalud headers for stream: " << id() 274 DVLOG(1) << "Invalid headers for stream: " << id()
268 << " header_id: " << headers_id_; 275 << " header_id: " << headers_id_
276 << " current_header_id: " << current_header_id;
269 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 277 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
278 return total_bytes_consumed;
270 } 279 }
271 280
272 // If we are head-of-line blocked on decompression, then back up. 281 // If we are head-of-line blocked on decompression, then back up.
273 if (current_header_id != headers_id_) { 282 if (current_header_id != headers_id_) {
274 session_->MarkDecompressionBlocked(headers_id_, id()); 283 session_->MarkDecompressionBlocked(headers_id_, id());
275 DVLOG(1) << "Unable to decmpress header data for stream: " << id() 284 DVLOG(1) << "Unable to decmpress header data for stream: " << id()
276 << " header_id: " << headers_id_; 285 << " header_id: " << headers_id_;
277 return total_bytes_consumed; 286 return total_bytes_consumed;
278 } 287 }
279 288
280 // Decompressed data will be delivered to decompressed_headers_. 289 // Decompressed data will be delivered to decompressed_headers_.
281 size_t bytes_consumed = session_->decompressor()->DecompressData( 290 size_t bytes_consumed = session_->decompressor()->DecompressData(
282 StringPiece(data, data_len), this); 291 StringPiece(data, data_len), this);
283 total_bytes_consumed += bytes_consumed; 292 total_bytes_consumed += bytes_consumed;
284 293
285 // Headers are complete if the decompressor has moved on to the 294 // Headers are complete if the decompressor has moved on to the
286 // next stream. 295 // next stream.
287 headers_complete_ = 296 headers_decompressed_ =
288 session_->decompressor()->current_header_id() != headers_id_; 297 session_->decompressor()->current_header_id() != headers_id_;
289 298
290 if (!decompressed_headers_.empty()) { 299 ProcessHeaderData();
291 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
292 decompressed_headers_.length());
293 if (bytes_processed == decompressed_headers_.length()) {
294 decompressed_headers_.clear();
295 } else {
296 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
297 }
298 }
299 300
300 // We have processed all of the decompressed data but we might 301 // We have processed all of the decompressed data but we might
301 // have some more raw data to process. 302 // have some more raw data to process.
302 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { 303 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
303 total_bytes_consumed += ProcessData(data + bytes_consumed, 304 total_bytes_consumed += ProcessData(data + bytes_consumed,
304 data_len - bytes_consumed); 305 data_len - bytes_consumed);
305 } 306 }
306 307
307 // The sequencer will push any additional buffered frames if this data 308 // The sequencer will push any additional buffered frames if this data
308 // has been completely consumed. 309 // has been completely consumed.
(...skipping 11 matching lines...) Expand all
320 decompressed_headers_.clear(); 321 decompressed_headers_.clear();
321 } else { 322 } else {
322 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 323 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
323 } 324 }
324 return bytes_processed; 325 return bytes_processed;
325 } 326 }
326 327
327 void ReliableQuicStream::OnDecompressorAvailable() { 328 void ReliableQuicStream::OnDecompressorAvailable() {
328 DCHECK_EQ(headers_id_, 329 DCHECK_EQ(headers_id_,
329 session_->decompressor()->current_header_id()); 330 session_->decompressor()->current_header_id());
330 DCHECK(!headers_complete_); 331 DCHECK(!headers_decompressed_);
331 DCHECK_EQ(0u, decompressed_headers_.length()); 332 DCHECK_EQ(0u, decompressed_headers_.length());
332 333
333 size_t total_bytes_consumed = 0; 334 size_t total_bytes_consumed = 0;
334 struct iovec iovecs[5]; 335 struct iovec iovecs[5];
335 while (!headers_complete_) { 336 while (!headers_decompressed_) {
336 size_t num_iovecs = 337 size_t num_iovecs =
337 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); 338 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));
338 339
339 if (num_iovecs == 0) { 340 if (num_iovecs == 0) {
340 return; 341 return;
341 } 342 }
342 for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) { 343 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) {
343 total_bytes_consumed += session_->decompressor()->DecompressData( 344 total_bytes_consumed += session_->decompressor()->DecompressData(
344 StringPiece(static_cast<char*>(iovecs[i].iov_base), 345 StringPiece(static_cast<char*>(iovecs[i].iov_base),
345 iovecs[i].iov_len), this); 346 iovecs[i].iov_len), this);
346 347
347 headers_complete_ = 348 headers_decompressed_ =
348 session_->decompressor()->current_header_id() != headers_id_; 349 session_->decompressor()->current_header_id() != headers_id_;
349 } 350 }
350 } 351 }
351 352
352 // Either the headers are complete, or the all data as been consumed. 353 // Either the headers are complete, or the all data as been consumed.
353 sequencer_.MarkConsumed(total_bytes_consumed); 354 sequencer_.MarkConsumed(total_bytes_consumed);
354 355
355 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 356 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
356 357
357 if (headers_complete_ && decompressed_headers_.empty()) { 358 if (headers_decompressed_ && decompressed_headers_.empty()) {
358 sequencer_.FlushBufferedFrames(); 359 sequencer_.FlushBufferedFrames();
359 } 360 }
360 } 361 }
361 362
362 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 363 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
363 data.AppendToString(&decompressed_headers_); 364 data.AppendToString(&decompressed_headers_);
364 return true; 365 return true;
365 } 366 }
366 367
367 void ReliableQuicStream::CloseWriteSide() { 368 void ReliableQuicStream::CloseWriteSide() {
(...skipping 16 matching lines...) Expand all
384 if (visitor_) { 385 if (visitor_) {
385 Visitor* visitor = visitor_; 386 Visitor* visitor = visitor_;
386 // Calling Visitor::OnClose() may result the destruction of the visitor, 387 // Calling Visitor::OnClose() may result the destruction of the visitor,
387 // so we need to ensure we don't call it again. 388 // so we need to ensure we don't call it again.
388 visitor_ = NULL; 389 visitor_ = NULL;
389 visitor->OnClose(this); 390 visitor->OnClose(this);
390 } 391 }
391 } 392 }
392 393
393 } // namespace net 394 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698