OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
6 | 6 |
7 #include "net/quic/quic_session.h" | 7 #include "net/quic/quic_session.h" |
8 #include "net/quic/quic_spdy_decompressor.h" | 8 #include "net/quic/quic_spdy_decompressor.h" |
9 | 9 |
10 using base::StringPiece; | 10 using base::StringPiece; |
11 using std::min; | 11 using std::min; |
12 | 12 |
13 namespace net { | 13 namespace net { |
14 | 14 |
15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, | 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
16 QuicSession* session) | 16 QuicSession* session) |
17 : sequencer_(this), | 17 : sequencer_(this), |
18 id_(id), | 18 id_(id), |
19 session_(session), | 19 session_(session), |
20 visitor_(NULL), | 20 visitor_(NULL), |
21 stream_bytes_read_(0), | 21 stream_bytes_read_(0), |
22 stream_bytes_written_(0), | 22 stream_bytes_written_(0), |
23 headers_complete_(false), | 23 headers_decompressed_(false), |
24 headers_id_(0), | 24 headers_id_(0), |
25 stream_error_(QUIC_STREAM_NO_ERROR), | 25 stream_error_(QUIC_STREAM_NO_ERROR), |
26 connection_error_(QUIC_NO_ERROR), | 26 connection_error_(QUIC_NO_ERROR), |
27 read_side_closed_(false), | 27 read_side_closed_(false), |
28 write_side_closed_(false), | 28 write_side_closed_(false), |
29 fin_buffered_(false), | 29 fin_buffered_(false), |
30 fin_sent_(false) { | 30 fin_sent_(false) { |
31 } | 31 } |
32 | 32 |
33 ReliableQuicStream::~ReliableQuicStream() { | 33 ReliableQuicStream::~ReliableQuicStream() { |
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
92 } | 92 } |
93 CloseReadSide(); | 93 CloseReadSide(); |
94 } | 94 } |
95 | 95 |
96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { | 96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { |
97 stream_error_ = error; | 97 stream_error_ = error; |
98 session()->SendRstStream(id(), error); | 98 session()->SendRstStream(id(), error); |
99 } | 99 } |
100 | 100 |
101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { | 101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { |
102 if (headers_complete_ && decompressed_headers_.empty()) { | 102 if (headers_decompressed_ && decompressed_headers_.empty()) { |
103 return sequencer_.Readv(iov, iov_len); | 103 return sequencer_.Readv(iov, iov_len); |
104 } | 104 } |
105 size_t bytes_consumed = 0; | 105 size_t bytes_consumed = 0; |
106 int iov_index = 0; | 106 int iov_index = 0; |
107 while (iov_index < iov_len && | 107 while (iov_index < iov_len && |
108 decompressed_headers_.length() > bytes_consumed) { | 108 decompressed_headers_.length() > bytes_consumed) { |
109 int bytes_to_read = min(iov[iov_index].iov_len, | 109 int bytes_to_read = min(iov[iov_index].iov_len, |
110 decompressed_headers_.length() - bytes_consumed); | 110 decompressed_headers_.length() - bytes_consumed); |
111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); | 111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); |
112 memcpy(iov_ptr, | 112 memcpy(iov_ptr, |
113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); | 113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); |
114 bytes_consumed += bytes_to_read; | 114 bytes_consumed += bytes_to_read; |
115 ++iov_index; | 115 ++iov_index; |
116 } | 116 } |
117 decompressed_headers_.erase(0, bytes_consumed); | 117 decompressed_headers_.erase(0, bytes_consumed); |
118 return bytes_consumed; | 118 return bytes_consumed; |
119 } | 119 } |
120 | 120 |
121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { | 121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { |
122 if (headers_complete_ && decompressed_headers_.empty()) { | 122 if (headers_decompressed_ && decompressed_headers_.empty()) { |
123 return sequencer_.GetReadableRegions(iov, iov_len); | 123 return sequencer_.GetReadableRegions(iov, iov_len); |
124 } | 124 } |
125 if (iov_len == 0) { | 125 if (iov_len == 0) { |
126 return 0; | 126 return 0; |
127 } | 127 } |
128 iov[0].iov_base = static_cast<void*>( | 128 iov[0].iov_base = static_cast<void*>( |
129 const_cast<char*>(decompressed_headers_.data())); | 129 const_cast<char*>(decompressed_headers_.data())); |
130 iov[0].iov_len = decompressed_headers_.length(); | 130 iov[0].iov_len = decompressed_headers_.length(); |
131 return 1; | 131 return 1; |
132 } | 132 } |
133 | 133 |
134 bool ReliableQuicStream::IsHalfClosed() const { | 134 bool ReliableQuicStream::IsHalfClosed() const { |
135 if (!headers_complete_ || !decompressed_headers_.empty()) { | 135 if (!headers_decompressed_ || !decompressed_headers_.empty()) { |
136 return false; | 136 return false; |
137 } | 137 } |
138 return sequencer_.IsHalfClosed(); | 138 return sequencer_.IsHalfClosed(); |
139 } | 139 } |
140 | 140 |
141 bool ReliableQuicStream::IsClosed() const { | 141 bool ReliableQuicStream::IsClosed() const { |
142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); | 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); |
143 } | 143 } |
144 | 144 |
145 bool ReliableQuicStream::HasBytesToRead() const { | 145 bool ReliableQuicStream::HasBytesToRead() const { |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); | 247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); |
248 DCHECK_EQ(4u, headers_id_buffer_.length()); | 248 DCHECK_EQ(4u, headers_id_buffer_.length()); |
249 memcpy(&headers_id_, headers_id_buffer_.data(), 4); | 249 memcpy(&headers_id_, headers_id_buffer_.data(), 4); |
250 headers_id_buffer_.clear(); | 250 headers_id_buffer_.clear(); |
251 data += missing_size; | 251 data += missing_size; |
252 data_len -= missing_size; | 252 data_len -= missing_size; |
253 } | 253 } |
254 DCHECK_NE(0u, headers_id_); | 254 DCHECK_NE(0u, headers_id_); |
255 | 255 |
256 // Once the headers are finished, we simply pass the data through. | 256 // Once the headers are finished, we simply pass the data through. |
257 if (headers_complete_ && decompressed_headers_.empty()) { | 257 if (headers_decompressed_) { |
258 DVLOG(1) << "Delegating procesing to ProcessData"; | 258 // Some buffered header data remains. |
259 return total_bytes_consumed + ProcessData(data, data_len); | 259 if (!decompressed_headers_.empty()) { |
| 260 ProcessHeaderData(); |
| 261 } |
| 262 if (decompressed_headers_.empty()) { |
| 263 DVLOG(1) << "Delegating procesing to ProcessData"; |
| 264 total_bytes_consumed += ProcessData(data, data_len); |
| 265 } |
| 266 return total_bytes_consumed; |
260 } | 267 } |
261 | 268 |
262 QuicHeaderId current_header_id = | 269 QuicHeaderId current_header_id = |
263 session_->decompressor()->current_header_id(); | 270 session_->decompressor()->current_header_id(); |
264 // Ensure that this header id looks sane. | 271 // Ensure that this header id looks sane. |
265 if (headers_id_ < current_header_id || | 272 if (headers_id_ < current_header_id || |
266 headers_id_ > kMaxHeaderIdDelta + current_header_id) { | 273 headers_id_ > kMaxHeaderIdDelta + current_header_id) { |
267 DVLOG(1) << "Invalud headers for stream: " << id() | 274 DVLOG(1) << "Invalid headers for stream: " << id() |
268 << " header_id: " << headers_id_; | 275 << " header_id: " << headers_id_ |
| 276 << " current_header_id: " << current_header_id; |
269 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); | 277 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); |
| 278 return total_bytes_consumed; |
270 } | 279 } |
271 | 280 |
272 // If we are head-of-line blocked on decompression, then back up. | 281 // If we are head-of-line blocked on decompression, then back up. |
273 if (current_header_id != headers_id_) { | 282 if (current_header_id != headers_id_) { |
274 session_->MarkDecompressionBlocked(headers_id_, id()); | 283 session_->MarkDecompressionBlocked(headers_id_, id()); |
275 DVLOG(1) << "Unable to decmpress header data for stream: " << id() | 284 DVLOG(1) << "Unable to decmpress header data for stream: " << id() |
276 << " header_id: " << headers_id_; | 285 << " header_id: " << headers_id_; |
277 return total_bytes_consumed; | 286 return total_bytes_consumed; |
278 } | 287 } |
279 | 288 |
280 // Decompressed data will be delivered to decompressed_headers_. | 289 // Decompressed data will be delivered to decompressed_headers_. |
281 size_t bytes_consumed = session_->decompressor()->DecompressData( | 290 size_t bytes_consumed = session_->decompressor()->DecompressData( |
282 StringPiece(data, data_len), this); | 291 StringPiece(data, data_len), this); |
283 total_bytes_consumed += bytes_consumed; | 292 total_bytes_consumed += bytes_consumed; |
284 | 293 |
285 // Headers are complete if the decompressor has moved on to the | 294 // Headers are complete if the decompressor has moved on to the |
286 // next stream. | 295 // next stream. |
287 headers_complete_ = | 296 headers_decompressed_ = |
288 session_->decompressor()->current_header_id() != headers_id_; | 297 session_->decompressor()->current_header_id() != headers_id_; |
289 | 298 |
290 if (!decompressed_headers_.empty()) { | 299 ProcessHeaderData(); |
291 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | |
292 decompressed_headers_.length()); | |
293 if (bytes_processed == decompressed_headers_.length()) { | |
294 decompressed_headers_.clear(); | |
295 } else { | |
296 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | |
297 } | |
298 } | |
299 | 300 |
300 // We have processed all of the decompressed data but we might | 301 // We have processed all of the decompressed data but we might |
301 // have some more raw data to process. | 302 // have some more raw data to process. |
302 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { | 303 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { |
303 total_bytes_consumed += ProcessData(data + bytes_consumed, | 304 total_bytes_consumed += ProcessData(data + bytes_consumed, |
304 data_len - bytes_consumed); | 305 data_len - bytes_consumed); |
305 } | 306 } |
306 | 307 |
307 // The sequencer will push any additional buffered frames if this data | 308 // The sequencer will push any additional buffered frames if this data |
308 // has been completely consumed. | 309 // has been completely consumed. |
(...skipping 11 matching lines...) Expand all Loading... |
320 decompressed_headers_.clear(); | 321 decompressed_headers_.clear(); |
321 } else { | 322 } else { |
322 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | 323 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
323 } | 324 } |
324 return bytes_processed; | 325 return bytes_processed; |
325 } | 326 } |
326 | 327 |
327 void ReliableQuicStream::OnDecompressorAvailable() { | 328 void ReliableQuicStream::OnDecompressorAvailable() { |
328 DCHECK_EQ(headers_id_, | 329 DCHECK_EQ(headers_id_, |
329 session_->decompressor()->current_header_id()); | 330 session_->decompressor()->current_header_id()); |
330 DCHECK(!headers_complete_); | 331 DCHECK(!headers_decompressed_); |
331 DCHECK_EQ(0u, decompressed_headers_.length()); | 332 DCHECK_EQ(0u, decompressed_headers_.length()); |
332 | 333 |
333 size_t total_bytes_consumed = 0; | 334 size_t total_bytes_consumed = 0; |
334 struct iovec iovecs[5]; | 335 struct iovec iovecs[5]; |
335 while (!headers_complete_) { | 336 while (!headers_decompressed_) { |
336 size_t num_iovecs = | 337 size_t num_iovecs = |
337 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); | 338 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); |
338 | 339 |
339 if (num_iovecs == 0) { | 340 if (num_iovecs == 0) { |
340 return; | 341 return; |
341 } | 342 } |
342 for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) { | 343 for (size_t i = 0; i < num_iovecs && !headers_decompressed_; i++) { |
343 total_bytes_consumed += session_->decompressor()->DecompressData( | 344 total_bytes_consumed += session_->decompressor()->DecompressData( |
344 StringPiece(static_cast<char*>(iovecs[i].iov_base), | 345 StringPiece(static_cast<char*>(iovecs[i].iov_base), |
345 iovecs[i].iov_len), this); | 346 iovecs[i].iov_len), this); |
346 | 347 |
347 headers_complete_ = | 348 headers_decompressed_ = |
348 session_->decompressor()->current_header_id() != headers_id_; | 349 session_->decompressor()->current_header_id() != headers_id_; |
349 } | 350 } |
350 } | 351 } |
351 | 352 |
352 // Either the headers are complete, or the all data as been consumed. | 353 // Either the headers are complete, or the all data as been consumed. |
353 sequencer_.MarkConsumed(total_bytes_consumed); | 354 sequencer_.MarkConsumed(total_bytes_consumed); |
354 | 355 |
355 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. | 356 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. |
356 | 357 |
357 if (headers_complete_ && decompressed_headers_.empty()) { | 358 if (headers_decompressed_ && decompressed_headers_.empty()) { |
358 sequencer_.FlushBufferedFrames(); | 359 sequencer_.FlushBufferedFrames(); |
359 } | 360 } |
360 } | 361 } |
361 | 362 |
362 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { | 363 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { |
363 data.AppendToString(&decompressed_headers_); | 364 data.AppendToString(&decompressed_headers_); |
364 return true; | 365 return true; |
365 } | 366 } |
366 | 367 |
367 void ReliableQuicStream::CloseWriteSide() { | 368 void ReliableQuicStream::CloseWriteSide() { |
(...skipping 16 matching lines...) Expand all Loading... |
384 if (visitor_) { | 385 if (visitor_) { |
385 Visitor* visitor = visitor_; | 386 Visitor* visitor = visitor_; |
386 // Calling Visitor::OnClose() may result the destruction of the visitor, | 387 // Calling Visitor::OnClose() may result the destruction of the visitor, |
387 // so we need to ensure we don't call it again. | 388 // so we need to ensure we don't call it again. |
388 visitor_ = NULL; | 389 visitor_ = NULL; |
389 visitor->OnClose(this); | 390 visitor->OnClose(this); |
390 } | 391 } |
391 } | 392 } |
392 | 393 |
393 } // namespace net | 394 } // namespace net |
OLD | NEW |