Index: net/spdy/spdy_session.cc |
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc |
index 1051cb9179e936556580cf1fe0fbd9608e801e9b..c372b87ba821f8e34a95ea21957d08850f5c8639 100644 |
--- a/net/spdy/spdy_session.cc |
+++ b/net/spdy/spdy_session.cc |
@@ -194,6 +194,28 @@ static ExternalTimeFunc g_time_func = base::TimeTicks::Now; |
} // namespace |
// static |
+void SpdySession::SpdyIOBufferProducer::ActivateStream( |
+ SpdySession* spdy_session, |
+ SpdyStream* spdy_stream) { |
+ spdy_session->ActivateStream(spdy_stream); |
+} |
+ |
+// static |
+SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
+ SpdyFrame* frame, |
+ RequestPriority priority, |
+ SpdyStream* stream) { |
+ size_t size = frame->length() + SpdyFrame::kHeaderSize; |
+ DCHECK_GT(size, 0u); |
+ |
+ // TODO(mbelshe): We have too much copying of data here. |
+ IOBufferWithSize* buffer = new IOBufferWithSize(size); |
+ memcpy(buffer->data(), frame->data(), size); |
+ |
+ return new SpdyIOBuffer(buffer, size, priority, stream); |
+} |
+ |
+// static |
void SpdySession::set_default_protocol(NextProto default_protocol) { |
g_default_protocol = default_protocol; |
} |
@@ -389,6 +411,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { |
ssl_info.cert->VerifyNameMatch(domain); |
} |
+void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, |
+ SpdyIOBufferProducer* producer) { |
+ write_queue_.push(producer); |
+ stream_producers_[producer] = stream; |
+ WriteSocketLater(); |
+} |
+ |
int SpdySession::GetPushStream( |
const GURL& url, |
scoped_refptr<SpdyStream>* stream, |
@@ -427,7 +456,8 @@ int SpdySession::CreateStream( |
const BoundNetLog& stream_net_log, |
const CompletionCallback& callback) { |
if (!max_concurrent_streams_ || |
- active_streams_.size() < max_concurrent_streams_) { |
+ (active_streams_.size() + created_streams_.size() < |
+ max_concurrent_streams_)) { |
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); |
} |
@@ -517,10 +547,7 @@ int SpdySession::CreateStreamImpl( |
const std::string& path = url.PathForRequest(); |
- const SpdyStreamId stream_id = GetNewStreamId(); |
- |
*spdy_stream = new SpdyStream(this, |
- stream_id, |
false, |
stream_net_log); |
const scoped_refptr<SpdyStream>& stream = *spdy_stream; |
@@ -529,14 +556,13 @@ int SpdySession::CreateStreamImpl( |
stream->set_path(path); |
stream->set_send_window_size(initial_send_window_size_); |
stream->set_recv_window_size(initial_recv_window_size_); |
- ActivateStream(stream); |
+ created_streams_.insert(stream); |
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", |
static_cast<int>(priority), 0, 10, 11); |
// TODO(mbelshe): Optimize memory allocations |
- DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); |
return OK; |
} |
@@ -558,15 +584,13 @@ int SpdySession::GetProtocolVersion() const { |
return buffered_spdy_framer_->protocol_version(); |
} |
-int SpdySession::WriteSynStream( |
+SpdySynStreamControlFrame* SpdySession::CreateSynStream( |
SpdyStreamId stream_id, |
RequestPriority priority, |
uint8 credential_slot, |
SpdyControlFlags flags, |
const SpdyHeaderBlock& headers) { |
- // Find our stream |
- if (!IsStreamActive(stream_id)) |
- return ERR_INVALID_SPDY_STREAM; |
+ CHECK(IsStreamActive(stream_id)); |
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; |
CHECK_EQ(stream->stream_id(), stream_id); |
@@ -577,11 +601,7 @@ int SpdySession::WriteSynStream( |
buffered_spdy_framer_->CreateSynStream( |
stream_id, 0, |
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), |
- credential_slot, flags, false, &headers)); |
- // We enqueue all SYN_STREAM frames at the same priority to ensure |
- // that we do not send them out-of-order. |
- // http://crbug.com/111708 |
- QueueFrame(syn_frame.get(), HIGHEST, stream); |
+ credential_slot, flags, true, &headers)); |
base::StatsCounter spdy_requests("spdy.requests"); |
spdy_requests.Increment(); |
@@ -596,14 +616,15 @@ int SpdySession::WriteSynStream( |
stream_id, 0)); |
} |
- return ERR_IO_PENDING; |
+ return syn_frame.release(); |
} |
-int SpdySession::WriteCredentialFrame(const std::string& origin, |
- SSLClientCertType type, |
- const std::string& key, |
- const std::string& cert, |
- RequestPriority priority) { |
+SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame( |
+ const std::string& origin, |
+ SSLClientCertType type, |
+ const std::string& key, |
+ const std::string& cert, |
+ RequestPriority priority) { |
DCHECK(is_secure_); |
unsigned char secret[32]; // 32 bytes from the spec |
GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof", |
@@ -645,24 +666,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin, |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyCredentialControlFrame> credential_frame( |
buffered_spdy_framer_->CreateCredentialFrame(credential)); |
- // We enqueue all SYN_STREAM frames at the same priority to ensure |
- // that we do not send them out-of-order, which means that we need |
- // to enqueue all CREDENTIAL frames at this priority to ensure that |
- // they are sent *before* the SYN_STREAM that references them. |
- // http://crbug.com/111708 |
- QueueFrame(credential_frame.get(), HIGHEST, NULL); |
if (net_log().IsLoggingAllEvents()) { |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, |
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); |
} |
- return ERR_IO_PENDING; |
+ return credential_frame.release(); |
} |
-int SpdySession::WriteStreamData(SpdyStreamId stream_id, |
- net::IOBuffer* data, int len, |
- SpdyDataFlags flags) { |
+SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
+ net::IOBuffer* data, int len, |
+ SpdyDataFlags flags) { |
// Find our stream |
CHECK(IsStreamActive(stream_id)); |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
@@ -686,7 +701,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return ERR_IO_PENDING; |
+ return NULL; |
} |
int new_len = std::min(len, stream->send_window_size()); |
if (new_len < len) { |
@@ -711,18 +726,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, |
scoped_ptr<SpdyDataFrame> frame( |
buffered_spdy_framer_->CreateDataFrame( |
stream_id, data->data(), len, flags)); |
- QueueFrame(frame.get(), stream->priority(), stream); |
- return ERR_IO_PENDING; |
+ return frame.release(); |
} |
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
+ DCHECK_NE(0u, stream_id); |
// TODO(mbelshe): We should send a RST_STREAM control frame here |
// so that the server can cancel a large send. |
DeleteStream(stream_id, status); |
} |
+void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) { |
+ DCHECK_EQ(0u, stream->stream_id()); |
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream)); |
+} |
+ |
void SpdySession::ResetStream(SpdyStreamId stream_id, |
SpdyStatusCodes status, |
const std::string& description) { |
@@ -740,7 +760,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
priority = stream->priority(); |
} |
- QueueFrame(rst_frame.get(), priority, NULL); |
+ QueueFrame(rst_frame.release(), priority); |
RecordProtocolErrorHistogram( |
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); |
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); |
@@ -928,46 +948,22 @@ void SpdySession::WriteSocket() { |
// Loop sending frames until we've sent everything or until the write |
// returns error (or ERR_IO_PENDING). |
DCHECK(buffered_spdy_framer_.get()); |
- while (in_flight_write_.buffer() || !queue_.empty()) { |
+ while (in_flight_write_.buffer() || !write_queue_.empty()) { |
if (!in_flight_write_.buffer()) { |
- // Grab the next SpdyFrame to send. |
- SpdyIOBuffer next_buffer = queue_.top(); |
- queue_.pop(); |
- |
- // We've deferred compression until just before we write it to the socket, |
- // which is now. At this time, we don't compress our data frames. |
- SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); |
- size_t size; |
- if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) { |
- DCHECK(uncompressed_frame.is_control_frame()); |
- const SpdyControlFrame* uncompressed_control_frame = |
- reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame); |
- scoped_ptr<SpdyFrame> compressed_frame( |
- buffered_spdy_framer_->CompressControlFrame( |
- *uncompressed_control_frame)); |
- if (!compressed_frame.get()) { |
- RecordProtocolErrorHistogram( |
- PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE); |
- CloseSessionOnError( |
- net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure."); |
- return; |
- } |
- |
- size = compressed_frame->length() + SpdyFrame::kHeaderSize; |
- |
- DCHECK_GT(size, 0u); |
- |
- // TODO(mbelshe): We have too much copying of data here. |
- IOBufferWithSize* buffer = new IOBufferWithSize(size); |
- memcpy(buffer->data(), compressed_frame->data(), size); |
- |
- // Attempt to send the frame. |
- in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST, |
- next_buffer.stream()); |
- } else { |
- size = uncompressed_frame.length() + SpdyFrame::kHeaderSize; |
- in_flight_write_ = next_buffer; |
- } |
+ // Grab the next SpdyBuffer to send. |
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
+ write_queue_.pop(); |
+ scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); |
+ stream_producers_.erase(producer.get()); |
+ // It is possible that a stream had data to write, but a |
+ // WINDOW_UPDATE frame has been received which made that |
+ // stream no longer writable. |
+ // TODO(rch): consider handling that case by removing the |
+ // stream from the writable queue? |
+ if (buffer == NULL) |
+ continue; |
+ |
+ in_flight_write_ = *buffer; |
} else { |
DCHECK(in_flight_write_.buffer()->BytesRemaining()); |
} |
@@ -1014,16 +1010,32 @@ void SpdySession::CloseAllStreams(net::Error status) { |
while (!active_streams_.empty()) { |
ActiveStreamMap::iterator it = active_streams_.begin(); |
const scoped_refptr<SpdyStream>& stream = it->second; |
- DCHECK(stream); |
- std::string description = base::StringPrintf( |
- "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); |
- stream->LogStreamError(status, description); |
+ LogAbandonedStream(stream, status); |
DeleteStream(stream->stream_id(), status); |
} |
+ while (!created_streams_.empty()) { |
+ CreatedStreamSet::iterator it = created_streams_.begin(); |
+ const scoped_refptr<SpdyStream>& stream = *it; |
+ LogAbandonedStream(stream, status); |
+ stream->OnClose(status); |
+ created_streams_.erase(it); |
+ } |
+ |
// We also need to drain the queue. |
- while (queue_.size()) |
- queue_.pop(); |
+ while (!write_queue_.empty()) { |
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
+ write_queue_.pop(); |
+ stream_producers_.erase(producer.get()); |
+ } |
+} |
+ |
+void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, |
+ net::Error status) { |
+ DCHECK(stream); |
+ std::string description = base::StringPrintf( |
+ "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); |
+ stream->LogStreamError(status, description); |
} |
int SpdySession::GetNewStreamId() { |
@@ -1034,17 +1046,6 @@ int SpdySession::GetNewStreamId() { |
return id; |
} |
-void SpdySession::QueueFrame(SpdyFrame* frame, |
- RequestPriority priority, |
- SpdyStream* stream) { |
- int length = SpdyFrame::kHeaderSize + frame->length(); |
- IOBuffer* buffer = new IOBuffer(length); |
- memcpy(buffer->data(), frame->data(), length); |
- queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); |
- |
- WriteSocketLater(); |
-} |
- |
void SpdySession::CloseSessionOnError(net::Error err, |
bool remove_from_pool, |
const std::string& description) { |
@@ -1131,7 +1132,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { |
return connection_->socket()->GetLocalAddress(address); |
} |
+class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { |
+ public: |
+ SimpleSpdyIOBufferProducer(SpdyFrame* frame, |
+ RequestPriority priority) |
+ : frame_(frame), |
+ priority_(priority) { |
+ } |
+ |
+ virtual RequestPriority GetPriority() const OVERRIDE { |
+ return priority_; |
+ } |
+ |
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) { |
+ return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
+ frame_.get(), priority_, NULL); |
+ } |
+ |
+ private: |
+ scoped_ptr<SpdyFrame> frame_; |
+ RequestPriority priority_; |
+}; |
+ |
+void SpdySession::QueueFrame(SpdyFrame* frame, |
+ RequestPriority priority) { |
+ SimpleSpdyIOBufferProducer* producer = |
+ new SimpleSpdyIOBufferProducer(frame, priority); |
+ write_queue_.push(producer); |
+ WriteSocketLater(); |
+} |
+ |
void SpdySession::ActivateStream(SpdyStream* stream) { |
+ if (stream->stream_id() == 0) { |
+ stream->set_stream_id(GetNewStreamId()); |
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream)); |
+ } |
const SpdyStreamId id = stream->stream_id(); |
DCHECK(!IsStreamActive(id)); |
@@ -1159,6 +1194,21 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
if (it2 == active_streams_.end()) |
return; |
+ // Possibly remove from the write queue. |
+ WriteQueue old = write_queue_; |
+ write_queue_ = WriteQueue(); |
+ while (!old.empty()) { |
+ scoped_ptr<SpdyIOBufferProducer> producer(old.top()); |
+ old.pop(); |
+ StreamProducerMap::iterator it = stream_producers_.find(producer.get()); |
+ if (it == stream_producers_.end() || it->second->stream_id() != id) { |
+ write_queue_.push(producer.release()); |
+ } else { |
+ stream_producers_.erase(producer.get()); |
+ producer.reset(NULL); |
+ } |
+ } |
+ |
// If this is an active stream, call the callback. |
const scoped_refptr<SpdyStream> stream(it2->second); |
active_streams_.erase(it2); |
@@ -1379,8 +1429,8 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id, |
return; |
} |
- scoped_refptr<SpdyStream> stream( |
- new SpdyStream(this, stream_id, true, net_log_)); |
+ scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_)); |
+ stream->set_stream_id(stream_id); |
stream->set_path(gurl.PathForRequest()); |
stream->set_send_window_size(initial_send_window_size_); |
@@ -1618,7 +1668,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id, |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame( |
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); |
- QueueFrame(window_update_frame.get(), stream->priority(), NULL); |
+ QueueFrame(window_update_frame.release(), stream->priority()); |
} |
// Given a cwnd that we would have sent to the server, modify it based on the |
@@ -1657,7 +1707,6 @@ void SpdySession::SendInitialSettings() { |
settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = |
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_); |
} |
- sent_settings_ = true; |
SendSettings(settings_map); |
} |
@@ -1690,7 +1739,6 @@ void SpdySession::SendInitialSettings() { |
HandleSetting(new_id, new_val); |
} |
- sent_settings_ = true; |
SendSettings(settings_map_new); |
} |
@@ -1704,7 +1752,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) { |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdySettingsControlFrame> settings_frame( |
buffered_spdy_framer_->CreateSettings(settings)); |
- QueueFrame(settings_frame.get(), HIGHEST, NULL); |
+ sent_settings_ = true; |
+ QueueFrame(settings_frame.release(), HIGHEST); |
} |
void SpdySession::HandleSetting(uint32 id, uint32 value) { |
@@ -1739,6 +1788,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { |
DCHECK(stream); |
stream->AdjustSendWindowSize(delta_window_size); |
} |
+ |
+ CreatedStreamSet::iterator i; |
+ for (i = created_streams_.begin(); i != created_streams_.end(); i++) { |
+ const scoped_refptr<SpdyStream>& stream = *i; |
+ stream->AdjustSendWindowSize(delta_window_size); |
+ } |
} |
void SpdySession::SendPrefacePingIfNoneInFlight() { |
@@ -1759,7 +1814,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) { |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyPingControlFrame> ping_frame( |
buffered_spdy_framer_->CreatePingFrame(next_ping_id_)); |
- QueueFrame(ping_frame.get(), HIGHEST, NULL); |
+ QueueFrame(ping_frame.release(), HIGHEST); |
if (net_log().IsLoggingAllEvents()) { |
net_log().AddEvent( |