Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 20227003: Land Recent QUIC changes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Land Recent QUIC changes Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "net/quic/quic_session.h" 7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h" 8 #include "net/quic/quic_spdy_decompressor.h"
9 9
10 using base::StringPiece; 10 using base::StringPiece;
11 using std::min; 11 using std::min;
12 12
13 namespace net { 13 namespace net {
14 14
15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
16 QuicSession* session) 16 QuicSession* session)
17 : sequencer_(this), 17 : sequencer_(this),
18 id_(id), 18 id_(id),
19 session_(session), 19 session_(session),
20 visitor_(NULL), 20 visitor_(NULL),
21 stream_bytes_read_(0), 21 stream_bytes_read_(0),
22 stream_bytes_written_(0), 22 stream_bytes_written_(0),
23 headers_decompressed_(false), 23 headers_decompressed_(false),
24 headers_id_(0), 24 headers_id_(0),
25 decompression_failed_(false),
25 stream_error_(QUIC_STREAM_NO_ERROR), 26 stream_error_(QUIC_STREAM_NO_ERROR),
26 connection_error_(QUIC_NO_ERROR), 27 connection_error_(QUIC_NO_ERROR),
27 read_side_closed_(false), 28 read_side_closed_(false),
28 write_side_closed_(false), 29 write_side_closed_(false),
29 fin_buffered_(false), 30 fin_buffered_(false),
30 fin_sent_(false) { 31 fin_sent_(false) {
31 } 32 }
32 33
33 ReliableQuicStream::~ReliableQuicStream() { 34 ReliableQuicStream::~ReliableQuicStream() {
34 } 35 }
(...skipping 15 matching lines...) Expand all
50 if (read_side_closed_) { 51 if (read_side_closed_) {
51 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 52 DLOG(INFO) << "Ignoring frame " << frame.stream_id;
52 // We don't want to be reading: blackhole the data. 53 // We don't want to be reading: blackhole the data.
53 return true; 54 return true;
54 } 55 }
55 // Note: This count include duplicate data received. 56 // Note: This count include duplicate data received.
56 stream_bytes_read_ += frame.data.length(); 57 stream_bytes_read_ += frame.data.length();
57 58
58 bool accepted = sequencer_.OnStreamFrame(frame); 59 bool accepted = sequencer_.OnStreamFrame(frame);
59 60
60 if (frame.fin) {
61 sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size());
62 }
63
64 return accepted; 61 return accepted;
65 } 62 }
66 63
67 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 64 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
68 stream_error_ = error; 65 stream_error_ = error;
69 TerminateFromPeer(false); // Full close. 66 TerminateFromPeer(false); // Full close.
70 } 67 }
71 68
72 void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 69 void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
73 if (read_side_closed_ && write_side_closed_) { 70 if (read_side_closed_ && write_side_closed_) {
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
260 } 257 }
261 total_bytes_consumed += missing_size; 258 total_bytes_consumed += missing_size;
262 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 259 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
263 DCHECK_EQ(4u, headers_id_buffer_.length()); 260 DCHECK_EQ(4u, headers_id_buffer_.length());
264 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 261 memcpy(&headers_id_, headers_id_buffer_.data(), 4);
265 headers_id_buffer_.clear(); 262 headers_id_buffer_.clear();
266 data += missing_size; 263 data += missing_size;
267 data_len -= missing_size; 264 data_len -= missing_size;
268 } 265 }
269 DCHECK_NE(0u, headers_id_); 266 DCHECK_NE(0u, headers_id_);
267 if (data_len == 0) {
268 return total_bytes_consumed;
269 }
270 270
271 // Once the headers are finished, we simply pass the data through. 271 // Once the headers are finished, we simply pass the data through.
272 if (headers_decompressed_) { 272 if (headers_decompressed_) {
273 // Some buffered header data remains. 273 // Some buffered header data remains.
274 if (!decompressed_headers_.empty()) { 274 if (!decompressed_headers_.empty()) {
275 ProcessHeaderData(); 275 ProcessHeaderData();
276 } 276 }
277 if (decompressed_headers_.empty() && data_len > 0) { 277 if (decompressed_headers_.empty()) {
278 DVLOG(1) << "Delegating procesing to ProcessData"; 278 DVLOG(1) << "Delegating procesing to ProcessData";
279 total_bytes_consumed += ProcessData(data, data_len); 279 total_bytes_consumed += ProcessData(data, data_len);
280 } 280 }
281 return total_bytes_consumed; 281 return total_bytes_consumed;
282 } 282 }
283 283
284 QuicHeaderId current_header_id = 284 QuicHeaderId current_header_id =
285 session_->decompressor()->current_header_id(); 285 session_->decompressor()->current_header_id();
286 // Ensure that this header id looks sane. 286 // Ensure that this header id looks sane.
287 if (headers_id_ < current_header_id || 287 if (headers_id_ < current_header_id ||
288 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 288 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
289 DVLOG(1) << "Invalid headers for stream: " << id() 289 DVLOG(1) << "Invalid headers for stream: " << id()
290 << " header_id: " << headers_id_ 290 << " header_id: " << headers_id_
291 << " current_header_id: " << current_header_id; 291 << " current_header_id: " << current_header_id;
292 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 292 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
293 return total_bytes_consumed; 293 return total_bytes_consumed;
294 } 294 }
295 295
296 // If we are head-of-line blocked on decompression, then back up. 296 // If we are head-of-line blocked on decompression, then back up.
297 if (current_header_id != headers_id_) { 297 if (current_header_id != headers_id_) {
298 session_->MarkDecompressionBlocked(headers_id_, id()); 298 session_->MarkDecompressionBlocked(headers_id_, id());
299 DVLOG(1) << "Unable to decompress header data for stream: " << id() 299 DVLOG(1) << "Unable to decompress header data for stream: " << id()
300 << " header_id: " << headers_id_; 300 << " header_id: " << headers_id_;
301 return total_bytes_consumed; 301 return total_bytes_consumed;
302 } 302 }
303 303
304 // Decompressed data will be delivered to decompressed_headers_. 304 // Decompressed data will be delivered to decompressed_headers_.
305 size_t bytes_consumed = session_->decompressor()->DecompressData( 305 size_t bytes_consumed = session_->decompressor()->DecompressData(
306 StringPiece(data, data_len), this); 306 StringPiece(data, data_len), this);
307 DCHECK_NE(0u, bytes_consumed);
308 if (bytes_consumed > data_len) {
309 DCHECK(false) << "DecompressData returned illegal value";
310 OnDecompressionError();
311 return total_bytes_consumed;
312 }
307 total_bytes_consumed += bytes_consumed; 313 total_bytes_consumed += bytes_consumed;
314 data += bytes_consumed;
315 data_len -= bytes_consumed;
316
317 if (decompression_failed_) {
318 // The session will have been closed in OnDecompressionError.
319 return total_bytes_consumed;
320 }
308 321
309 // Headers are complete if the decompressor has moved on to the 322 // Headers are complete if the decompressor has moved on to the
310 // next stream. 323 // next stream.
311 headers_decompressed_ = 324 headers_decompressed_ =
312 session_->decompressor()->current_header_id() != headers_id_; 325 session_->decompressor()->current_header_id() != headers_id_;
326 if (!headers_decompressed_) {
327 DCHECK_EQ(0u, data_len);
328 }
313 329
314 ProcessHeaderData(); 330 ProcessHeaderData();
315 331
332 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
333 return total_bytes_consumed;
334 }
335
316 // We have processed all of the decompressed data but we might 336 // We have processed all of the decompressed data but we might
317 // have some more raw data to process. 337 // have some more raw data to process.
318 if (decompressed_headers_.empty() && bytes_consumed < data_len) { 338 if (data_len > 0) {
319 total_bytes_consumed += ProcessData(data + bytes_consumed, 339 total_bytes_consumed += ProcessData(data, data_len);
320 data_len - bytes_consumed);
321 } 340 }
322 341
323 // The sequencer will push any additional buffered frames if this data 342 // The sequencer will push any additional buffered frames if this data
324 // has been completely consumed. 343 // has been completely consumed.
325 return total_bytes_consumed; 344 return total_bytes_consumed;
326 } 345 }
327 346
328 uint32 ReliableQuicStream::ProcessHeaderData() { 347 uint32 ReliableQuicStream::ProcessHeaderData() {
329 if (decompressed_headers_.empty()) { 348 if (decompressed_headers_.empty()) {
330 return 0; 349 return 0;
331 } 350 }
332 351
333 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 352 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
334 decompressed_headers_.length()); 353 decompressed_headers_.length());
335 if (bytes_processed == decompressed_headers_.length()) { 354 if (bytes_processed == decompressed_headers_.length()) {
336 decompressed_headers_.clear(); 355 decompressed_headers_.clear();
337 } else { 356 } else {
338 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 357 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
339 } 358 }
340 return bytes_processed; 359 return bytes_processed;
341 } 360 }
342 361
343 void ReliableQuicStream::OnDecompressorAvailable() { 362 void ReliableQuicStream::OnDecompressorAvailable() {
344 DCHECK_EQ(headers_id_, 363 DCHECK_EQ(headers_id_,
345 session_->decompressor()->current_header_id()); 364 session_->decompressor()->current_header_id());
346 DCHECK(!headers_decompressed_); 365 DCHECK(!headers_decompressed_);
366 DCHECK(!decompression_failed_);
347 DCHECK_EQ(0u, decompressed_headers_.length()); 367 DCHECK_EQ(0u, decompressed_headers_.length());
348 368
349 size_t total_bytes_consumed = 0; 369 size_t total_bytes_consumed = 0;
350 struct iovec iovecs[5]; 370 struct iovec iovecs[5];
351 while (!headers_decompressed_) { 371 while (!headers_decompressed_) {
352 size_t num_iovecs = 372 size_t num_iovecs =
353 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); 373 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));
354 374
355 if (num_iovecs == 0) { 375 if (num_iovecs == 0) {
356 return; 376 return;
357 } 377 }
358 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { 378 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) {
359 total_bytes_consumed += session_->decompressor()->DecompressData( 379 total_bytes_consumed += session_->decompressor()->DecompressData(
360 StringPiece(static_cast<char*>(iovecs[i].iov_base), 380 StringPiece(static_cast<char*>(iovecs[i].iov_base),
361 iovecs[i].iov_len), this); 381 iovecs[i].iov_len), this);
382 if (decompression_failed_) {
383 return;
384 }
362 385
363 headers_decompressed_ = 386 headers_decompressed_ =
364 session_->decompressor()->current_header_id() != headers_id_; 387 session_->decompressor()->current_header_id() != headers_id_;
365 } 388 }
366 } 389 }
367 390
368 // Either the headers are complete, or the all data as been consumed. 391 // Either the headers are complete, or the all data as been consumed.
369 sequencer_.MarkConsumed(total_bytes_consumed); 392 sequencer_.MarkConsumed(total_bytes_consumed);
370 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 393 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
371 if (IsHalfClosed()) { 394 if (IsHalfClosed()) {
372 TerminateFromPeer(true); 395 TerminateFromPeer(true);
373 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 396 } else if (headers_decompressed_ && decompressed_headers_.empty()) {
374 sequencer_.FlushBufferedFrames(); 397 sequencer_.FlushBufferedFrames();
375 } 398 }
376 } 399 }
377 400
378 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 401 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
379 data.AppendToString(&decompressed_headers_); 402 data.AppendToString(&decompressed_headers_);
380 return true; 403 return true;
381 } 404 }
382 405
383 void ReliableQuicStream::OnDecompressionError() { 406 void ReliableQuicStream::OnDecompressionError() {
407 DCHECK(!decompression_failed_);
408 decompression_failed_ = true;
384 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 409 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
385 } 410 }
386 411
387 412
388 void ReliableQuicStream::CloseWriteSide() { 413 void ReliableQuicStream::CloseWriteSide() {
389 if (write_side_closed_) { 414 if (write_side_closed_) {
390 return; 415 return;
391 } 416 }
392 DLOG(INFO) << "Done writing to stream " << id(); 417 DLOG(INFO) << "Done writing to stream " << id();
393 418
(...skipping 11 matching lines...) Expand all
405 if (visitor_) { 430 if (visitor_) {
406 Visitor* visitor = visitor_; 431 Visitor* visitor = visitor_;
407 // Calling Visitor::OnClose() may result the destruction of the visitor, 432 // Calling Visitor::OnClose() may result the destruction of the visitor,
408 // so we need to ensure we don't call it again. 433 // so we need to ensure we don't call it again.
409 visitor_ = NULL; 434 visitor_ = NULL;
410 visitor->OnClose(this); 435 visitor->OnClose(this);
411 } 436 }
412 } 437 }
413 438
414 } // namespace net 439 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698