Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: net/spdy/spdy_session.cc

Issue 11644088: SPDY - implement greedy approach to read all the data and process it (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/spdy/spdy_session.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/spdy_session.h" 5 #include "net/spdy/spdy_session.h"
6 6
7 #include <map> 7 #include <map>
8 8
9 #include "base/basictypes.h" 9 #include "base/basictypes.h"
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 24 matching lines...) Expand all
35 #include "net/spdy/spdy_http_utils.h" 35 #include "net/spdy/spdy_http_utils.h"
36 #include "net/spdy/spdy_protocol.h" 36 #include "net/spdy/spdy_protocol.h"
37 #include "net/spdy/spdy_session_pool.h" 37 #include "net/spdy/spdy_session_pool.h"
38 #include "net/spdy/spdy_stream.h" 38 #include "net/spdy/spdy_stream.h"
39 39
40 namespace net { 40 namespace net {
41 41
42 namespace { 42 namespace {
43 43
44 const int kReadBufferSize = 8 * 1024; 44 const int kReadBufferSize = 8 * 1024;
45 const int kMaxReadBytes = 64 * 1024;
45 const int kDefaultConnectionAtRiskOfLossSeconds = 10; 46 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
46 const int kHungIntervalSeconds = 10; 47 const int kHungIntervalSeconds = 10;
47 48
48 // Minimum seconds that unclaimed pushed streams will be kept in memory. 49 // Minimum seconds that unclaimed pushed streams will be kept in memory.
49 const int kMinPushedStreamLifetimeSeconds = 300; 50 const int kMinPushedStreamLifetimeSeconds = 300;
50 51
51 Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers, 52 Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers,
52 bool fin, 53 bool fin,
53 bool unidirectional, 54 bool unidirectional,
54 SpdyStreamId stream_id, 55 SpdyStreamId stream_id,
(...skipping 176 matching lines...) Expand 10 before | Expand all | Expand 10 after
231 connection_(new ClientSocketHandle), 232 connection_(new ClientSocketHandle),
232 read_buffer_(new IOBuffer(kReadBufferSize)), 233 read_buffer_(new IOBuffer(kReadBufferSize)),
233 read_pending_(false), 234 read_pending_(false),
234 stream_hi_water_mark_(1), // Always start at 1 for the first stream id. 235 stream_hi_water_mark_(1), // Always start at 1 for the first stream id.
235 write_pending_(false), 236 write_pending_(false),
236 delayed_write_pending_(false), 237 delayed_write_pending_(false),
237 is_secure_(false), 238 is_secure_(false),
238 certificate_error_code_(OK), 239 certificate_error_code_(OK),
239 error_(OK), 240 error_(OK),
240 state_(IDLE), 241 state_(IDLE),
242 io_state_(STATE_NONE),
241 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? 243 max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
242 kInitialMaxConcurrentStreams : 244 kInitialMaxConcurrentStreams :
243 initial_max_concurrent_streams), 245 initial_max_concurrent_streams),
244 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? 246 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
245 kMaxConcurrentStreamLimit : 247 kMaxConcurrentStreamLimit :
246 max_concurrent_streams_limit), 248 max_concurrent_streams_limit),
247 streams_initiated_count_(0), 249 streams_initiated_count_(0),
248 streams_pushed_count_(0), 250 streams_pushed_count_(0),
249 streams_pushed_and_claimed_count_(0), 251 streams_pushed_and_claimed_count_(0),
250 streams_abandoned_count_(0), 252 streams_abandoned_count_(0),
251 bytes_received_(0), 253 bytes_received_(0),
254 bytes_read_(0),
252 sent_settings_(false), 255 sent_settings_(false),
253 received_settings_(false), 256 received_settings_(false),
254 stalled_streams_(0), 257 stalled_streams_(0),
255 pings_in_flight_(0), 258 pings_in_flight_(0),
256 next_ping_id_(1), 259 next_ping_id_(1),
257 last_activity_time_(base::TimeTicks::Now()), 260 last_activity_time_(base::TimeTicks::Now()),
258 check_ping_status_pending_(false), 261 check_ping_status_pending_(false),
259 flow_control_(false), 262 flow_control_(false),
260 initial_send_window_size_(kSpdyStreamInitialWindowSize), 263 initial_send_window_size_(kSpdyStreamInitialWindowSize),
261 initial_recv_window_size_(initial_recv_window_size == 0 ? 264 initial_recv_window_size_(initial_recv_window_size == 0 ?
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
363 flow_control_ = (protocol >= kProtoSPDY3); 366 flow_control_ = (protocol >= kProtoSPDY3);
364 367
365 buffered_spdy_framer_.reset(new BufferedSpdyFramer(version, 368 buffered_spdy_framer_.reset(new BufferedSpdyFramer(version,
366 enable_compression_)); 369 enable_compression_));
367 buffered_spdy_framer_->set_visitor(this); 370 buffered_spdy_framer_->set_visitor(this);
368 SendInitialSettings(); 371 SendInitialSettings();
369 UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol, kProtoMaximumVersion); 372 UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol, kProtoMaximumVersion);
370 373
371 // Write out any data that we might have to send, such as the settings frame. 374 // Write out any data that we might have to send, such as the settings frame.
372 WriteSocketLater(); 375 WriteSocketLater();
373 net::Error error = ReadSocket(); 376 io_state_ = STATE_DO_READ;
377 bytes_read_ = 0;
378 int result = DoLoop(OK);
379 net::Error error = static_cast<net::Error>(result);
374 if (error == ERR_IO_PENDING) 380 if (error == ERR_IO_PENDING)
375 return OK; 381 return OK;
376 return error; 382 return error;
377 } 383 }
378 384
379 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 385 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
380 if (!verify_domain_authentication_) 386 if (!verify_domain_authentication_)
381 return true; 387 return true;
382 388
383 if (state_ != CONNECTED) 389 if (state_ != CONNECTED)
(...skipping 381 matching lines...) Expand 10 before | Expand all | Expand 10 after
765 // If we're connecting, defer to the connection to give us the actual 771 // If we're connecting, defer to the connection to give us the actual
766 // LoadState. 772 // LoadState.
767 if (state_ == CONNECTING) 773 if (state_ == CONNECTING)
768 return connection_->GetLoadState(); 774 return connection_->GetLoadState();
769 775
770 // Just report that we're idle since the session could be doing 776 // Just report that we're idle since the session could be doing
771 // many things concurrently. 777 // many things concurrently.
772 return LOAD_STATE_IDLE; 778 return LOAD_STATE_IDLE;
773 } 779 }
774 780
775 void SpdySession::OnReadComplete(int bytes_read) { 781 int SpdySession::DoLoop(int result) {
782 do {
783 if (read_pending_)
784 return OK;
785
786 if (state_ == CLOSED)
787 return ERR_UNEXPECTED;
788
789 IoState io_state = io_state_;
790 io_state_ = STATE_NONE;
791 switch (io_state) {
792 case STATE_DO_READ:
793 result = DoRead();
794 break;
795 case STATE_DO_READ_COMPLETE:
796 result = DoReadComplete(result);
797 break;
798 default:
799 NOTREACHED() << "io_state_: " << io_state;
800 break;
801 }
802 } while (io_state_ != STATE_NONE && result != ERR_IO_PENDING);
Ryan Sleevi 2012/12/27 05:29:48 Design Suggestion: You can probably make it cleare
803 return result;
804 }
805
806 int SpdySession::DoRead() {
807 io_state_ = STATE_DO_READ_COMPLETE;
808
809 CHECK(connection_.get());
810 CHECK(connection_->socket());
811 int bytes_read = connection_->socket()->Read(
812 read_buffer_.get(),
813 kReadBufferSize,
814 base::Bind(&SpdySession::OnReadComplete, base::Unretained(this)));
815 if (bytes_read == net::ERR_IO_PENDING) {
816 read_pending_ = true;
817 } else if (bytes_read > 0) {
818 if (bytes_read_ > kMaxReadBytes) {
819 // Data was read, process it.
820 // Schedule the work through the message loop to yield for other tasks to
821 // run on IO thread.
822 read_pending_ = true;
823 MessageLoop::current()->PostTask(
824 FROM_HERE,
825 base::Bind(&SpdySession::OnReadComplete,
826 weak_factory_.GetWeakPtr(), bytes_read));
827 }
828 }
Ryan Sleevi 2012/12/27 05:29:48 Here, it would be as simple as CHECK(...) return c
ramant (doing other things) 2012/12/29 03:02:20 Done.
829 return bytes_read;
830 }
831
832 int SpdySession::DoReadComplete(int bytes_read) {
776 // Parse a frame. For now this code requires that the frame fit into our 833 // Parse a frame. For now this code requires that the frame fit into our
777 // buffer (32KB). 834 // buffer (32KB).
778 // TODO(mbelshe): support arbitrarily large frames! 835 // TODO(mbelshe): support arbitrarily large frames!
779 836
780 read_pending_ = false;
781
782 if (bytes_read <= 0) { 837 if (bytes_read <= 0) {
783 // Session is tearing down. 838 // Session is tearing down.
784 net::Error error = static_cast<net::Error>(bytes_read); 839 net::Error error = static_cast<net::Error>(bytes_read);
785 if (bytes_read == 0) 840 if (bytes_read == 0)
786 error = ERR_CONNECTION_CLOSED; 841 error = ERR_CONNECTION_CLOSED;
787 CloseSessionOnError(error, true, "bytes_read is <= 0."); 842 CloseSessionOnError(error, true, "bytes_read is <= 0.");
788 return; 843 io_state_ = STATE_NONE;
Ryan Sleevi 2012/12/27 05:29:48 If you're relying on this pattern to set io_state_
ramant (doing other things) 2012/12/29 03:02:20 Done.
844 return error;
789 } 845 }
790 846
791 bytes_received_ += bytes_read; 847 bytes_received_ += bytes_read;
848 bytes_read_ += bytes_read;
792 849
793 last_activity_time_ = base::TimeTicks::Now(); 850 last_activity_time_ = base::TimeTicks::Now();
794 851
795 // The SpdyFramer will use callbacks onto |this| as it parses frames. 852 // The SpdyFramer will use callbacks onto |this| as it parses frames.
796 // When errors occur, those callbacks can lead to teardown of all references 853 // When errors occur, those callbacks can lead to teardown of all references
797 // to |this|, so maintain a reference to self during this call for safe 854 // to |this|, so maintain a reference to self during this call for safe
798 // cleanup. 855 // cleanup.
799 scoped_refptr<SpdySession> self(this); 856 scoped_refptr<SpdySession> self(this);
800 857
801 DCHECK(buffered_spdy_framer_.get()); 858 DCHECK(buffered_spdy_framer_.get());
802 char *data = read_buffer_->data(); 859 char *data = read_buffer_->data();
803 while (bytes_read && 860 while (bytes_read &&
804 buffered_spdy_framer_->error_code() == 861 buffered_spdy_framer_->error_code() ==
805 SpdyFramer::SPDY_NO_ERROR) { 862 SpdyFramer::SPDY_NO_ERROR) {
806 uint32 bytes_processed = 863 uint32 bytes_processed =
807 buffered_spdy_framer_->ProcessInput(data, bytes_read); 864 buffered_spdy_framer_->ProcessInput(data, bytes_read);
808 bytes_read -= bytes_processed; 865 bytes_read -= bytes_processed;
809 data += bytes_processed; 866 data += bytes_processed;
810 if (buffered_spdy_framer_->state() == SpdyFramer::SPDY_DONE) 867 if (buffered_spdy_framer_->state() == SpdyFramer::SPDY_DONE)
811 buffered_spdy_framer_->Reset(); 868 buffered_spdy_framer_->Reset();
812 } 869 }
813 870
814 if (state_ != CLOSED) 871 if (state_ != CLOSED)
815 ReadSocket(); 872 io_state_ = STATE_DO_READ;
873 else
874 io_state_ = STATE_NONE;
875 return OK;
876 }
877
878 void SpdySession::OnReadComplete(int bytes_read) {
879 io_state_ = STATE_DO_READ_COMPLETE;
Ryan Sleevi 2012/12/27 05:29:48 Normally we don't set the state in the bound callb
ramant (doing other things) 2012/12/29 03:02:20 Done.
880 read_pending_ = false;
881 bytes_read_ = 0;
Ryan Sleevi 2012/12/27 05:29:48 My primary reason for suggesting the loop shufflin
882 DoLoop(bytes_read);
816 } 883 }
817 884
818 void SpdySession::OnWriteComplete(int result) { 885 void SpdySession::OnWriteComplete(int result) {
819 DCHECK(write_pending_); 886 DCHECK(write_pending_);
820 DCHECK(in_flight_write_.size()); 887 DCHECK(in_flight_write_.size());
821 888
822 last_activity_time_ = base::TimeTicks::Now(); 889 last_activity_time_ = base::TimeTicks::Now();
823 write_pending_ = false; 890 write_pending_ = false;
824 891
825 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 892 scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
860 WriteSocketLater(); 927 WriteSocketLater();
861 } else { 928 } else {
862 in_flight_write_.release(); 929 in_flight_write_.release();
863 930
864 // The stream is now errored. Close it down. 931 // The stream is now errored. Close it down.
865 CloseSessionOnError( 932 CloseSessionOnError(
866 static_cast<net::Error>(result), true, "The stream has errored."); 933 static_cast<net::Error>(result), true, "The stream has errored.");
867 } 934 }
868 } 935 }
869 936
870 net::Error SpdySession::ReadSocket() {
871 if (read_pending_)
872 return OK;
873
874 if (state_ == CLOSED) {
875 NOTREACHED();
876 return ERR_UNEXPECTED;
877 }
878
879 CHECK(connection_.get());
880 CHECK(connection_->socket());
881 int bytes_read = connection_->socket()->Read(
882 read_buffer_.get(),
883 kReadBufferSize,
884 base::Bind(&SpdySession::OnReadComplete, base::Unretained(this)));
885 switch (bytes_read) {
886 case 0:
887 // Socket is closed!
888 CloseSessionOnError(ERR_CONNECTION_CLOSED, true, "bytes_read is 0.");
889 return ERR_CONNECTION_CLOSED;
890 case net::ERR_IO_PENDING:
891 // Waiting for data. Nothing to do now.
892 read_pending_ = true;
893 return ERR_IO_PENDING;
894 default:
895 // Data was read, process it.
896 // Schedule the work through the message loop to avoid recursive
897 // callbacks.
898 read_pending_ = true;
899 MessageLoop::current()->PostTask(
900 FROM_HERE,
901 base::Bind(&SpdySession::OnReadComplete,
902 weak_factory_.GetWeakPtr(), bytes_read));
903 break;
904 }
905 return OK;
906 }
907
908 void SpdySession::WriteSocketLater() { 937 void SpdySession::WriteSocketLater() {
909 if (delayed_write_pending_) 938 if (delayed_write_pending_)
910 return; 939 return;
911 940
912 if (state_ < CONNECTED) 941 if (state_ < CONNECTED)
913 return; 942 return;
914 943
915 delayed_write_pending_ = true; 944 delayed_write_pending_ = true;
916 MessageLoop::current()->PostTask( 945 MessageLoop::current()->PostTask(
917 FROM_HERE, 946 FROM_HERE,
(...skipping 1039 matching lines...) Expand 10 before | Expand all | Expand 10 after
1957 SSLClientSocket* SpdySession::GetSSLClientSocket() const { 1986 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
1958 if (!is_secure_) 1987 if (!is_secure_)
1959 return NULL; 1988 return NULL;
1960 SSLClientSocket* ssl_socket = 1989 SSLClientSocket* ssl_socket =
1961 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 1990 reinterpret_cast<SSLClientSocket*>(connection_->socket());
1962 DCHECK(ssl_socket); 1991 DCHECK(ssl_socket);
1963 return ssl_socket; 1992 return ssl_socket;
1964 } 1993 }
1965 1994
1966 } // namespace net 1995 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/spdy_session.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698