| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
| 6 | 6 |
| 7 #include "net/quic/quic_session.h" | 7 #include "net/quic/quic_session.h" |
| 8 #include "net/quic/quic_spdy_decompressor.h" | 8 #include "net/quic/quic_spdy_decompressor.h" |
| 9 | 9 |
| 10 using base::StringPiece; | 10 using base::StringPiece; |
| 11 using std::min; | 11 using std::min; |
| 12 | 12 |
| 13 namespace net { | 13 namespace net { |
| 14 | 14 |
| 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, | 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
| 16 QuicSession* session) | 16 QuicSession* session) |
| 17 : sequencer_(this), | 17 : sequencer_(this), |
| 18 id_(id), | 18 id_(id), |
| 19 session_(session), | 19 session_(session), |
| 20 visitor_(NULL), | 20 visitor_(NULL), |
| 21 stream_bytes_read_(0), | 21 stream_bytes_read_(0), |
| 22 stream_bytes_written_(0), | 22 stream_bytes_written_(0), |
| 23 headers_decompressed_(false), | 23 headers_decompressed_(false), |
| 24 headers_id_(0), | 24 headers_id_(0), |
| 25 decompression_failed_(false), |
| 25 stream_error_(QUIC_STREAM_NO_ERROR), | 26 stream_error_(QUIC_STREAM_NO_ERROR), |
| 26 connection_error_(QUIC_NO_ERROR), | 27 connection_error_(QUIC_NO_ERROR), |
| 27 read_side_closed_(false), | 28 read_side_closed_(false), |
| 28 write_side_closed_(false), | 29 write_side_closed_(false), |
| 29 fin_buffered_(false), | 30 fin_buffered_(false), |
| 30 fin_sent_(false) { | 31 fin_sent_(false) { |
| 31 } | 32 } |
| 32 | 33 |
| 33 ReliableQuicStream::~ReliableQuicStream() { | 34 ReliableQuicStream::~ReliableQuicStream() { |
| 34 } | 35 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 50 if (read_side_closed_) { | 51 if (read_side_closed_) { |
| 51 DLOG(INFO) << "Ignoring frame " << frame.stream_id; | 52 DLOG(INFO) << "Ignoring frame " << frame.stream_id; |
| 52 // We don't want to be reading: blackhole the data. | 53 // We don't want to be reading: blackhole the data. |
| 53 return true; | 54 return true; |
| 54 } | 55 } |
| 55 // Note: This count include duplicate data received. | 56 // Note: This count include duplicate data received. |
| 56 stream_bytes_read_ += frame.data.length(); | 57 stream_bytes_read_ += frame.data.length(); |
| 57 | 58 |
| 58 bool accepted = sequencer_.OnStreamFrame(frame); | 59 bool accepted = sequencer_.OnStreamFrame(frame); |
| 59 | 60 |
| 60 if (frame.fin) { | |
| 61 sequencer_.CloseStreamAtOffset(frame.offset + frame.data.size()); | |
| 62 } | |
| 63 | |
| 64 return accepted; | 61 return accepted; |
| 65 } | 62 } |
| 66 | 63 |
| 67 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { | 64 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { |
| 68 stream_error_ = error; | 65 stream_error_ = error; |
| 69 TerminateFromPeer(false); // Full close. | 66 TerminateFromPeer(false); // Full close. |
| 70 } | 67 } |
| 71 | 68 |
| 72 void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { | 69 void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { |
| 73 if (read_side_closed_ && write_side_closed_) { | 70 if (read_side_closed_ && write_side_closed_) { |
| (...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 260 } | 257 } |
| 261 total_bytes_consumed += missing_size; | 258 total_bytes_consumed += missing_size; |
| 262 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); | 259 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); |
| 263 DCHECK_EQ(4u, headers_id_buffer_.length()); | 260 DCHECK_EQ(4u, headers_id_buffer_.length()); |
| 264 memcpy(&headers_id_, headers_id_buffer_.data(), 4); | 261 memcpy(&headers_id_, headers_id_buffer_.data(), 4); |
| 265 headers_id_buffer_.clear(); | 262 headers_id_buffer_.clear(); |
| 266 data += missing_size; | 263 data += missing_size; |
| 267 data_len -= missing_size; | 264 data_len -= missing_size; |
| 268 } | 265 } |
| 269 DCHECK_NE(0u, headers_id_); | 266 DCHECK_NE(0u, headers_id_); |
| 267 if (data_len == 0) { |
| 268 return total_bytes_consumed; |
| 269 } |
| 270 | 270 |
| 271 // Once the headers are finished, we simply pass the data through. | 271 // Once the headers are finished, we simply pass the data through. |
| 272 if (headers_decompressed_) { | 272 if (headers_decompressed_) { |
| 273 // Some buffered header data remains. | 273 // Some buffered header data remains. |
| 274 if (!decompressed_headers_.empty()) { | 274 if (!decompressed_headers_.empty()) { |
| 275 ProcessHeaderData(); | 275 ProcessHeaderData(); |
| 276 } | 276 } |
| 277 if (decompressed_headers_.empty() && data_len > 0) { | 277 if (decompressed_headers_.empty()) { |
| 278 DVLOG(1) << "Delegating procesing to ProcessData"; | 278 DVLOG(1) << "Delegating procesing to ProcessData"; |
| 279 total_bytes_consumed += ProcessData(data, data_len); | 279 total_bytes_consumed += ProcessData(data, data_len); |
| 280 } | 280 } |
| 281 return total_bytes_consumed; | 281 return total_bytes_consumed; |
| 282 } | 282 } |
| 283 | 283 |
| 284 QuicHeaderId current_header_id = | 284 QuicHeaderId current_header_id = |
| 285 session_->decompressor()->current_header_id(); | 285 session_->decompressor()->current_header_id(); |
| 286 // Ensure that this header id looks sane. | 286 // Ensure that this header id looks sane. |
| 287 if (headers_id_ < current_header_id || | 287 if (headers_id_ < current_header_id || |
| 288 headers_id_ > kMaxHeaderIdDelta + current_header_id) { | 288 headers_id_ > kMaxHeaderIdDelta + current_header_id) { |
| 289 DVLOG(1) << "Invalid headers for stream: " << id() | 289 DVLOG(1) << "Invalid headers for stream: " << id() |
| 290 << " header_id: " << headers_id_ | 290 << " header_id: " << headers_id_ |
| 291 << " current_header_id: " << current_header_id; | 291 << " current_header_id: " << current_header_id; |
| 292 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); | 292 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); |
| 293 return total_bytes_consumed; | 293 return total_bytes_consumed; |
| 294 } | 294 } |
| 295 | 295 |
| 296 // If we are head-of-line blocked on decompression, then back up. | 296 // If we are head-of-line blocked on decompression, then back up. |
| 297 if (current_header_id != headers_id_) { | 297 if (current_header_id != headers_id_) { |
| 298 session_->MarkDecompressionBlocked(headers_id_, id()); | 298 session_->MarkDecompressionBlocked(headers_id_, id()); |
| 299 DVLOG(1) << "Unable to decompress header data for stream: " << id() | 299 DVLOG(1) << "Unable to decompress header data for stream: " << id() |
| 300 << " header_id: " << headers_id_; | 300 << " header_id: " << headers_id_; |
| 301 return total_bytes_consumed; | 301 return total_bytes_consumed; |
| 302 } | 302 } |
| 303 | 303 |
| 304 // Decompressed data will be delivered to decompressed_headers_. | 304 // Decompressed data will be delivered to decompressed_headers_. |
| 305 size_t bytes_consumed = session_->decompressor()->DecompressData( | 305 size_t bytes_consumed = session_->decompressor()->DecompressData( |
| 306 StringPiece(data, data_len), this); | 306 StringPiece(data, data_len), this); |
| 307 DCHECK_NE(0u, bytes_consumed); |
| 308 if (bytes_consumed > data_len) { |
| 309 DCHECK(false) << "DecompressData returned illegal value"; |
| 310 OnDecompressionError(); |
| 311 return total_bytes_consumed; |
| 312 } |
| 307 total_bytes_consumed += bytes_consumed; | 313 total_bytes_consumed += bytes_consumed; |
| 314 data += bytes_consumed; |
| 315 data_len -= bytes_consumed; |
| 316 |
| 317 if (decompression_failed_) { |
| 318 // The session will have been closed in OnDecompressionError. |
| 319 return total_bytes_consumed; |
| 320 } |
| 308 | 321 |
| 309 // Headers are complete if the decompressor has moved on to the | 322 // Headers are complete if the decompressor has moved on to the |
| 310 // next stream. | 323 // next stream. |
| 311 headers_decompressed_ = | 324 headers_decompressed_ = |
| 312 session_->decompressor()->current_header_id() != headers_id_; | 325 session_->decompressor()->current_header_id() != headers_id_; |
| 326 if (!headers_decompressed_) { |
| 327 DCHECK_EQ(0u, data_len); |
| 328 } |
| 313 | 329 |
| 314 ProcessHeaderData(); | 330 ProcessHeaderData(); |
| 315 | 331 |
| 332 if (!headers_decompressed_ || !decompressed_headers_.empty()) { |
| 333 return total_bytes_consumed; |
| 334 } |
| 335 |
| 316 // We have processed all of the decompressed data but we might | 336 // We have processed all of the decompressed data but we might |
| 317 // have some more raw data to process. | 337 // have some more raw data to process. |
| 318 if (decompressed_headers_.empty() && bytes_consumed < data_len) { | 338 if (data_len > 0) { |
| 319 total_bytes_consumed += ProcessData(data + bytes_consumed, | 339 total_bytes_consumed += ProcessData(data, data_len); |
| 320 data_len - bytes_consumed); | |
| 321 } | 340 } |
| 322 | 341 |
| 323 // The sequencer will push any additional buffered frames if this data | 342 // The sequencer will push any additional buffered frames if this data |
| 324 // has been completely consumed. | 343 // has been completely consumed. |
| 325 return total_bytes_consumed; | 344 return total_bytes_consumed; |
| 326 } | 345 } |
| 327 | 346 |
| 328 uint32 ReliableQuicStream::ProcessHeaderData() { | 347 uint32 ReliableQuicStream::ProcessHeaderData() { |
| 329 if (decompressed_headers_.empty()) { | 348 if (decompressed_headers_.empty()) { |
| 330 return 0; | 349 return 0; |
| 331 } | 350 } |
| 332 | 351 |
| 333 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | 352 size_t bytes_processed = ProcessData(decompressed_headers_.data(), |
| 334 decompressed_headers_.length()); | 353 decompressed_headers_.length()); |
| 335 if (bytes_processed == decompressed_headers_.length()) { | 354 if (bytes_processed == decompressed_headers_.length()) { |
| 336 decompressed_headers_.clear(); | 355 decompressed_headers_.clear(); |
| 337 } else { | 356 } else { |
| 338 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | 357 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
| 339 } | 358 } |
| 340 return bytes_processed; | 359 return bytes_processed; |
| 341 } | 360 } |
| 342 | 361 |
| 343 void ReliableQuicStream::OnDecompressorAvailable() { | 362 void ReliableQuicStream::OnDecompressorAvailable() { |
| 344 DCHECK_EQ(headers_id_, | 363 DCHECK_EQ(headers_id_, |
| 345 session_->decompressor()->current_header_id()); | 364 session_->decompressor()->current_header_id()); |
| 346 DCHECK(!headers_decompressed_); | 365 DCHECK(!headers_decompressed_); |
| 366 DCHECK(!decompression_failed_); |
| 347 DCHECK_EQ(0u, decompressed_headers_.length()); | 367 DCHECK_EQ(0u, decompressed_headers_.length()); |
| 348 | 368 |
| 349 size_t total_bytes_consumed = 0; | 369 size_t total_bytes_consumed = 0; |
| 350 struct iovec iovecs[5]; | 370 struct iovec iovecs[5]; |
| 351 while (!headers_decompressed_) { | 371 while (!headers_decompressed_) { |
| 352 size_t num_iovecs = | 372 size_t num_iovecs = |
| 353 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); | 373 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); |
| 354 | 374 |
| 355 if (num_iovecs == 0) { | 375 if (num_iovecs == 0) { |
| 356 return; | 376 return; |
| 357 } | 377 } |
| 358 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { | 378 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { |
| 359 total_bytes_consumed += session_->decompressor()->DecompressData( | 379 total_bytes_consumed += session_->decompressor()->DecompressData( |
| 360 StringPiece(static_cast<char*>(iovecs[i].iov_base), | 380 StringPiece(static_cast<char*>(iovecs[i].iov_base), |
| 361 iovecs[i].iov_len), this); | 381 iovecs[i].iov_len), this); |
| 382 if (decompression_failed_) { |
| 383 return; |
| 384 } |
| 362 | 385 |
| 363 headers_decompressed_ = | 386 headers_decompressed_ = |
| 364 session_->decompressor()->current_header_id() != headers_id_; | 387 session_->decompressor()->current_header_id() != headers_id_; |
| 365 } | 388 } |
| 366 } | 389 } |
| 367 | 390 |
| 368 // Either the headers are complete, or the all data as been consumed. | 391 // Either the headers are complete, or the all data as been consumed. |
| 369 sequencer_.MarkConsumed(total_bytes_consumed); | 392 sequencer_.MarkConsumed(total_bytes_consumed); |
| 370 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. | 393 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. |
| 371 if (IsHalfClosed()) { | 394 if (IsHalfClosed()) { |
| 372 TerminateFromPeer(true); | 395 TerminateFromPeer(true); |
| 373 } else if (headers_decompressed_ && decompressed_headers_.empty()) { | 396 } else if (headers_decompressed_ && decompressed_headers_.empty()) { |
| 374 sequencer_.FlushBufferedFrames(); | 397 sequencer_.FlushBufferedFrames(); |
| 375 } | 398 } |
| 376 } | 399 } |
| 377 | 400 |
| 378 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { | 401 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { |
| 379 data.AppendToString(&decompressed_headers_); | 402 data.AppendToString(&decompressed_headers_); |
| 380 return true; | 403 return true; |
| 381 } | 404 } |
| 382 | 405 |
| 383 void ReliableQuicStream::OnDecompressionError() { | 406 void ReliableQuicStream::OnDecompressionError() { |
| 407 DCHECK(!decompression_failed_); |
| 408 decompression_failed_ = true; |
| 384 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); | 409 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); |
| 385 } | 410 } |
| 386 | 411 |
| 387 | 412 |
| 388 void ReliableQuicStream::CloseWriteSide() { | 413 void ReliableQuicStream::CloseWriteSide() { |
| 389 if (write_side_closed_) { | 414 if (write_side_closed_) { |
| 390 return; | 415 return; |
| 391 } | 416 } |
| 392 DLOG(INFO) << "Done writing to stream " << id(); | 417 DLOG(INFO) << "Done writing to stream " << id(); |
| 393 | 418 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 405 if (visitor_) { | 430 if (visitor_) { |
| 406 Visitor* visitor = visitor_; | 431 Visitor* visitor = visitor_; |
| 407 // Calling Visitor::OnClose() may result the destruction of the visitor, | 432 // Calling Visitor::OnClose() may result the destruction of the visitor, |
| 408 // so we need to ensure we don't call it again. | 433 // so we need to ensure we don't call it again. |
| 409 visitor_ = NULL; | 434 visitor_ = NULL; |
| 410 visitor->OnClose(this); | 435 visitor->OnClose(this); |
| 411 } | 436 } |
| 412 } | 437 } |
| 413 | 438 |
| 414 } // namespace net | 439 } // namespace net |
| OLD | NEW |