| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
| 6 | 6 |
| 7 #include "net/quic/quic_session.h" | 7 #include "net/quic/quic_session.h" |
| 8 #include "net/quic/quic_spdy_decompressor.h" |
| 8 | 9 |
| 9 using base::StringPiece; | 10 using base::StringPiece; |
| 11 using std::min; |
| 10 | 12 |
| 11 namespace net { | 13 namespace net { |
| 12 | 14 |
| 13 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, | 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
| 14 QuicSession* session) | 16 QuicSession* session) |
| 15 : sequencer_(this), | 17 : sequencer_(this), |
| 16 id_(id), | 18 id_(id), |
| 17 session_(session), | 19 session_(session), |
| 18 visitor_(NULL), | 20 visitor_(NULL), |
| 19 stream_bytes_read_(0), | 21 stream_bytes_read_(0), |
| 20 stream_bytes_written_(0), | 22 stream_bytes_written_(0), |
| 23 headers_complete_(false), |
| 24 headers_id_(0), |
| 21 stream_error_(QUIC_STREAM_NO_ERROR), | 25 stream_error_(QUIC_STREAM_NO_ERROR), |
| 22 connection_error_(QUIC_NO_ERROR), | 26 connection_error_(QUIC_NO_ERROR), |
| 23 read_side_closed_(false), | 27 read_side_closed_(false), |
| 24 write_side_closed_(false), | 28 write_side_closed_(false), |
| 25 fin_buffered_(false), | 29 fin_buffered_(false), |
| 26 fin_sent_(false) { | 30 fin_sent_(false) { |
| 27 } | 31 } |
| 28 | 32 |
| 29 ReliableQuicStream::~ReliableQuicStream() { | 33 ReliableQuicStream::~ReliableQuicStream() { |
| 30 } | 34 } |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 87 CloseWriteSide(); | 91 CloseWriteSide(); |
| 88 } | 92 } |
| 89 CloseReadSide(); | 93 CloseReadSide(); |
| 90 } | 94 } |
| 91 | 95 |
| 92 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { | 96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { |
| 93 stream_error_ = error; | 97 stream_error_ = error; |
| 94 session()->SendRstStream(id(), error); | 98 session()->SendRstStream(id(), error); |
| 95 } | 99 } |
| 96 | 100 |
| 101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { |
| 102 if (headers_complete_ && decompressed_headers_.empty()) { |
| 103 return sequencer_.Readv(iov, iov_len); |
| 104 } |
| 105 size_t bytes_consumed = 0; |
| 106 int iov_index = 0; |
| 107 while (iov_index < iov_len && |
| 108 decompressed_headers_.length() > bytes_consumed) { |
| 109 int bytes_to_read = min(iov[iov_index].iov_len, |
| 110 decompressed_headers_.length() - bytes_consumed); |
| 111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); |
| 112 memcpy(iov_ptr, |
| 113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); |
| 114 bytes_consumed += bytes_to_read; |
| 115 ++iov_index; |
| 116 } |
| 117 decompressed_headers_.erase(0, bytes_consumed); |
| 118 return bytes_consumed; |
| 119 } |
| 120 |
| 121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { |
| 122 if (headers_complete_ && decompressed_headers_.empty()) { |
| 123 return sequencer_.GetReadableRegions(iov, iov_len); |
| 124 } |
| 125 if (iov_len == 0) { |
| 126 return 0; |
| 127 } |
| 128 iov[0].iov_base = static_cast<void*>( |
| 129 const_cast<char*>(decompressed_headers_.data())); |
| 130 iov[0].iov_len = decompressed_headers_.length(); |
| 131 return 1; |
| 132 } |
| 133 |
| 97 bool ReliableQuicStream::IsHalfClosed() const { | 134 bool ReliableQuicStream::IsHalfClosed() const { |
| 135 if (!headers_complete_ || !decompressed_headers_.empty()) { |
| 136 return false; |
| 137 } |
| 98 return sequencer_.IsHalfClosed(); | 138 return sequencer_.IsHalfClosed(); |
| 99 } | 139 } |
| 100 | 140 |
| 101 bool ReliableQuicStream::IsClosed() const { | 141 bool ReliableQuicStream::IsClosed() const { |
| 102 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); | 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); |
| 103 } | 143 } |
| 104 | 144 |
| 105 bool ReliableQuicStream::HasBytesToRead() const { | 145 bool ReliableQuicStream::HasBytesToRead() const { |
| 106 return sequencer_.HasBytesToRead(); | 146 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); |
| 107 } | 147 } |
| 108 | 148 |
| 109 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { | 149 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { |
| 110 return session_->peer_address(); | 150 return session_->peer_address(); |
| 111 } | 151 } |
| 112 | 152 |
| 113 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { | 153 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { |
| 114 return WriteOrBuffer(data, fin); | 154 return WriteOrBuffer(data, fin); |
| 115 } | 155 } |
| 116 | 156 |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 181 } | 221 } |
| 182 DLOG(INFO) << "Done reading from stream " << id(); | 222 DLOG(INFO) << "Done reading from stream " << id(); |
| 183 | 223 |
| 184 read_side_closed_ = true; | 224 read_side_closed_ = true; |
| 185 if (write_side_closed_) { | 225 if (write_side_closed_) { |
| 186 DLOG(INFO) << "Closing stream: " << id(); | 226 DLOG(INFO) << "Closing stream: " << id(); |
| 187 session_->CloseStream(id()); | 227 session_->CloseStream(id()); |
| 188 } | 228 } |
| 189 } | 229 } |
| 190 | 230 |
| 231 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { |
| 232 if (id() == kCryptoStreamId) { |
| 233 // The crypto stream does not use compression. |
| 234 return ProcessData(data, data_len); |
| 235 } |
| 236 uint32 total_bytes_consumed = 0; |
| 237 if (headers_id_ == 0u) { |
| 238 // The headers ID has not yet been read. Strip it from the beginning of |
| 239 // the data stream. |
| 240 DCHECK_GT(4u, headers_id_buffer_.length()); |
| 241 size_t missing_size = 4 - headers_id_buffer_.length(); |
| 242 if (data_len < missing_size) { |
| 243 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); |
| 244 return data_len; |
| 245 } |
| 246 total_bytes_consumed += missing_size; |
| 247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); |
| 248 DCHECK_EQ(4u, headers_id_buffer_.length()); |
| 249 memcpy(&headers_id_, headers_id_buffer_.data(), 4); |
| 250 headers_id_buffer_.clear(); |
| 251 data += missing_size; |
| 252 data_len -= missing_size; |
| 253 } |
| 254 DCHECK_NE(0u, headers_id_); |
| 255 |
| 256 // Once the headers are finished, we simply pass the data through. |
| 257 if (headers_complete_ && decompressed_headers_.empty()) { |
| 258 DVLOG(1) << "Delegating procesing to ProcessData"; |
| 259 return total_bytes_consumed + ProcessData(data, data_len); |
| 260 } |
| 261 |
| 262 QuicHeaderId current_header_id = |
| 263 session_->decompressor()->current_header_id(); |
| 264 // Ensure that this header id looks sane. |
| 265 if (headers_id_ < current_header_id || |
| 266 headers_id_ > kMaxHeaderIdDelta + current_header_id) { |
| 267 DVLOG(1) << "Invalud headers for stream: " << id() |
| 268 << " header_id: " << headers_id_; |
| 269 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); |
| 270 } |
| 271 |
| 272 // If we are head-of-line blocked on decompression, then back up. |
| 273 if (current_header_id != headers_id_) { |
| 274 session_->MarkDecompressionBlocked(headers_id_, id()); |
| 275 DVLOG(1) << "Unable to decmpress header data for stream: " << id() |
| 276 << " header_id: " << headers_id_; |
| 277 return total_bytes_consumed; |
| 278 } |
| 279 |
| 280 // Decompressed data will be delivered to decompressed_headers_. |
| 281 size_t bytes_consumed = session_->decompressor()->DecompressData( |
| 282 StringPiece(data, data_len), this); |
| 283 total_bytes_consumed += bytes_consumed; |
| 284 |
| 285 // Headers are complete if the decompressor has moved on to the |
| 286 // next stream. |
| 287 headers_complete_ = |
| 288 session_->decompressor()->current_header_id() != headers_id_; |
| 289 |
| 290 if (!decompressed_headers_.empty()) { |
| 291 size_t bytes_processed = ProcessData(decompressed_headers_.data(), |
| 292 decompressed_headers_.length()); |
| 293 if (bytes_processed == decompressed_headers_.length()) { |
| 294 decompressed_headers_.clear(); |
| 295 } else { |
| 296 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
| 297 } |
| 298 } |
| 299 |
| 300 // We have processed all of the decompressed data but we might |
| 301 // have some more raw data to process. |
| 302 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { |
| 303 total_bytes_consumed += ProcessData(data + bytes_consumed, |
| 304 data_len - bytes_consumed); |
| 305 } |
| 306 |
| 307 // The sequencer will push any additional buffered frames if this data |
| 308 // has been completely consumed. |
| 309 return total_bytes_consumed; |
| 310 } |
| 311 |
| 312 uint32 ReliableQuicStream::ProcessHeaderData() { |
| 313 if (decompressed_headers_.empty()) { |
| 314 return 0; |
| 315 } |
| 316 |
| 317 size_t bytes_processed = ProcessData(decompressed_headers_.data(), |
| 318 decompressed_headers_.length()); |
| 319 if (bytes_processed == decompressed_headers_.length()) { |
| 320 decompressed_headers_.clear(); |
| 321 } else { |
| 322 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
| 323 } |
| 324 return bytes_processed; |
| 325 } |
| 326 |
| 327 void ReliableQuicStream::OnDecompressorAvailable() { |
| 328 DCHECK_EQ(headers_id_, |
| 329 session_->decompressor()->current_header_id()); |
| 330 DCHECK(!headers_complete_); |
| 331 DCHECK_EQ(0u, decompressed_headers_.length()); |
| 332 |
| 333 size_t total_bytes_consumed = 0; |
| 334 struct iovec iovecs[5]; |
| 335 while (!headers_complete_) { |
| 336 size_t num_iovecs = |
| 337 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); |
| 338 |
| 339 if (num_iovecs == 0) { |
| 340 return; |
| 341 } |
| 342 for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) { |
| 343 total_bytes_consumed += session_->decompressor()->DecompressData( |
| 344 StringPiece(static_cast<char*>(iovecs[i].iov_base), |
| 345 iovecs[i].iov_len), this); |
| 346 |
| 347 headers_complete_ = |
| 348 session_->decompressor()->current_header_id() != headers_id_; |
| 349 } |
| 350 } |
| 351 |
| 352 // Either the headers are complete, or the all data as been consumed. |
| 353 sequencer_.MarkConsumed(total_bytes_consumed); |
| 354 |
| 355 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. |
| 356 |
| 357 if (headers_complete_ && decompressed_headers_.empty()) { |
| 358 sequencer_.FlushBufferedFrames(); |
| 359 } |
| 360 } |
| 361 |
| 362 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { |
| 363 data.AppendToString(&decompressed_headers_); |
| 364 return true; |
| 365 } |
| 366 |
| 191 void ReliableQuicStream::CloseWriteSide() { | 367 void ReliableQuicStream::CloseWriteSide() { |
| 192 if (write_side_closed_) { | 368 if (write_side_closed_) { |
| 193 return; | 369 return; |
| 194 } | 370 } |
| 195 DLOG(INFO) << "Done writing to stream " << id(); | 371 DLOG(INFO) << "Done writing to stream " << id(); |
| 196 | 372 |
| 197 write_side_closed_ = true; | 373 write_side_closed_ = true; |
| 198 if (read_side_closed_) { | 374 if (read_side_closed_) { |
| 199 DLOG(INFO) << "Closing stream: " << id(); | 375 DLOG(INFO) << "Closing stream: " << id(); |
| 200 session_->CloseStream(id()); | 376 session_->CloseStream(id()); |
| 201 } | 377 } |
| 202 } | 378 } |
| 203 | 379 |
| 204 void ReliableQuicStream::OnClose() { | 380 void ReliableQuicStream::OnClose() { |
| 205 CloseReadSide(); | 381 CloseReadSide(); |
| 206 CloseWriteSide(); | 382 CloseWriteSide(); |
| 207 | 383 |
| 208 if (visitor_) { | 384 if (visitor_) { |
| 209 Visitor* visitor = visitor_; | 385 Visitor* visitor = visitor_; |
| 210 // Calling Visitor::OnClose() may result the destruction of the visitor, | 386 // Calling Visitor::OnClose() may result the destruction of the visitor, |
| 211 // so we need to ensure we don't call it again. | 387 // so we need to ensure we don't call it again. |
| 212 visitor_ = NULL; | 388 visitor_ = NULL; |
| 213 visitor->OnClose(this); | 389 visitor->OnClose(this); |
| 214 } | 390 } |
| 215 } | 391 } |
| 216 | 392 |
| 217 } // namespace net | 393 } // namespace net |
| OLD | NEW |