Index: net/quic/quic_connection.cc |
diff --git a/net/quic/quic_connection.cc b/net/quic/quic_connection.cc |
index c81f50b9cb7dc7adb71d5f42391051fda637f1e4..955d82c3eb661be0cd56f974370078545f43fcb2 100644 |
--- a/net/quic/quic_connection.cc |
+++ b/net/quic/quic_connection.cc |
@@ -69,7 +69,7 @@ const int kMaxPacketsPerRetransmissionAlarm = 10; |
const bool kForce = true; |
// Named constant for CanWrite(). |
const bool kIsRetransmission = true; |
-// Named constatn for WritePacket. |
+// Named constant for WritePacket. |
const bool kHasRetransmittableData = true; |
bool Near(QuicPacketSequenceNumber a, QuicPacketSequenceNumber b) { |
@@ -81,11 +81,13 @@ bool Near(QuicPacketSequenceNumber a, QuicPacketSequenceNumber b) { |
QuicConnection::QuicConnection(QuicGuid guid, |
IPEndPoint address, |
- QuicConnectionHelperInterface* helper) |
+ QuicConnectionHelperInterface* helper, |
+ bool is_server) |
: helper_(helper), |
framer_(kQuicVersion1, |
QuicDecrypter::Create(kNULL), |
- QuicEncrypter::Create(kNULL)), |
+ QuicEncrypter::Create(kNULL), |
+ is_server), |
clock_(helper->GetClock()), |
random_generator_(helper->GetRandomGenerator()), |
guid_(guid), |
@@ -97,16 +99,21 @@ QuicConnection::QuicConnection(QuicGuid guid, |
handling_retransmission_timeout_(false), |
write_blocked_(false), |
debug_visitor_(NULL), |
- packet_creator_(guid_, &framer_, random_generator_), |
+ packet_creator_(guid_, &framer_, random_generator_, is_server), |
packet_generator_(this, &packet_creator_), |
timeout_(QuicTime::Delta::FromMicroseconds(kDefaultTimeoutUs)), |
time_of_last_received_packet_(clock_->ApproximateNow()), |
time_of_last_sent_packet_(clock_->ApproximateNow()), |
congestion_manager_(clock_, kTCP), |
+ version_negotiation_state_(START_NEGOTIATION), |
+ quic_version_(kQuicVersion1), |
+ is_server_(is_server), |
connected_(true), |
received_truncated_ack_(false), |
send_ack_in_response_to_packet_(false) { |
helper_->SetConnection(this); |
+ // TODO(satyamshekhar): Have a smaller timeout till version is negotiated and |
+ // connection is established (CHLO fully processed). |
helper_->SetTimeoutAlarm(timeout_); |
framer_.set_visitor(this); |
framer_.set_entropy_calculator(&entropy_manager_); |
@@ -134,11 +141,27 @@ QuicConnection::~QuicConnection() { |
} |
} |
+bool QuicConnection::SelectMutualVersion( |
+ const QuicVersionTagList& available_versions) { |
+ // TODO(satyamshekhar): Make this generic. |
+ if (std::find(available_versions.begin(), available_versions.end(), |
+ kQuicVersion1) == available_versions.end()) { |
+ return false; |
+ } |
+ |
+ // Right now we only support kQuicVersion1 so it's okay not to |
+ // update the framer and quic_version_. When start supporting more |
+ // versions please update both. |
+ return true; |
+} |
+ |
void QuicConnection::OnError(QuicFramer* framer) { |
SendConnectionClose(framer->error()); |
} |
void QuicConnection::OnPacket() { |
+ // TODO(satyamshekhar): Validate packet before updating the time |
+ // since it affects the timeout of the connection. |
time_of_last_received_packet_ = clock_->Now(); |
DVLOG(1) << "time of last received packet: " |
<< time_of_last_received_packet_.ToMicroseconds(); |
@@ -156,6 +179,91 @@ void QuicConnection::OnPublicResetPacket( |
CloseConnection(QUIC_PUBLIC_RESET, true); |
} |
+bool QuicConnection::OnProtocolVersionMismatch( |
+ QuicVersionTag received_version) { |
+ // TODO(satyamshekhar): Implement no server state in this mode. |
+ if (!is_server_) { |
+ LOG(DFATAL) << "Framer called OnProtocolVersionMismatch for server. " |
+ << "Closing connection."; |
+ CloseConnection(QUIC_INTERNAL_ERROR, false); |
+ } |
+ DCHECK_NE(quic_version_, received_version); |
+ |
+ if (debug_visitor_) { |
+ debug_visitor_->OnProtocolVersionMismatch(received_version); |
+ } |
+ |
+ switch (version_negotiation_state_) { |
+ case START_NEGOTIATION: |
+ if (!framer_.IsSupportedVersion(received_version)) { |
+ SendVersionNegotiationPacket(); |
+ version_negotiation_state_ = SENT_NEGOTIATION_PACKET; |
+ return false; |
+ } |
+ break; |
+ |
+ case SENT_NEGOTIATION_PACKET: |
+ if (!framer_.IsSupportedVersion(received_version)) { |
+ // Drop packets which can't be parsed due to version mismatch. |
+ return false; |
+ } |
+ break; |
+ |
+ case NEGOTIATED_VERSION: |
+ // Might be old packets that were sent by the client before the version |
+ // was negotiated. Drop these. |
+ return false; |
+ |
+ default: |
+ DCHECK(false); |
+ } |
+ |
+ // Right now we only support kQuicVersion1 so it's okay not to |
+ // update the framer and quic_version_. When start supporting more |
+ // versions please update both. |
+ version_negotiation_state_ = NEGOTIATED_VERSION; |
+ // TODO(satyamshekhar): Store the sequence number of this packet and close the |
+ // connection if we ever received a packet with incorrect version and whose |
+ // sequence number is greater. |
+ return true; |
+} |
+ |
+// Handles version negotiation for client connection. |
+void QuicConnection::OnVersionNegotiationPacket( |
+ const QuicVersionNegotiationPacket& packet) { |
+ if (is_server_) { |
+ LOG(DFATAL) << "Framer parsed VersionNegotiationPacket for server." |
+ << "Closing connection."; |
+ CloseConnection(QUIC_INTERNAL_ERROR, false); |
+ } |
+ if (debug_visitor_) { |
+ debug_visitor_->OnVersionNegotiationPacket(packet); |
+ } |
+ |
+ if (version_negotiation_state_ == NEGOTIATED_VERSION) { |
+ // Possibly a duplicate version negotiation packet. |
+ return; |
+ } |
+ |
+ if (std::find(packet.versions.begin(), |
+ packet.versions.end(), quic_version_) != |
+ packet.versions.end()) { |
+ DLOG(WARNING) << "The server already supports our version. It should have " |
+ << "accepted our connection."; |
+ // Just drop the connection. |
+ CloseConnection(QUIC_INVALID_VERSION_NEGOTIATION_PACKET, false); |
+ return; |
+ } |
+ |
+ if (!SelectMutualVersion(packet.versions)) { |
+ SendConnectionCloseWithDetails(QUIC_INVALID_VERSION, |
+ "no common version found"); |
+ } |
+ |
+ version_negotiation_state_ = NEGOTIATED_VERSION; |
+ RetransmitAllUnackedPackets(); |
+} |
+ |
void QuicConnection::OnRevivedPacket() { |
} |
@@ -163,6 +271,10 @@ bool QuicConnection::OnPacketHeader(const QuicPacketHeader& header) { |
if (debug_visitor_) { |
debug_visitor_->OnPacketHeader(header); |
} |
+ |
+ // Will be decrement below if we fall through to return true; |
+ ++stats_.packets_dropped; |
+ |
if (header.public_header.guid != guid_) { |
DLOG(INFO) << "Ignoring packet from unexpected GUID: " |
<< header.public_header.guid << " instead of " << guid_; |
@@ -184,6 +296,32 @@ bool QuicConnection::OnPacketHeader(const QuicPacketHeader& header) { |
return false; |
} |
+ if (version_negotiation_state_ != NEGOTIATED_VERSION) { |
+ if (is_server_) { |
+ if (!header.public_header.version_flag) { |
+ DLOG(WARNING) << "Got packet without version flag before version " |
+ << "negotiated."; |
+ // Packets should have the version flag till version negotiation is |
+ // done. |
+ CloseConnection(QUIC_INVALID_VERSION, false); |
+ return false; |
+ } else { |
+ DCHECK_EQ(1u, header.public_header.versions.size()); |
+ DCHECK_EQ(header.public_header.versions[0], quic_version_); |
+ version_negotiation_state_ = NEGOTIATED_VERSION; |
+ } |
+ } else { |
+ DCHECK(!header.public_header.version_flag); |
+ // If the client gets a packet without the version flag from the server |
+ // it should stop sending version since the version negotiation is done. |
+ packet_creator_.StopSendingVersion(); |
+ version_negotiation_state_ = NEGOTIATED_VERSION; |
+ } |
+ } |
+ |
+ DCHECK_EQ(NEGOTIATED_VERSION, version_negotiation_state_); |
+ |
+ --stats_.packets_dropped; |
DVLOG(1) << "Received packet header: " << header; |
last_header_ = header; |
return true; |
@@ -239,15 +377,16 @@ void QuicConnection::OnAckFrame(const QuicAckFrame& incoming_ack) { |
if (queued_packets_.empty()) { |
return; |
} |
+ bool has_retransmittable_data = true; |
QuicTime::Delta delay = congestion_manager_.TimeUntilSend( |
- time_of_last_received_packet_, false); |
+ time_of_last_received_packet_, false, has_retransmittable_data); |
if (delay.IsZero()) { |
helper_->UnregisterSendAlarmIfRegistered(); |
if (!write_blocked_) { |
OnCanWrite(); |
} |
- } else { |
+ } else if (!delay.IsInfinite()) { |
helper_->SetSendAlarm(delay); |
} |
} |
@@ -402,11 +541,11 @@ bool QuicConnection::DontWaitForPacketsBefore( |
QuicPacketSequenceNumber least_unacked) { |
size_t missing_packets_count = |
outgoing_ack_.received_info.missing_packets.size(); |
- outgoing_ack_.received_info.missing_packets.erase( |
+ outgoing_ack_.received_info.missing_packets.erase( |
outgoing_ack_.received_info.missing_packets.begin(), |
outgoing_ack_.received_info.missing_packets.lower_bound(least_unacked)); |
- return missing_packets_count != |
- outgoing_ack_.received_info.missing_packets.size(); |
+ return missing_packets_count != |
+ outgoing_ack_.received_info.missing_packets.size(); |
} |
void QuicConnection::UpdatePacketInformationSentByPeer( |
@@ -516,6 +655,17 @@ void QuicConnection::MaybeSendAckInResponseToPacket() { |
send_ack_in_response_to_packet_ = !send_ack_in_response_to_packet_; |
} |
+void QuicConnection::SendVersionNegotiationPacket() { |
+ QuicVersionTagList supported_versions; |
+ supported_versions.push_back(kQuicVersion1); |
+ QuicEncryptedPacket* encrypted = |
+ packet_creator_.SerializeVersionNegotiationPacket(supported_versions); |
+ // TODO(satyamshekhar): implement zero server state negotiation. |
+ int error; |
+ helper_->WritePacketToWire(*encrypted, &error); |
+ delete encrypted; |
+} |
+ |
QuicConsumedData QuicConnection::SendStreamData(QuicStreamId id, |
base::StringPiece data, |
QuicStreamOffset offset, |
@@ -529,6 +679,14 @@ void QuicConnection::SendRstStream(QuicStreamId id, |
QuicFrame(new QuicRstStreamFrame(id, error))); |
} |
+const QuicConnectionStats& QuicConnection::GetStats() { |
+ // Update rtt and estimated bandwidth. |
+ stats_.rtt = congestion_manager_.SmoothedRtt().ToMicroseconds(); |
+ stats_.estimated_bandwidth = |
+ congestion_manager_.BandwidthEstimate().ToBytesPerSecond(); |
+ return stats_; |
+} |
+ |
void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address, |
const IPEndPoint& peer_address, |
const QuicEncryptedPacket& packet) { |
@@ -539,12 +697,15 @@ void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address, |
last_size_ = packet.length(); |
last_self_address_ = self_address; |
last_peer_address_ = peer_address; |
+ |
+ stats_.bytes_received += packet.length(); |
+ ++stats_.packets_received; |
+ |
framer_.ProcessPacket(packet); |
MaybeProcessRevivedPacket(); |
} |
bool QuicConnection::OnCanWrite() { |
- LOG(INFO) << "here!!!"; |
write_blocked_ = false; |
WriteQueuedPackets(); |
@@ -553,15 +714,14 @@ bool QuicConnection::OnCanWrite() { |
// or the congestion manager to prohibit sending. If we've sent everything |
// we had queued and we're still not blocked, let the visitor know it can |
// write more. |
- // TODO(rch): shouldn't this be "if (CanWrite(false))" |
- if (!write_blocked_) { |
+ if (CanWrite(false, true)) { |
packet_generator_.StartBatchOperations(); |
bool all_bytes_written = visitor_->OnCanWrite(); |
packet_generator_.FinishBatchOperations(); |
// After the visitor writes, it may have caused the socket to become write |
// blocked or the congestion manager to prohibit sending, so check again. |
- if (!write_blocked_ && !all_bytes_written && !helper_->IsSendAlarmSet()) { |
+ if (!write_blocked_ && !all_bytes_written && CanWrite(false, true)) { |
// We're not write blocked, but some stream didn't write out all of its |
// bytes. Register for 'immediate' resumption so we'll keep writing after |
// other quic connections have had a chance to use the socket. |
@@ -577,19 +737,20 @@ bool QuicConnection::WriteQueuedPackets() { |
size_t num_queued_packets = queued_packets_.size() + 1; |
QueuedPacketList::iterator packet_iterator = queued_packets_.begin(); |
- while (!write_blocked_ && !helper_->IsSendAlarmSet() && |
- packet_iterator != queued_packets_.end()) { |
+ while (!write_blocked_ && packet_iterator != queued_packets_.end()) { |
// Ensure that from one iteration of this loop to the next we |
// succeeded in sending a packet so we don't infinitely loop. |
// TODO(rch): clean up and close the connection if we really hit this. |
DCHECK_LT(queued_packets_.size(), num_queued_packets); |
num_queued_packets = queued_packets_.size(); |
if (WritePacket(packet_iterator->sequence_number, |
- packet_iterator->packet, kHasRetransmittableData, |
+ packet_iterator->packet, |
+ packet_iterator->has_retransmittable_data, |
!kForce)) { |
packet_iterator = queued_packets_.erase(packet_iterator); |
} else { |
// Continue, because some queued packets may still be writable. |
+ // This can happen if a retransmit send fail. |
++packet_iterator; |
} |
} |
@@ -618,6 +779,7 @@ void QuicConnection::RecordPacketReceived(const QuicPacketHeader& header) { |
outgoing_ack_.received_info.largest_observed = max( |
outgoing_ack_.received_info.largest_observed, |
header.packet_sequence_number); |
+ // TODO(pwestin): update received_info with time_of_last_received_packet_. |
entropy_manager_.RecordReceivedPacketEntropyHash(sequence_number, |
header.entropy_hash); |
} |
@@ -643,11 +805,19 @@ bool QuicConnection::MaybeRetransmitPacketForRTO( |
sequence_number > peer_largest_observed_packet_) { |
return false; |
} else { |
+ ++stats_.rto_count; |
RetransmitPacket(sequence_number); |
return true; |
} |
} |
+void QuicConnection::RetransmitAllUnackedPackets() { |
+ for (UnackedPacketMap::iterator it = unacked_packets_.begin(); |
+ it != unacked_packets_.end(); ++it) { |
+ RetransmitPacket(it->first); |
+ } |
+} |
+ |
void QuicConnection::RetransmitPacket( |
QuicPacketSequenceNumber sequence_number) { |
UnackedPacketMap::iterator unacked_it = |
@@ -661,6 +831,9 @@ void QuicConnection::RetransmitPacket( |
DCHECK(unacked_it != unacked_packets_.end()); |
DCHECK(retransmission_it != retransmission_map_.end()); |
RetransmittableFrames* unacked = unacked_it->second; |
+ // TODO(pwestin): Need to fix potential issue with FEC and a 1 packet |
+ // congestion window see b/8331807 for details. |
+ congestion_manager_.AbandoningPacket(sequence_number); |
// TODO(ianswett): Never change the sequence number of the connect packet. |
// Re-packetize the frames with a new sequence number for retransmission. |
// Retransmitted data packets do not use FEC, even when it's enabled. |
@@ -686,17 +859,22 @@ void QuicConnection::RetransmitPacket( |
true); |
} |
-bool QuicConnection::CanWrite(bool is_retransmission) { |
+bool QuicConnection::CanWrite(bool is_retransmission, |
+ bool has_retransmittable_data) { |
// TODO(ianswett): If the packet is a retransmit, the current send alarm may |
// be too long. |
if (write_blocked_ || helper_->IsSendAlarmSet()) { |
return false; |
} |
- QuicTime::Delta delay = congestion_manager_.TimeUntilSend(clock_->Now(), |
- is_retransmission); |
+ |
+ QuicTime::Delta delay = congestion_manager_.TimeUntilSend( |
+ clock_->Now(), is_retransmission, has_retransmittable_data); |
+ if (delay.IsInfinite()) { |
+ return false; |
+ } |
+ |
// If the scheduler requires a delay, then we can not send this packet now. |
- if (!delay.IsZero() && !delay.IsInfinite()) { |
- // TODO(pwestin): we need to handle delay.IsInfinite() separately. |
+ if (!delay.IsZero()) { |
helper_->SetSendAlarm(delay); |
return false; |
} |
@@ -752,7 +930,7 @@ bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number, |
bool is_retransmission = IsRetransmission(sequence_number); |
// If we are not forced and we can't write, then simply return false; |
- if (!forced && !CanWrite(is_retransmission)) { |
+ if (!forced && !CanWrite(is_retransmission, has_retransmittable_data)) { |
return false; |
} |
@@ -760,13 +938,13 @@ bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number, |
framer_.EncryptPacket(sequence_number, *packet)); |
DLOG(INFO) << "Sending packet number " << sequence_number << " : " |
<< (packet->is_fec_packet() ? "FEC " : |
- (ContainsKey(retransmission_map_, sequence_number) ? |
- "data bearing " : " ack only ")); |
+ (has_retransmittable_data ? "data bearing " : " ack only ")) |
+ << " Packet length:" << packet->length(); |
DCHECK(encrypted->length() <= kMaxPacketSize) |
<< "Packet " << sequence_number << " will not be read; too large: " |
<< packet->length() << " " << encrypted->length() << " " |
- << outgoing_ack_; |
+ << outgoing_ack_ << " forced: " << (forced ? "yes" : "no"); |
int error; |
QuicTime now = clock_->Now(); |
@@ -788,7 +966,16 @@ bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number, |
MaybeSetupRetransmission(sequence_number); |
congestion_manager_.SentPacket(sequence_number, now, packet->length(), |
- is_retransmission, has_retransmittable_data); |
+ is_retransmission); |
+ |
+ stats_.bytes_sent += encrypted->length(); |
+ ++stats_.packets_sent; |
+ |
+ if (is_retransmission) { |
+ stats_.bytes_retransmitted += encrypted->length(); |
+ ++stats_.packets_retransmitted; |
+ } |
+ |
delete packet; |
return true; |
} |
@@ -821,7 +1008,8 @@ bool QuicConnection::SendOrQueuePacket(QuicPacketSequenceNumber sequence_number, |
entropy_hash); |
if (!WritePacket(sequence_number, packet, has_retransmittable_data, |
!kForce)) { |
- queued_packets_.push_back(QueuedPacket(sequence_number, packet)); |
+ queued_packets_.push_back(QueuedPacket(sequence_number, packet, |
+ has_retransmittable_data)); |
return false; |
} |
return true; |
@@ -931,6 +1119,8 @@ void QuicConnection::MaybeProcessRevivedPacket() { |
debug_visitor_->OnRevivedPacket(revived_header, |
StringPiece(revived_payload, len)); |
} |
+ |
+ ++stats_.packets_revived; |
framer_.ProcessRevivedPacket(&revived_header, |
StringPiece(revived_payload, len)); |
} |
@@ -976,8 +1166,6 @@ void QuicConnection::SendConnectionCloseWithDetails(QuicErrorCode error, |
} |
void QuicConnection::CloseConnection(QuicErrorCode error, bool from_peer) { |
- // TODO(satyamshekhar): Ask the dispatcher to delete visitor and hence self |
- // if the visitor will always be deleted by closing the connection. |
connected_ = false; |
visitor_->ConnectionClose(error, from_peer); |
} |