Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(639)

Unified Diff: net/spdy/spdy_session.cc

Issue 10815074: Instead of enqueueing SPDY frames, enqueue SPDY streams that are ready to produce data. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Remove logging and cleanup Created 8 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/spdy/spdy_session.cc
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 1051cb9179e936556580cf1fe0fbd9608e801e9b..c372b87ba821f8e34a95ea21957d08850f5c8639 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -194,6 +194,28 @@ static ExternalTimeFunc g_time_func = base::TimeTicks::Now;
} // namespace
// static
+void SpdySession::SpdyIOBufferProducer::ActivateStream(
+ SpdySession* spdy_session,
+ SpdyStream* spdy_stream) {
+ spdy_session->ActivateStream(spdy_stream);
+}
+
+// static
+SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ SpdyFrame* frame,
+ RequestPriority priority,
+ SpdyStream* stream) {
+ size_t size = frame->length() + SpdyFrame::kHeaderSize;
+ DCHECK_GT(size, 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), frame->data(), size);
+
+ return new SpdyIOBuffer(buffer, size, priority, stream);
+}
+
+// static
void SpdySession::set_default_protocol(NextProto default_protocol) {
g_default_protocol = default_protocol;
}
@@ -389,6 +411,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
ssl_info.cert->VerifyNameMatch(domain);
}
+void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
+ SpdyIOBufferProducer* producer) {
+ write_queue_.push(producer);
+ stream_producers_[producer] = stream;
+ WriteSocketLater();
+}
+
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
@@ -427,7 +456,8 @@ int SpdySession::CreateStream(
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
if (!max_concurrent_streams_ ||
- active_streams_.size() < max_concurrent_streams_) {
+ (active_streams_.size() + created_streams_.size() <
+ max_concurrent_streams_)) {
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
}
@@ -517,10 +547,7 @@ int SpdySession::CreateStreamImpl(
const std::string& path = url.PathForRequest();
- const SpdyStreamId stream_id = GetNewStreamId();
-
*spdy_stream = new SpdyStream(this,
- stream_id,
false,
stream_net_log);
const scoped_refptr<SpdyStream>& stream = *spdy_stream;
@@ -529,14 +556,13 @@ int SpdySession::CreateStreamImpl(
stream->set_path(path);
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
- ActivateStream(stream);
+ created_streams_.insert(stream);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
static_cast<int>(priority), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
- DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
return OK;
}
@@ -558,15 +584,13 @@ int SpdySession::GetProtocolVersion() const {
return buffered_spdy_framer_->protocol_version();
}
-int SpdySession::WriteSynStream(
+SpdySynStreamControlFrame* SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const SpdyHeaderBlock& headers) {
- // Find our stream
- if (!IsStreamActive(stream_id))
- return ERR_INVALID_SPDY_STREAM;
+ CHECK(IsStreamActive(stream_id));
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
@@ -577,11 +601,7 @@ int SpdySession::WriteSynStream(
buffered_spdy_framer_->CreateSynStream(
stream_id, 0,
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
- credential_slot, flags, false, &headers));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order.
- // http://crbug.com/111708
- QueueFrame(syn_frame.get(), HIGHEST, stream);
+ credential_slot, flags, true, &headers));
base::StatsCounter spdy_requests("spdy.requests");
spdy_requests.Increment();
@@ -596,14 +616,15 @@ int SpdySession::WriteSynStream(
stream_id, 0));
}
- return ERR_IO_PENDING;
+ return syn_frame.release();
}
-int SpdySession::WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority) {
+SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
+ const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority) {
DCHECK(is_secure_);
unsigned char secret[32]; // 32 bytes from the spec
GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
@@ -645,24 +666,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyCredentialControlFrame> credential_frame(
buffered_spdy_framer_->CreateCredentialFrame(credential));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order, which means that we need
- // to enqueue all CREDENTIAL frames at this priority to ensure that
- // they are sent *before* the SYN_STREAM that references them.
- // http://crbug.com/111708
- QueueFrame(credential_frame.get(), HIGHEST, NULL);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
}
- return ERR_IO_PENDING;
+ return credential_frame.release();
}
-int SpdySession::WriteStreamData(SpdyStreamId stream_id,
- net::IOBuffer* data, int len,
- SpdyDataFlags flags) {
+SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
@@ -686,7 +701,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return ERR_IO_PENDING;
+ return NULL;
}
int new_len = std::min(len, stream->send_window_size());
if (new_len < len) {
@@ -711,18 +726,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
scoped_ptr<SpdyDataFrame> frame(
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), len, flags));
- QueueFrame(frame.get(), stream->priority(), stream);
- return ERR_IO_PENDING;
+ return frame.release();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
+ DCHECK_NE(0u, stream_id);
// TODO(mbelshe): We should send a RST_STREAM control frame here
// so that the server can cancel a large send.
DeleteStream(stream_id, status);
}
+void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
+ DCHECK_EQ(0u, stream->stream_id());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+}
+
void SpdySession::ResetStream(SpdyStreamId stream_id,
SpdyStatusCodes status,
const std::string& description) {
@@ -740,7 +760,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.get(), priority, NULL);
+ QueueFrame(rst_frame.release(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -928,46 +948,22 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !queue_.empty()) {
+ while (in_flight_write_.buffer() || !write_queue_.empty()) {
if (!in_flight_write_.buffer()) {
- // Grab the next SpdyFrame to send.
- SpdyIOBuffer next_buffer = queue_.top();
- queue_.pop();
-
- // We've deferred compression until just before we write it to the socket,
- // which is now. At this time, we don't compress our data frames.
- SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- size_t size;
- if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
- DCHECK(uncompressed_frame.is_control_frame());
- const SpdyControlFrame* uncompressed_control_frame =
- reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame);
- scoped_ptr<SpdyFrame> compressed_frame(
- buffered_spdy_framer_->CompressControlFrame(
- *uncompressed_control_frame));
- if (!compressed_frame.get()) {
- RecordProtocolErrorHistogram(
- PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
- CloseSessionOnError(
- net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
- return;
- }
-
- size = compressed_frame->length() + SpdyFrame::kHeaderSize;
-
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
-
- // Attempt to send the frame.
- in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
- next_buffer.stream());
- } else {
- size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
- in_flight_write_ = next_buffer;
- }
+ // Grab the next SpdyBuffer to send.
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
+ write_queue_.pop();
+ scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
+ stream_producers_.erase(producer.get());
+ // It is possible that a stream had data to write, but a
+ // WINDOW_UPDATE frame has been received which made that
+ // stream no longer writable.
+ // TODO(rch): consider handling that case by removing the
+ // stream from the writable queue?
+ if (buffer == NULL)
+ continue;
+
+ in_flight_write_ = *buffer;
} else {
DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
@@ -1014,16 +1010,32 @@ void SpdySession::CloseAllStreams(net::Error status) {
while (!active_streams_.empty()) {
ActiveStreamMap::iterator it = active_streams_.begin();
const scoped_refptr<SpdyStream>& stream = it->second;
- DCHECK(stream);
- std::string description = base::StringPrintf(
- "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
- stream->LogStreamError(status, description);
+ LogAbandonedStream(stream, status);
DeleteStream(stream->stream_id(), status);
}
+ while (!created_streams_.empty()) {
+ CreatedStreamSet::iterator it = created_streams_.begin();
+ const scoped_refptr<SpdyStream>& stream = *it;
+ LogAbandonedStream(stream, status);
+ stream->OnClose(status);
+ created_streams_.erase(it);
+ }
+
// We also need to drain the queue.
- while (queue_.size())
- queue_.pop();
+ while (!write_queue_.empty()) {
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
+ write_queue_.pop();
+ stream_producers_.erase(producer.get());
+ }
+}
+
+void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
+ net::Error status) {
+ DCHECK(stream);
+ std::string description = base::StringPrintf(
+ "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
+ stream->LogStreamError(status, description);
}
int SpdySession::GetNewStreamId() {
@@ -1034,17 +1046,6 @@ int SpdySession::GetNewStreamId() {
return id;
}
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- int length = SpdyFrame::kHeaderSize + frame->length();
- IOBuffer* buffer = new IOBuffer(length);
- memcpy(buffer->data(), frame->data(), length);
- queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
-
- WriteSocketLater();
-}
-
void SpdySession::CloseSessionOnError(net::Error err,
bool remove_from_pool,
const std::string& description) {
@@ -1131,7 +1132,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return connection_->socket()->GetLocalAddress(address);
}
+class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
+ public:
+ SimpleSpdyIOBufferProducer(SpdyFrame* frame,
+ RequestPriority priority)
+ : frame_(frame),
+ priority_(priority) {
+ }
+
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return priority_;
+ }
+
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) {
+ return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ frame_.get(), priority_, NULL);
+ }
+
+ private:
+ scoped_ptr<SpdyFrame> frame_;
+ RequestPriority priority_;
+};
+
+void SpdySession::QueueFrame(SpdyFrame* frame,
+ RequestPriority priority) {
+ SimpleSpdyIOBufferProducer* producer =
+ new SimpleSpdyIOBufferProducer(frame, priority);
+ write_queue_.push(producer);
+ WriteSocketLater();
+}
+
void SpdySession::ActivateStream(SpdyStream* stream) {
+ if (stream->stream_id() == 0) {
+ stream->set_stream_id(GetNewStreamId());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+ }
const SpdyStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
@@ -1159,6 +1194,21 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
if (it2 == active_streams_.end())
return;
+ // Possibly remove from the write queue.
+ WriteQueue old = write_queue_;
+ write_queue_ = WriteQueue();
+ while (!old.empty()) {
+ scoped_ptr<SpdyIOBufferProducer> producer(old.top());
+ old.pop();
+ StreamProducerMap::iterator it = stream_producers_.find(producer.get());
+ if (it == stream_producers_.end() || it->second->stream_id() != id) {
+ write_queue_.push(producer.release());
+ } else {
+ stream_producers_.erase(producer.get());
+ producer.reset(NULL);
+ }
+ }
+
// If this is an active stream, call the callback.
const scoped_refptr<SpdyStream> stream(it2->second);
active_streams_.erase(it2);
@@ -1379,8 +1429,8 @@ void SpdySession::OnSynStream(SpdyStreamId stream_id,
return;
}
- scoped_refptr<SpdyStream> stream(
- new SpdyStream(this, stream_id, true, net_log_));
+ scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
+ stream->set_stream_id(stream_id);
stream->set_path(gurl.PathForRequest());
stream->set_send_window_size(initial_send_window_size_);
@@ -1618,7 +1668,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.get(), stream->priority(), NULL);
+ QueueFrame(window_update_frame.release(), stream->priority());
}
// Given a cwnd that we would have sent to the server, modify it based on the
@@ -1657,7 +1707,6 @@ void SpdySession::SendInitialSettings() {
settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_);
}
- sent_settings_ = true;
SendSettings(settings_map);
}
@@ -1690,7 +1739,6 @@ void SpdySession::SendInitialSettings() {
HandleSetting(new_id, new_val);
}
- sent_settings_ = true;
SendSettings(settings_map_new);
}
@@ -1704,7 +1752,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdySettingsControlFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
- QueueFrame(settings_frame.get(), HIGHEST, NULL);
+ sent_settings_ = true;
+ QueueFrame(settings_frame.release(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -1739,6 +1788,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
DCHECK(stream);
stream->AdjustSendWindowSize(delta_window_size);
}
+
+ CreatedStreamSet::iterator i;
+ for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
+ const scoped_refptr<SpdyStream>& stream = *i;
+ stream->AdjustSendWindowSize(delta_window_size);
+ }
}
void SpdySession::SendPrefacePingIfNoneInFlight() {
@@ -1759,7 +1814,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyPingControlFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
- QueueFrame(ping_frame.get(), HIGHEST, NULL);
+ QueueFrame(ping_frame.release(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698