| Index: net/quic/reliable_quic_stream.cc
|
| diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc
|
| index adce4082ab3cb4bec5f33c6c48767be8325d707a..cdcec21b89409dd15e053300f569891a87443575 100644
|
| --- a/net/quic/reliable_quic_stream.cc
|
| +++ b/net/quic/reliable_quic_stream.cc
|
| @@ -5,8 +5,10 @@
|
| #include "net/quic/reliable_quic_stream.h"
|
|
|
| #include "net/quic/quic_session.h"
|
| +#include "net/quic/quic_spdy_decompressor.h"
|
|
|
| using base::StringPiece;
|
| +using std::min;
|
|
|
| namespace net {
|
|
|
| @@ -18,6 +20,8 @@ ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
|
| visitor_(NULL),
|
| stream_bytes_read_(0),
|
| stream_bytes_written_(0),
|
| + headers_complete_(false),
|
| + headers_id_(0),
|
| stream_error_(QUIC_STREAM_NO_ERROR),
|
| connection_error_(QUIC_NO_ERROR),
|
| read_side_closed_(false),
|
| @@ -94,7 +98,43 @@ void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
|
| session()->SendRstStream(id(), error);
|
| }
|
|
|
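| +// Returns buffered decompressed header data via |iov| until the headers
|
| +// have been fully read; after that, reads are served directly from the
|
| +// sequencer.
|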
| +int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) {
|
| + if (headers_complete_ && decompressed_headers_.empty()) {
|
| + return sequencer_.Readv(iov, iov_len);
|
| + }
|
| + size_t bytes_consumed = 0;
|
| + int iov_index = 0;
|
| + while (iov_index < iov_len &&
|
| + decompressed_headers_.length() > bytes_consumed) {
|
| + int bytes_to_read = min(iov[iov_index].iov_len,
|
| + decompressed_headers_.length() - bytes_consumed);
|
| + char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
|
| + memcpy(iov_ptr,
|
| + decompressed_headers_.data() + bytes_consumed, bytes_to_read);
|
| + bytes_consumed += bytes_to_read;
|
| + ++iov_index;
|
| + }
|
| + decompressed_headers_.erase(0, bytes_consumed);
|
| + return bytes_consumed;
|
| +}
|
| +
|
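| +// Zero-copy variant of Readv(): exposes the buffered decompressed headers
|
| +// as a single readable region, deferring to the sequencer once they are
|
| +// exhausted.
|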
| +int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) {
|
| + if (headers_complete_ && decompressed_headers_.empty()) {
|
| + return sequencer_.GetReadableRegions(iov, iov_len);
|
| + }
|
| + if (iov_len == 0) {
|
| + return 0;
|
| + }
|
| + iov[0].iov_base = static_cast<void*>(
|
| + const_cast<char*>(decompressed_headers_.data()));
|
| + iov[0].iov_len = decompressed_headers_.length();
|
| + return 1;
|
| +}
|
| +
|
| bool ReliableQuicStream::IsHalfClosed() const {
|
| + if (!headers_complete_ || !decompressed_headers_.empty()) {
|
| + return false;
|
| + }
|
| return sequencer_.IsHalfClosed();
|
| }
|
|
|
| @@ -103,7 +143,7 @@ bool ReliableQuicStream::IsClosed() const {
|
| }
|
|
|
| bool ReliableQuicStream::HasBytesToRead() const {
|
| - return sequencer_.HasBytesToRead();
|
| + return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
|
| }
|
|
|
| const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
|
| @@ -188,6 +228,142 @@ void ReliableQuicStream::CloseReadSide() {
|
| }
|
| }
|
|
|
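| +// Handles raw stream bytes. For non-crypto streams the first four bytes
|
| +// carry the headers ID; compressed header data is then routed through the
|
| +// session's decompressor before the remainder is handed to ProcessData().
|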
| +uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
|
| + if (id() == kCryptoStreamId) {
|
| + // The crypto stream does not use compression.
|
| + return ProcessData(data, data_len);
|
| + }
|
| + uint32 total_bytes_consumed = 0;
|
| + if (headers_id_ == 0u) {
|
| + // The headers ID has not yet been read. Strip it from the beginning of
|
| + // the data stream.
|
| + DCHECK_GT(4u, headers_id_buffer_.length());
|
| + size_t missing_size = 4 - headers_id_buffer_.length();
|
| + if (data_len < missing_size) {
|
| + StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
|
| + return data_len;
|
| + }
|
| + total_bytes_consumed += missing_size;
|
| + StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
|
| + DCHECK_EQ(4u, headers_id_buffer_.length());
|
| + memcpy(&headers_id_, headers_id_buffer_.data(), 4);
|
| + headers_id_buffer_.clear();
|
| + data += missing_size;
|
| + data_len -= missing_size;
|
| + }
|
| + DCHECK_NE(0u, headers_id_);
|
| +
|
| + // Once the headers are finished, we simply pass the data through.
|
| + if (headers_complete_ && decompressed_headers_.empty()) {
|
| + DVLOG(1) << "Delegating procesing to ProcessData";
|
| + return total_bytes_consumed + ProcessData(data, data_len);
|
| + }
|
| +
|
| + QuicHeaderId current_header_id =
|
| + session_->decompressor()->current_header_id();
|
| + // Ensure that this header id looks sane.
|
| + if (headers_id_ < current_header_id ||
|
| + headers_id_ > kMaxHeaderIdDelta + current_header_id) {
|
| + DVLOG(1) << "Invalud headers for stream: " << id()
|
| + << " header_id: " << headers_id_;
|
| + session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
|
| + }
|
| +
|
| + // If we are head-of-line blocked on decompression, then back up.
|
| + if (current_header_id != headers_id_) {
|
| + session_->MarkDecompressionBlocked(headers_id_, id());
|
| + DVLOG(1) << "Unable to decmpress header data for stream: " << id()
|
| + << " header_id: " << headers_id_;
|
| + return total_bytes_consumed;
|
| + }
|
| +
|
| + // Decompressed data will be delivered to decompressed_headers_.
|
| + size_t bytes_consumed = session_->decompressor()->DecompressData(
|
| + StringPiece(data, data_len), this);
|
| + total_bytes_consumed += bytes_consumed;
|
| +
|
| + // Headers are complete if the decompressor has moved on to the
|
| + // next stream.
|
| + headers_complete_ =
|
| + session_->decompressor()->current_header_id() != headers_id_;
|
| +
|
| + ProcessHeaderData();
|
| +
|
| + // We have processed all of the decompressed data but we might
|
| + // have some more raw data to process.
|
| + if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
|
| + total_bytes_consumed += ProcessData(data + bytes_consumed,
|
| + data_len - bytes_consumed);
|
| + }
|
| +
|
| + // The sequencer will push any additional buffered frames if this data
|
| + // has been completely consumed.
|
| + return total_bytes_consumed;
|
| +}
|
| +
|
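| +// Feeds any buffered decompressed header data to ProcessData(), erasing
|
| +// whatever was accepted. Returns the number of header bytes processed.
|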
| +uint32 ReliableQuicStream::ProcessHeaderData() {
|
| + if (decompressed_headers_.empty()) {
|
| + return 0;
|
| + }
|
| +
|
| + size_t bytes_processed = ProcessData(decompressed_headers_.data(),
|
| + decompressed_headers_.length());
|
| + if (bytes_processed == decompressed_headers_.length()) {
|
| + decompressed_headers_.clear();
|
| + } else {
|
| + decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
|
| + }
|
| + return bytes_processed;
|
| +}
|
| +
|
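| +// Called once the shared decompressor reaches this stream's headers ID.
|
| +// Drains readable data from the sequencer through the decompressor until
|
| +// the headers are complete or no readable data remains.
|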
| +void ReliableQuicStream::OnDecompressorAvailable() {
|
| + DCHECK_EQ(headers_id_,
|
| + session_->decompressor()->current_header_id());
|
| + DCHECK(!headers_complete_);
|
| + DCHECK_EQ(0u, decompressed_headers_.length());
|
| +
|
| + size_t total_bytes_consumed = 0;
|
| + struct iovec iovecs[5];
|
| + while (!headers_complete_) {
|
| + size_t num_iovecs =
|
| + sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));
|
| +
|
| + if (num_iovecs == 0) {
|
| + return;
|
| + }
|
| + for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) {
|
| + total_bytes_consumed += session_->decompressor()->DecompressData(
|
| + StringPiece(static_cast<char*>(iovecs[i].iov_base),
|
| + iovecs[i].iov_len), this);
|
| +
|
| + headers_complete_ =
|
| + session_->decompressor()->current_header_id() != headers_id_;
|
| + }
|
| + }
|
| +
|
| + // Either the headers are complete, or all of the data has been consumed.
|
| + sequencer_.MarkConsumed(total_bytes_consumed);
|
| +
|
| + ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
|
| +
|
| + if (headers_complete_ && decompressed_headers_.empty()) {
|
| + sequencer_.FlushBufferedFrames();
|
| + }
|
| +}
|
| +
|
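| +// Decompressor callback: buffers decompressed header bytes until they are
|
| +// consumed by ProcessData() or read by the application.
|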
| +bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
|
| + data.AppendToString(&decompressed_headers_);
|
| + return true;
|
| +}
|
| +
|
| void ReliableQuicStream::CloseWriteSide() {
|
| if (write_side_closed_) {
|
| return;
|
|
|