Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(501)

Unified Diff: net/quic/quic_connection.cc

Issue 12334063: Land recent QUIC changes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: more EXPECT_FALSE Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/quic/quic_connection.h ('k') | net/quic/quic_connection_helper_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/quic/quic_connection.cc
diff --git a/net/quic/quic_connection.cc b/net/quic/quic_connection.cc
index 8dc19942fc7aa40d77fe24bb6bb632be505d70cc..c04c0dd1f15520f058fb1a60988f44d438b009a4 100644
--- a/net/quic/quic_connection.cc
+++ b/net/quic/quic_connection.cc
@@ -17,6 +17,7 @@ using base::StringPiece;
using std::list;
using std::make_pair;
using std::min;
+using std::max;
using std::vector;
using std::set;
using std::string;
@@ -34,7 +35,12 @@ const QuicPacketSequenceNumber kMaxPacketGap = 5000;
// The maximum number of nacks which can be transmitted in a single ack packet
// without exceeding kMaxPacketSize.
-const QuicPacketSequenceNumber kMaxUnackedPackets = 192u;
+// TODO(satyamshekhar): Get rid of magic numbers and move this to protocol.h
+// 16 - Min ack frame size.
+// 16 - Crypto hash for integrity. Not a static value. Use
+// QuicEncrypter::GetMaxPlaintextSize.
+const QuicPacketSequenceNumber kMaxUnackedPackets =
+ (kMaxPacketSize - kPacketHeaderSize - 16 - 16) / kSequenceNumberSize;
// We want to make sure if we get a large nack packet, we don't queue up too
// many packets at once. 10 is arbitrary.
@@ -55,9 +61,7 @@ const int kMaxPacketsToSerializeAtOnce = 6;
// eventually cede. 10 is arbitrary.
const int kMaxPacketsPerRetransmissionAlarm = 10;
-// Named constant for WriteQueuedData().
-const bool kFlush = true;
-// Named constant for WritePacket(), SendOrQueuePacket().
+// Named constant for WritePacket()
const bool kForce = true;
// Named constant for CanWrite().
const bool kIsRetransmission = true;
@@ -69,19 +73,6 @@ bool Near(QuicPacketSequenceNumber a, QuicPacketSequenceNumber b) {
} // namespace
-QuicConnection::UnackedPacket::UnackedPacket(QuicFrames unacked_frames)
- : frames(unacked_frames) {
-}
-
-QuicConnection::UnackedPacket::UnackedPacket(QuicFrames unacked_frames,
- std::string data)
- : frames(unacked_frames),
- data(data) {
-}
-
-QuicConnection::UnackedPacket::~UnackedPacket() {
-}
-
QuicConnection::QuicConnection(QuicGuid guid,
IPEndPoint address,
QuicConnectionHelperInterface* helper)
@@ -91,17 +82,18 @@ QuicConnection::QuicConnection(QuicGuid guid,
random_generator_(helper->GetRandomGenerator()),
guid_(guid),
peer_address_(address),
- should_send_ack_(false),
- should_send_congestion_feedback_(false),
largest_seen_packet_with_ack_(0),
peer_largest_observed_packet_(0),
+ least_packet_awaited_by_peer_(1),
peer_least_packet_awaiting_ack_(0),
handling_retransmission_timeout_(false),
write_blocked_(false),
debug_visitor_(NULL),
- packet_creator_(guid_, &framer_),
+ packet_creator_(guid_, &framer_, random_generator_),
+ packet_generator_(this, &packet_creator_),
timeout_(QuicTime::Delta::FromMicroseconds(kDefaultTimeoutUs)),
- time_of_last_packet_(clock_->Now()),
+ time_of_last_received_packet_(clock_->ApproximateNow()),
+ time_of_last_sent_packet_(clock_->ApproximateNow()),
congestion_manager_(clock_, kTCP),
connected_(true),
received_truncated_ack_(false),
@@ -109,9 +101,12 @@ QuicConnection::QuicConnection(QuicGuid guid,
helper_->SetConnection(this);
helper_->SetTimeoutAlarm(timeout_);
framer_.set_visitor(this);
+ framer_.set_entropy_calculator(&entropy_manager_);
memset(&last_header_, 0, sizeof(last_header_));
outgoing_ack_.sent_info.least_unacked = 0;
+ outgoing_ack_.sent_info.entropy_hash = 0;
outgoing_ack_.received_info.largest_observed = 0;
+ outgoing_ack_.received_info.entropy_hash = 0;
/*
if (FLAGS_fake_packet_loss_percentage > 0) {
@@ -123,12 +118,6 @@ QuicConnection::QuicConnection(QuicGuid guid,
}
QuicConnection::~QuicConnection() {
- // Call DeleteEnclosedFrames on each QuicPacket because the destructor does
- // not delete enclosed frames.
- for (UnackedPacketMap::iterator it = unacked_packets_.begin();
- it != unacked_packets_.end(); ++it) {
- DeleteEnclosedFrames(it->second);
- }
STLDeleteValues(&unacked_packets_);
STLDeleteValues(&group_map_);
for (QueuedPacketList::iterator it = queued_packets_.begin();
@@ -137,45 +126,14 @@ QuicConnection::~QuicConnection() {
}
}
-void QuicConnection::DeleteEnclosedFrame(QuicFrame* frame) {
- switch (frame->type) {
- case PADDING_FRAME:
- delete frame->padding_frame;
- break;
- case STREAM_FRAME:
- delete frame->stream_frame;
- break;
- case ACK_FRAME:
- delete frame->ack_frame;
- break;
- case CONGESTION_FEEDBACK_FRAME:
- delete frame->congestion_feedback_frame;
- break;
- case RST_STREAM_FRAME:
- delete frame->rst_stream_frame;
- break;
- case CONNECTION_CLOSE_FRAME:
- delete frame->connection_close_frame;
- break;
- case NUM_FRAME_TYPES:
- DCHECK(false) << "Cannot delete type: " << frame->type;
- }
-}
-
-void QuicConnection::DeleteEnclosedFrames(UnackedPacket* unacked) {
- for (QuicFrames::iterator it = unacked->frames.begin();
- it != unacked->frames.end(); ++it) {
- DeleteEnclosedFrame(&(*it));
- }
-}
-
void QuicConnection::OnError(QuicFramer* framer) {
SendConnectionClose(framer->error());
}
void QuicConnection::OnPacket() {
- time_of_last_packet_ = clock_->Now();
- DVLOG(1) << "last packet: " << time_of_last_packet_.ToMicroseconds();
+ time_of_last_received_packet_ = clock_->Now();
+ DVLOG(1) << "time of last received packet: "
+ << time_of_last_received_packet_.ToMicroseconds();
// TODO(alyssar, rch) handle migration!
self_address_ = last_self_address_;
@@ -213,11 +171,12 @@ bool QuicConnection::OnPacketHeader(const QuicPacketHeader& header) {
// If this packet has already been seen, or that the sender
// has told us will not be retransmitted, then stop processing the packet.
- if (!outgoing_ack_.received_info.IsAwaitingPacket(
- header.packet_sequence_number)) {
+ if (!IsAwaitingPacket(outgoing_ack_.received_info,
+ header.packet_sequence_number)) {
return false;
}
+ DVLOG(1) << "Received packet header: " << header;
last_header_ = header;
return true;
}
@@ -239,7 +198,7 @@ void QuicConnection::OnAckFrame(const QuicAckFrame& incoming_ack) {
if (debug_visitor_) {
debug_visitor_->OnAckFrame(incoming_ack);
}
- DVLOG(1) << "Ack packet: " << incoming_ack;
+ DVLOG(1) << "OnAckFrame: " << incoming_ack;
if (last_header_.packet_sequence_number <= largest_seen_packet_with_ack_) {
DLOG(INFO) << "Received an old ack frame: ignoring";
@@ -252,19 +211,28 @@ void QuicConnection::OnAckFrame(const QuicAckFrame& incoming_ack) {
return;
}
+ // TODO(satyamshekhar): Not true if missing_packets.size() was actually
+ // kMaxUnackedPackets. This can result in a dead connection if all the
+ // missing packets get lost during retransmission. Now the new packets(or the
+ // older packets) will not be retransmitted due to RTO
+ // since received_truncated_ack_ is true and their sequence_number is >
+ // peer_largest_observed_packet. Fix either by resetting it in
+ // MaybeRetransmitPacketForRTO or keeping an explicit flag for ack truncation.
received_truncated_ack_ =
incoming_ack.received_info.missing_packets.size() >= kMaxUnackedPackets;
UpdatePacketInformationReceivedByPeer(incoming_ack);
UpdatePacketInformationSentByPeer(incoming_ack);
- congestion_manager_.OnIncomingAckFrame(incoming_ack);
+ congestion_manager_.OnIncomingAckFrame(incoming_ack,
+ time_of_last_received_packet_);
// Now the we have received an ack, we might be able to send queued packets.
if (queued_packets_.empty()) {
return;
}
- QuicTime::Delta delay = congestion_manager_.TimeUntilSend(false);
+ QuicTime::Delta delay = congestion_manager_.TimeUntilSend(
+ time_of_last_received_packet_, false);
if (delay.IsZero()) {
helper_->UnregisterSendAlarmIfRegistered();
if (!write_blocked_) {
@@ -280,7 +248,8 @@ void QuicConnection::OnCongestionFeedbackFrame(
if (debug_visitor_) {
debug_visitor_->OnCongestionFeedbackFrame(feedback);
}
- congestion_manager_.OnIncomingQuicCongestionFeedbackFrame(feedback);
+ congestion_manager_.OnIncomingQuicCongestionFeedbackFrame(
+ feedback, time_of_last_received_packet_);
}
bool QuicConnection::ValidateAckFrame(const QuicAckFrame& incoming_ack) {
@@ -308,19 +277,47 @@ bool QuicConnection::ValidateAckFrame(const QuicAckFrame& incoming_ack) {
kMaxUnackedPackets);
if (incoming_ack.sent_info.least_unacked < peer_least_packet_awaiting_ack_) {
- DLOG(INFO) << "Client sent low least_unacked: "
- << incoming_ack.sent_info.least_unacked
- << " vs " << peer_least_packet_awaiting_ack_;
+ DLOG(ERROR) << "Client sent low least_unacked: "
+ << incoming_ack.sent_info.least_unacked
+ << " vs " << peer_least_packet_awaiting_ack_;
// We never process old ack frames, so this number should only increase.
return false;
}
if (incoming_ack.sent_info.least_unacked >
last_header_.packet_sequence_number) {
- DLOG(INFO) << "Client sent least_unacked:"
- << incoming_ack.sent_info.least_unacked
- << " greater than the enclosing packet sequence number:"
- << last_header_.packet_sequence_number;
+ DLOG(ERROR) << "Client sent least_unacked:"
+ << incoming_ack.sent_info.least_unacked
+ << " greater than the enclosing packet sequence number:"
+ << last_header_.packet_sequence_number;
+ return false;
+ }
+
+ if (!incoming_ack.received_info.missing_packets.empty() &&
+ *incoming_ack.received_info.missing_packets.rbegin() >
+ incoming_ack.received_info.largest_observed) {
+ DLOG(ERROR) << "Client sent missing packet: "
+ << *incoming_ack.received_info.missing_packets.rbegin()
+ << " greater than largest observed: "
+ << incoming_ack.received_info.largest_observed;
+ return false;
+ }
+
+ if (!incoming_ack.received_info.missing_packets.empty() &&
+ *incoming_ack.received_info.missing_packets.begin() <
+ least_packet_awaited_by_peer_) {
+ DLOG(ERROR) << "Client sent missing packet: "
+ << *incoming_ack.received_info.missing_packets.begin()
+ << "smaller than least_packet_awaited_by_peer_: "
+ << least_packet_awaited_by_peer_;
+ return false;
+ }
+
+ if (!entropy_manager_.IsValidEntropy(
+ incoming_ack.received_info.largest_observed,
+ incoming_ack.received_info.missing_packets,
+ incoming_ack.received_info.entropy_hash)) {
+ DLOG(ERROR) << "Client sent invalid entropy.";
return false;
}
@@ -329,46 +326,36 @@ bool QuicConnection::ValidateAckFrame(const QuicAckFrame& incoming_ack) {
void QuicConnection::UpdatePacketInformationReceivedByPeer(
const QuicAckFrame& incoming_ack) {
- QuicConnectionVisitorInterface::AckedPackets acked_packets;
+ SequenceNumberSet acked_packets;
// ValidateAck should fail if largest_observed ever shrinks.
DCHECK_LE(peer_largest_observed_packet_,
incoming_ack.received_info.largest_observed);
peer_largest_observed_packet_ = incoming_ack.received_info.largest_observed;
- // Pick an upper bound for the lowest_unacked; we'll then loop through the
- // unacked packets and lower it if necessary.
- QuicPacketSequenceNumber lowest_unacked = min(
- packet_creator_.sequence_number() + 1,
- peer_largest_observed_packet_ + 1);
+ if (incoming_ack.received_info.missing_packets.empty()) {
+ least_packet_awaited_by_peer_ = peer_largest_observed_packet_ + 1;
+ } else {
+ least_packet_awaited_by_peer_ =
+ *(incoming_ack.received_info.missing_packets.begin());
+ }
- int retransmitted_packets = 0;
+ entropy_manager_.ClearSentEntropyBefore(least_packet_awaited_by_peer_ - 1);
+ int retransmitted_packets = 0;
// Go through the packets we have not received an ack for and see if this
// incoming_ack shows they've been seen by the peer.
UnackedPacketMap::iterator it = unacked_packets_.begin();
while (it != unacked_packets_.end()) {
QuicPacketSequenceNumber sequence_number = it->first;
- UnackedPacket* unacked = it->second;
- if (!incoming_ack.received_info.IsAwaitingPacket(sequence_number)) {
+ if (sequence_number > peer_largest_observed_packet_) {
+ break;
+ }
+ RetransmittableFrames* unacked = it->second;
+ if (!IsAwaitingPacket(incoming_ack.received_info, sequence_number)) {
// Packet was acked, so remove it from our unacked packet list.
DVLOG(1) << "Got an ack for " << sequence_number;
- // TODO(rch): This is inefficient and should be sped up.
- // TODO(ianswett): Ensure this inner loop is applicable now that we're
- // always sending packets with new sequence numbers. I believe it may
- // only be relevant for the first crypto connect packet, which doesn't
- // get a new packet sequence number.
- // The acked packet might be queued (if a retransmission had been
- // attempted).
- for (QueuedPacketList::iterator q = queued_packets_.begin();
- q != queued_packets_.end(); ++q) {
- if (q->sequence_number == sequence_number) {
- queued_packets_.erase(q);
- break;
- }
- }
acked_packets.insert(sequence_number);
- DeleteEnclosedFrames(unacked);
delete unacked;
UnackedPacketMap::iterator it_tmp = it;
++it;
@@ -379,70 +366,66 @@ void QuicConnection::UpdatePacketInformationReceivedByPeer(
// seen at the time of this ack being sent out. See if it's our new
// lowest unacked packet.
DVLOG(1) << "still missing " << sequence_number;
- if (sequence_number < lowest_unacked) {
- lowest_unacked = sequence_number;
- }
++it;
- // Determine if this packet is being explicitly nacked and, if so, if it
- // is worth retransmitting.
- if (sequence_number <= peer_largest_observed_packet_) {
- // The peer got packets after this sequence number. This is an explicit
- // nack.
- RetransmissionMap::iterator retransmission_it =
- retransmission_map_.find(sequence_number);
- ++(retransmission_it->second.number_nacks);
- if (retransmission_it->second.number_nacks >=
- kNumberOfNacksBeforeRetransmission &&
- retransmitted_packets < kMaxRetransmissionsPerAck) {
- ++retransmitted_packets;
- DVLOG(1) << "Trying to retransmit packet " << sequence_number
- << " as it has been nacked 3 or more times.";
- // TODO(satyamshekhar): save in a vector and retransmit after the
- // loop.
- RetransmitPacket(sequence_number);
- }
+ // The peer got packets after this sequence number. This is an explicit
+ // nack.
+ RetransmissionMap::iterator retransmission_it =
+ retransmission_map_.find(sequence_number);
+ ++(retransmission_it->second.number_nacks);
+ if (retransmission_it->second.number_nacks >=
+ kNumberOfNacksBeforeRetransmission &&
+ retransmitted_packets < kMaxRetransmissionsPerAck) {
+ ++retransmitted_packets;
+ DVLOG(1) << "Trying to retransmit packet " << sequence_number
+ << " as it has been nacked 3 or more times.";
+ // TODO(satyamshekhar): save in a vector and retransmit after the
+ // loop.
+ RetransmitPacket(sequence_number);
}
}
}
if (acked_packets.size() > 0) {
visitor_->OnAck(acked_packets);
}
- SetLeastUnacked(lowest_unacked);
-}
-
-void QuicConnection::SetLeastUnacked(QuicPacketSequenceNumber lowest_unacked) {
- // If we've gotten an ack for the lowest packet we were waiting on,
- // update that and the list of packets we advertise we will not retransmit.
- if (lowest_unacked > outgoing_ack_.sent_info.least_unacked) {
- outgoing_ack_.sent_info.least_unacked = lowest_unacked;
- }
}
-void QuicConnection::UpdateLeastUnacked(
- QuicPacketSequenceNumber acked_sequence_number) {
- if (acked_sequence_number != outgoing_ack_.sent_info.least_unacked) {
- return;
- }
- QuicPacketSequenceNumber least_unacked =
- packet_creator_.sequence_number() + 1;
- for (UnackedPacketMap::iterator it = unacked_packets_.begin();
- it != unacked_packets_.end(); ++it) {
- least_unacked = min<int>(least_unacked, it->first);
- }
-
- SetLeastUnacked(least_unacked);
+bool QuicConnection::DontWaitForPacketsBefore(
+ QuicPacketSequenceNumber least_unacked) {
+ size_t missing_packets_count =
+ outgoing_ack_.received_info.missing_packets.size();
+ outgoing_ack_.received_info.missing_packets.erase(
+ outgoing_ack_.received_info.missing_packets.begin(),
+ outgoing_ack_.received_info.missing_packets.lower_bound(least_unacked));
+ return missing_packets_count !=
+ outgoing_ack_.received_info.missing_packets.size();
}
void QuicConnection::UpdatePacketInformationSentByPeer(
const QuicAckFrame& incoming_ack) {
- // Make sure we also don't ack any packets lower than the peer's
- // last-packet-awaiting-ack.
+ // ValidateAck() should fail if peer_least_packet_awaiting_ack_ shrinks.
+ DCHECK_LE(peer_least_packet_awaiting_ack_,
+ incoming_ack.sent_info.least_unacked);
if (incoming_ack.sent_info.least_unacked > peer_least_packet_awaiting_ack_) {
- outgoing_ack_.received_info.ClearMissingBefore(
- incoming_ack.sent_info.least_unacked);
+ bool missed_packets =
+ DontWaitForPacketsBefore(incoming_ack.sent_info.least_unacked);
+ if (missed_packets || incoming_ack.sent_info.least_unacked >
+ outgoing_ack_.received_info.largest_observed + 1) {
+ DVLOG(1) << "Updating entropy hashed since we missed packets";
+ // There were some missing packets that we won't ever get now. Recalculate
+ // the received entropy hash.
+ entropy_manager_.RecalculateReceivedEntropyHash(
+ incoming_ack.sent_info.least_unacked,
+ incoming_ack.sent_info.entropy_hash);
+ }
peer_least_packet_awaiting_ack_ = incoming_ack.sent_info.least_unacked;
- }
-
+ // TODO(satyamshekhar): We get this iterator O(logN) in
+ // RecalculateReceivedEntropyHash also.
+ entropy_manager_.ClearReceivedEntropyBefore(
+ peer_least_packet_awaiting_ack_);
+ }
+ DCHECK(outgoing_ack_.received_info.missing_packets.empty() ||
+ *outgoing_ack_.received_info.missing_packets.begin() >=
+ peer_least_packet_awaiting_ack_);
// Possibly close any FecGroups which are now irrelevant
CloseFecGroupsBefore(incoming_ack.sent_info.least_unacked + 1);
}
@@ -450,7 +433,8 @@ void QuicConnection::UpdatePacketInformationSentByPeer(
void QuicConnection::OnFecData(const QuicFecData& fec) {
DCHECK_NE(0u, last_header_.fec_group);
QuicFecGroup* group = GetFecGroup();
- group->UpdateFec(last_header_.packet_sequence_number, fec);
+ group->UpdateFec(last_header_.packet_sequence_number,
+ last_header_.fec_entropy_flag, fec);
}
void QuicConnection::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
@@ -472,22 +456,31 @@ void QuicConnection::OnConnectionCloseFrame(
CloseConnection(frame.error_code, true);
}
+void QuicConnection::OnGoAwayFrame(const QuicGoAwayFrame& frame) {
+ DLOG(INFO) << "Go away received with error "
+ << QuicUtils::ErrorToString(frame.error_code)
+ << " and reason:" << frame.reason_phrase;
+ visitor_->OnGoAway(frame);
+}
+
void QuicConnection::OnPacketComplete() {
+ // TODO(satyamshekhar): Don't do anything if this packet closed the
+ // connection.
if (!last_packet_revived_) {
DLOG(INFO) << "Got packet " << last_header_.packet_sequence_number
<< " with " << last_stream_frames_.size()
<< " stream frames for " << last_header_.public_header.guid;
congestion_manager_.RecordIncomingPacket(
last_size_, last_header_.packet_sequence_number,
- clock_->Now(), last_packet_revived_);
+ time_of_last_received_packet_, last_packet_revived_);
} else {
DLOG(INFO) << "Got revived packet with " << last_stream_frames_.size()
<< " frames.";
}
- if (last_stream_frames_.empty() ||
- visitor_->OnPacket(self_address_, peer_address_,
- last_header_, last_stream_frames_)) {
+ if ((last_stream_frames_.empty() ||
+ visitor_->OnPacket(self_address_, peer_address_,
+ last_header_, last_stream_frames_))) {
RecordPacketReceived(last_header_);
}
@@ -495,6 +488,14 @@ void QuicConnection::OnPacketComplete() {
last_stream_frames_.clear();
}
+QuicAckFrame* QuicConnection::CreateAckFrame() {
+ return new QuicAckFrame(outgoing_ack_);
+}
+
+QuicCongestionFeedbackFrame* QuicConnection::CreateFeedbackFrame() {
+ return new QuicCongestionFeedbackFrame(outgoing_congestion_feedback_);
+}
+
void QuicConnection::MaybeSendAckInResponseToPacket() {
if (send_ack_in_response_to_packet_) {
SendAck();
@@ -507,62 +508,16 @@ void QuicConnection::MaybeSendAckInResponseToPacket() {
}
QuicConsumedData QuicConnection::SendStreamData(QuicStreamId id,
- StringPiece data,
- QuicStreamOffset offset,
- bool fin) {
- size_t total_bytes_consumed = 0;
- bool fin_consumed = false;
-
- while (queued_packets_.empty()) {
- packet_creator_.MaybeStartFEC();
- QuicFrame frame;
- size_t bytes_consumed = packet_creator_.CreateStreamFrame(
- id, data, offset, fin, &frame);
- bool success = packet_creator_.AddFrame(frame);
- DCHECK(success);
-
- total_bytes_consumed += bytes_consumed;
- offset += bytes_consumed;
- fin_consumed = fin && bytes_consumed == data.size();
- data.remove_prefix(bytes_consumed);
-
- // TODO(ianswett): Currently this does not pack stream data together,
- // because SendStreamData does not know if there are more streams to write.
- // TODO(ianswett): Restore packet reordering.
- SendOrQueueCurrentPacket();
-
- if (packet_creator_.ShouldSendFec(false)) {
- PacketPair fec_pair = packet_creator_.SerializeFec();
- // Never retransmit FEC packets.
- SendOrQueuePacket(fec_pair.first, fec_pair.second, !kForce);
- }
-
- if (data.empty()) {
- // We're done writing the data. Exit the loop.
- // We don't make this a precondition because we could have 0 bytes of data
- // if we're simply writing a fin.
- break;
- }
- }
- // Ensure the FEC group is closed at the end of this method.
- if (packet_creator_.ShouldSendFec(true)) {
- PacketPair fec_pair = packet_creator_.SerializeFec();
- // Never retransmit FEC packets.
- SendOrQueuePacket(fec_pair.first, fec_pair.second, !kForce);
- }
- return QuicConsumedData(total_bytes_consumed, fin_consumed);
+ base::StringPiece data,
+ QuicStreamOffset offset,
+ bool fin) {
+ return packet_generator_.ConsumeData(id, data, offset, fin);
}
void QuicConnection::SendRstStream(QuicStreamId id,
- QuicErrorCode error,
- QuicStreamOffset offset) {
- queued_control_frames_.push_back(QuicFrame(
- new QuicRstStreamFrame(id, offset, error)));
-
- // Try to write immediately if possible.
- if (CanWrite(!kIsRetransmission)) {
- WriteQueuedData(kFlush);
- }
+ QuicErrorCode error) {
+ packet_generator_.AddControlFrame(
+ QuicFrame(new QuicRstStreamFrame(id, error)));
}
void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address,
@@ -580,25 +535,24 @@ void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address,
}
bool QuicConnection::OnCanWrite() {
+ LOG(INFO) << "here!!!";
write_blocked_ = false;
- WriteQueuedData(!kFlush);
+ WriteQueuedPackets();
- // Ensure there's enough room for a StreamFrame before calling the visitor.
- if (packet_creator_.BytesFree() <= kMinStreamFrameLength) {
- SendOrQueueCurrentPacket();
- }
-
- // If we've sent everything we had queued and we're still not blocked, let the
- // visitor know it can write more.
+ // Sending queued packets may have caused the socket to become write blocked,
+ // or the congestion manager to prohibit sending. If we've sent everything
+ // we had queued and we're still not blocked, let the visitor know it can
+ // write more.
+ // TODO(rch): shouldn't this be "if (CanWrite(false))"
if (!write_blocked_) {
+ packet_generator_.StartBatchOperations();
bool all_bytes_written = visitor_->OnCanWrite();
- // If the latest write caused a socket-level blockage, return false: we will
- // be rescheduled by the kernel.
- if (write_blocked_) {
- return false;
- }
- if (!all_bytes_written && !helper_->IsSendAlarmSet()) {
+ packet_generator_.FinishBatchOperations();
+
+ // After the visitor writes, it may have caused the socket to become write
+ // blocked or the congestion manager to prohibit sending, so check again.
+ if (!write_blocked_ && !all_bytes_written && !helper_->IsSendAlarmSet()) {
// We're not write blocked, but some stream didn't write out all of its
// bytes. Register for 'immediate' resumption so we'll keep writing after
// other quic connections have had a chance to use the socket.
@@ -606,20 +560,12 @@ bool QuicConnection::OnCanWrite() {
}
}
- // If a write can still be performed, ensure there are no pending frames,
- // even if they didn't fill a packet.
- if (packet_creator_.HasPendingFrames() && CanWrite(!kIsRetransmission)) {
- SendOrQueueCurrentPacket();
- }
-
return !write_blocked_;
}
-bool QuicConnection::WriteQueuedData(bool flush) {
+bool QuicConnection::WriteQueuedPackets() {
DCHECK(!write_blocked_);
- DCHECK(!packet_creator_.HasPendingFrames());
- // Send all queued packets first.
size_t num_queued_packets = queued_packets_.size() + 1;
QueuedPacketList::iterator packet_iterator = queued_packets_.begin();
while (!write_blocked_ && !helper_->IsSendAlarmSet() &&
@@ -633,52 +579,37 @@ bool QuicConnection::WriteQueuedData(bool flush) {
packet_iterator->packet, !kForce)) {
packet_iterator = queued_packets_.erase(packet_iterator);
} else {
- // TODO(ianswett): Why not break or return false here?
+ // Continue, because some queued packets may still be writable.
++packet_iterator;
}
}
- if (write_blocked_) {
- return false;
- }
-
- while ((!queued_control_frames_.empty() || should_send_ack_ ||
- should_send_congestion_feedback_) && CanWrite(!kIsRetransmission)) {
- bool full_packet = false;
- if (!queued_control_frames_.empty()) {
- full_packet = !packet_creator_.AddFrame(queued_control_frames_.back());
- if (!full_packet) {
- queued_control_frames_.pop_back();
- }
- } else if (should_send_ack_) {
- full_packet = !packet_creator_.AddFrame(QuicFrame(&outgoing_ack_));
- if (!full_packet) {
- should_send_ack_ = false;
- }
- } else if (should_send_congestion_feedback_) {
- full_packet = !packet_creator_.AddFrame(
- QuicFrame(&outgoing_congestion_feedback_));
- if (!full_packet) {
- should_send_congestion_feedback_ = false;
- }
- }
-
- if (full_packet) {
- SendOrQueueCurrentPacket();
- }
- }
-
- if (flush && packet_creator_.HasPendingFrames()) {
- SendOrQueueCurrentPacket();
- }
-
return !write_blocked_;
}
void QuicConnection::RecordPacketReceived(const QuicPacketHeader& header) {
+ DLOG(INFO) << "Recording received packet: " << header.packet_sequence_number;
QuicPacketSequenceNumber sequence_number = header.packet_sequence_number;
- DCHECK(outgoing_ack_.received_info.IsAwaitingPacket(sequence_number));
- outgoing_ack_.received_info.RecordReceived(sequence_number);
+ DCHECK(IsAwaitingPacket(outgoing_ack_.received_info, sequence_number));
+
+ InsertMissingPacketsBetween(
+ &outgoing_ack_.received_info,
+ max(outgoing_ack_.received_info.largest_observed + 1,
+ peer_least_packet_awaiting_ack_),
+ header.packet_sequence_number);
+
+ if (outgoing_ack_.received_info.largest_observed >
+ header.packet_sequence_number) {
+ // We've gotten one of the out of order packets - remove it from our
+ // "missing packets" list.
+ DVLOG(1) << "Removing " << sequence_number << " from missing list";
+ outgoing_ack_.received_info.missing_packets.erase(sequence_number);
+ }
+ outgoing_ack_.received_info.largest_observed = max(
+ outgoing_ack_.received_info.largest_observed,
+ header.packet_sequence_number);
+ entropy_manager_.RecordReceivedPacketEntropyHash(sequence_number,
+ header.entropy_hash);
}
bool QuicConnection::MaybeRetransmitPacketForRTO(
@@ -688,7 +619,7 @@ bool QuicConnection::MaybeRetransmitPacketForRTO(
if (!ContainsKey(unacked_packets_, sequence_number)) {
DVLOG(2) << "alarm fired for " << sequence_number
- << " but it has been acked or already retransmitted with "
+ << " but it has been acked or already retransmitted with"
<< " different sequence number.";
// So no extra delay is added for this packet.
return true;
@@ -719,25 +650,29 @@ void QuicConnection::RetransmitPacket(
// ignored by MaybeRetransmitPacketForRTO.
DCHECK(unacked_it != unacked_packets_.end());
DCHECK(retransmission_it != retransmission_map_.end());
- UnackedPacket* unacked = unacked_it->second;
+ RetransmittableFrames* unacked = unacked_it->second;
// TODO(ianswett): Never change the sequence number of the connect packet.
// Re-packetize the frames with a new sequence number for retransmission.
// Retransmitted data packets do not use FEC, even when it's enabled.
- PacketPair packetpair = packet_creator_.SerializeAllFrames(unacked->frames);
- RetransmissionInfo retransmission_info(packetpair.first);
+ SerializedPacket serialized_packet =
+ packet_creator_.SerializeAllFrames(unacked->frames());
+ RetransmissionInfo retransmission_info(serialized_packet.sequence_number);
retransmission_info.number_retransmissions =
retransmission_it->second.number_retransmissions + 1;
- retransmission_map_.insert(make_pair(packetpair.first, retransmission_info));
+ retransmission_map_.insert(make_pair(serialized_packet.sequence_number,
+ retransmission_info));
// Remove info with old sequence number.
unacked_packets_.erase(unacked_it);
retransmission_map_.erase(retransmission_it);
DVLOG(1) << "Retransmitting unacked packet " << sequence_number << " as "
- << packetpair.first;
- unacked_packets_.insert(make_pair(packetpair.first, unacked));
- // Make sure if this was our least unacked packet, that we update our
- // outgoing ack. If this wasn't the least unacked, this is a no-op.
- UpdateLeastUnacked(sequence_number);
- SendOrQueuePacket(packetpair.first, packetpair.second, !kForce);
+ << serialized_packet.sequence_number;
+ DCHECK(unacked_packets_.empty() ||
+ unacked_packets_.rbegin()->first < serialized_packet.sequence_number);
+ unacked_packets_.insert(make_pair(serialized_packet.sequence_number,
+ unacked));
+ SendOrQueuePacket(serialized_packet.sequence_number,
+ serialized_packet.packet,
+ serialized_packet.entropy_hash);
}
bool QuicConnection::CanWrite(bool is_retransmission) {
@@ -746,7 +681,8 @@ bool QuicConnection::CanWrite(bool is_retransmission) {
if (write_blocked_ || helper_->IsSendAlarmSet()) {
return false;
}
- QuicTime::Delta delay = congestion_manager_.TimeUntilSend(is_retransmission);
+ QuicTime::Delta delay = congestion_manager_.TimeUntilSend(clock_->Now(),
+ is_retransmission);
// If the scheduler requires a delay, then we can not send this packet now.
if (!delay.IsZero() && !delay.IsInfinite()) {
// TODO(pwestin): we need to handle delay.IsInfinite() separately.
@@ -774,8 +710,10 @@ void QuicConnection::MaybeSetupRetransmission(
RetransmissionInfo retransmission_info = it->second;
QuicTime::Delta retransmission_delay =
congestion_manager_.GetRetransmissionDelay(
+ unacked_packets_.size(),
retransmission_info.number_retransmissions);
- retransmission_info.scheduled_time = clock_->Now().Add(retransmission_delay);
+ retransmission_info.scheduled_time =
+ clock_->ApproximateNow().Add(retransmission_delay);
retransmission_timeouts_.push(retransmission_info);
// Do not set the retransmisson alarm if we're already handling the
@@ -784,13 +722,8 @@ void QuicConnection::MaybeSetupRetransmission(
if (!handling_retransmission_timeout_) {
helper_->SetRetransmissionAlarm(retransmission_delay);
}
-
- // The second case should never happen in the real world, but does here
- // because we sometimes send out of order to validate corner cases.
- if (outgoing_ack_.sent_info.least_unacked == 0 ||
- sequence_number < outgoing_ack_.sent_info.least_unacked) {
- outgoing_ack_.sent_info.least_unacked = sequence_number;
- }
+ // TODO(satyamshekhar): restore pacekt reordering with Ian's TODO in
+ // SendStreamData().
}
bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number,
@@ -811,23 +744,30 @@ bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number,
return false;
}
- scoped_ptr<QuicEncryptedPacket> encrypted(framer_.EncryptPacket(*packet));
- DLOG(INFO) << "Sending packet : "
+ scoped_ptr<QuicEncryptedPacket> encrypted(
+ framer_.EncryptPacket(sequence_number, *packet));
+ DLOG(INFO) << "Sending packet number " << sequence_number << " : "
<< (packet->is_fec_packet() ? "FEC " :
(ContainsKey(retransmission_map_, sequence_number) ?
- "data bearing " : " ack only "))
- << "packet " << sequence_number;
+ "data bearing " : " ack only "));
+
DCHECK(encrypted->length() <= kMaxPacketSize)
<< "Packet " << sequence_number << " will not be read; too large: "
<< packet->length() << " " << encrypted->length() << " "
<< outgoing_ack_;
int error;
+ QuicTime now = clock_->Now();
int rv = helper_->WritePacketToWire(*encrypted, &error);
if (rv == -1 && error == ERR_IO_PENDING) {
+ // TODO(satyashekhar): It might be more efficient (fewer system calls), if
+ // all connections share this variable i.e this becomes a part of
+ // PacketWriterInterface.
write_blocked_ = true;
return false;
}
+ time_of_last_sent_packet_ = now;
+ DVLOG(1) << "time of last sent packet: " << now.ToMicroseconds();
// TODO(wtc): Is it correct to continue if the write failed.
// Set the retransmit alarm only when we have sent the packet to the client
@@ -835,44 +775,36 @@ bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number,
// an entry to retransmission_timeout_ every time we attempt a write.
MaybeSetupRetransmission(sequence_number);
- time_of_last_packet_ = clock_->Now();
- DVLOG(1) << "last packet: " << time_of_last_packet_.ToMicroseconds();
-
- congestion_manager_.SentPacket(sequence_number, packet->length(),
+ congestion_manager_.SentPacket(sequence_number, now, packet->length(),
is_retransmission);
delete packet;
return true;
}
-void QuicConnection::SendOrQueueCurrentPacket() {
- QuicFrames retransmittable_frames;
- PacketPair pair = packet_creator_.SerializePacket(&retransmittable_frames);
- const bool should_retransmit = !retransmittable_frames.empty();
- if (should_retransmit) {
- UnackedPacket* unacked = new UnackedPacket(retransmittable_frames);
- for (size_t i = 0; i < retransmittable_frames.size(); ++i) {
- if (retransmittable_frames[i].type == STREAM_FRAME) {
- DCHECK(unacked->data.empty());
- // Make an owned copy of the StringPiece.
- unacked->data =
- retransmittable_frames[i].stream_frame->data.as_string();
- // Ensure the frame's StringPiece points to the owned copy of the data.
- retransmittable_frames[i].stream_frame->data =
- StringPiece(unacked->data);
- }
- }
- unacked_packets_.insert(make_pair(pair.first, unacked));
+bool QuicConnection::OnSerializedPacket(
+ const SerializedPacket& serialized_packet) {
+ if (serialized_packet.retransmittable_frames != NULL) {
+ DCHECK(unacked_packets_.empty() ||
+ unacked_packets_.rbegin()->first <
+ serialized_packet.sequence_number);
+ unacked_packets_.insert(
+ make_pair(serialized_packet.sequence_number,
+ serialized_packet.retransmittable_frames));
// All unacked packets might be retransmitted.
- retransmission_map_.insert(make_pair(pair.first,
- RetransmissionInfo(pair.first)));
+ retransmission_map_.insert(
+ make_pair(serialized_packet.sequence_number,
+ RetransmissionInfo(serialized_packet.sequence_number)));
}
- SendOrQueuePacket(pair.first, pair.second, !kForce);
+ return SendOrQueuePacket(serialized_packet.sequence_number,
+ serialized_packet.packet,
+ serialized_packet.entropy_hash);
}
bool QuicConnection::SendOrQueuePacket(QuicPacketSequenceNumber sequence_number,
QuicPacket* packet,
- bool force) {
- if (!WritePacket(sequence_number, packet, force)) {
+ QuicPacketEntropyHash entropy_hash) {
+ entropy_manager_.RecordSentPacketEntropyHash(sequence_number, entropy_hash);
+ if (!WritePacket(sequence_number, packet, !kForce)) {
queued_packets_.push_back(QueuedPacket(sequence_number, packet));
return false;
}
@@ -888,28 +820,39 @@ bool QuicConnection::ShouldSimulateLostPacket() {
*/
}
-void QuicConnection::SendAck() {
- helper_->ClearAckAlarm();
-
- if (!ContainsKey(unacked_packets_, outgoing_ack_.sent_info.least_unacked)) {
- // At some point, all packets were acked, and we set least_unacked to a
- // packet we will not retransmit. Make sure we update it.
- UpdateLeastUnacked(outgoing_ack_.sent_info.least_unacked);
+void QuicConnection::UpdateOutgoingAck() {
+ if (!unacked_packets_.empty()) {
+ outgoing_ack_.sent_info.least_unacked = unacked_packets_.begin()->first;
+ } else {
+ // If there are no unacked packets, set the least unacked packet to
+ // sequence_number() + 1 since that will be the sequence number of this
+ // ack packet whenever it is sent.
+ outgoing_ack_.sent_info.least_unacked =
+ packet_creator_.sequence_number() + 1;
}
+ outgoing_ack_.sent_info.entropy_hash = entropy_manager_.SentEntropyHash(
+ outgoing_ack_.sent_info.least_unacked - 1);
+ outgoing_ack_.received_info.entropy_hash =
+ entropy_manager_.ReceivedEntropyHash(
+ outgoing_ack_.received_info.largest_observed);
+}
- DVLOG(1) << "Sending ack " << outgoing_ack_;
-
- should_send_ack_ = true;
+void QuicConnection::SendAck() {
+ helper_->ClearAckAlarm();
+ UpdateOutgoingAck();
+ DVLOG(1) << "Sending ack: " << outgoing_ack_;
+ // TODO(rch): delay this until the CreateFeedbackFrame
+ // method is invoked. This requires changes SetShouldSendAck
+ // to be a no-arg method, and re-jiggering its implementation.
+ bool send_feedback = false;
if (congestion_manager_.GenerateCongestionFeedback(
&outgoing_congestion_feedback_)) {
DVLOG(1) << "Sending feedback " << outgoing_congestion_feedback_;
- should_send_congestion_feedback_ = true;
- }
- // Try to write immediately if possible.
- if (CanWrite(!kIsRetransmission)) {
- WriteQueuedData(kFlush);
+ send_feedback = true;
}
+
+ packet_generator_.SetShouldSendAck(send_feedback);
}
QuicTime QuicConnection::OnRetransmissionTimeout() {
@@ -926,7 +869,7 @@ QuicTime QuicConnection::OnRetransmissionTimeout() {
!retransmission_timeouts_.empty(); ++i) {
RetransmissionInfo retransmission_info = retransmission_timeouts_.top();
DCHECK(retransmission_info.scheduled_time.IsInitialized());
- if (retransmission_info.scheduled_time > clock_->Now()) {
+ if (retransmission_info.scheduled_time > clock_->ApproximateNow()) {
break;
}
retransmission_timeouts_.pop();
@@ -934,7 +877,7 @@ QuicTime QuicConnection::OnRetransmissionTimeout() {
DLOG(INFO) << "MaybeRetransmitPacketForRTO failed: "
<< "adding an extra delay for "
<< retransmission_info.sequence_number;
- retransmission_info.scheduled_time = clock_->Now().Add(
+ retransmission_info.scheduled_time = clock_->ApproximateNow().Add(
congestion_manager_.DefaultRetransmissionTime());
retransmission_timeouts_.push(retransmission_info);
}
@@ -960,8 +903,9 @@ void QuicConnection::MaybeProcessRevivedPacket() {
char revived_payload[kMaxPacketSize];
size_t len = group->Revive(&revived_header, revived_payload, kMaxPacketSize);
revived_header.public_header.guid = guid_;
- revived_header.public_header.flags = PACKET_PUBLIC_FLAGS_NONE;
- revived_header.private_flags = PACKET_PRIVATE_FLAGS_NONE;
+ revived_header.public_header.version_flag = false;
+ revived_header.public_header.reset_flag = false;
+ revived_header.fec_flag = false;
revived_header.fec_group = kNoFecOffset;
group_map_.erase(last_header_.fec_group);
delete group;
@@ -971,7 +915,7 @@ void QuicConnection::MaybeProcessRevivedPacket() {
debug_visitor_->OnRevivedPacket(revived_header,
StringPiece(revived_payload, len));
}
- framer_.ProcessRevivedPacket(revived_header,
+ framer_.ProcessRevivedPacket(&revived_header,
StringPiece(revived_payload, len));
}
@@ -991,19 +935,25 @@ void QuicConnection::SendConnectionClose(QuicErrorCode error) {
SendConnectionCloseWithDetails(error, string());
}
-void QuicConnection::SendConnectionCloseWithDetails(QuicErrorCode error,
- const string& details) {
+void QuicConnection::SendConnectionClosePacket(QuicErrorCode error,
+ const string& details) {
DLOG(INFO) << "Force closing with error " << QuicUtils::ErrorToString(error)
<< " (" << error << ")";
QuicConnectionCloseFrame frame;
frame.error_code = error;
frame.error_details = details;
+ UpdateOutgoingAck();
frame.ack_frame = outgoing_ack_;
- PacketPair packetpair = packet_creator_.CloseConnection(&frame);
- // There's no point in retransmitting/queueing this: we're closing the
- // connection.
- WritePacket(packetpair.first, packetpair.second, kForce);
+ SerializedPacket serialized_packet =
+ packet_creator_.SerializeConnectionClose(&frame);
+ SendOrQueuePacket(serialized_packet.sequence_number, serialized_packet.packet,
+ serialized_packet.entropy_hash);
+}
+
+void QuicConnection::SendConnectionCloseWithDetails(QuicErrorCode error,
+ const string& details) {
+ SendConnectionClosePacket(error, details);
CloseConnection(error, false);
}
@@ -1014,6 +964,15 @@ void QuicConnection::CloseConnection(QuicErrorCode error, bool from_peer) {
visitor_->ConnectionClose(error, from_peer);
}
+void QuicConnection::SendGoAway(QuicErrorCode error,
+ QuicStreamId last_good_stream_id,
+ const string& reason) {
+ DLOG(INFO) << "Going away with error " << QuicUtils::ErrorToString(error)
+ << " (" << error << ")";
+ packet_generator_.AddControlFrame(
+ QuicFrame(new QuicGoAwayFrame(error, last_good_stream_id, reason)));
+}
+
void QuicConnection::CloseFecGroupsBefore(
QuicPacketSequenceNumber sequence_number) {
FecGroupMap::iterator it = group_map_.begin();
@@ -1036,14 +995,16 @@ void QuicConnection::CloseFecGroupsBefore(
}
bool QuicConnection::HasQueuedData() const {
- return !queued_packets_.empty() || should_send_ack_ ||
- should_send_congestion_feedback_;
+ return !queued_packets_.empty() || packet_generator_.HasQueuedData();
}
bool QuicConnection::CheckForTimeout() {
- QuicTime now = clock_->Now();
- QuicTime::Delta delta = now.Subtract(time_of_last_packet_);
- DVLOG(1) << "last_packet " << time_of_last_packet_.ToMicroseconds()
+ QuicTime now = clock_->ApproximateNow();
+ QuicTime time_of_last_packet = std::max(time_of_last_received_packet_,
+ time_of_last_sent_packet_);
+
+ QuicTime::Delta delta = now.Subtract(time_of_last_packet);
+ DVLOG(1) << "last packet " << time_of_last_packet.ToMicroseconds()
<< " now:" << now.ToMicroseconds()
<< " delta:" << delta.ToMicroseconds();
if (delta >= timeout_) {
« no previous file with comments | « net/quic/quic_connection.h ('k') | net/quic/quic_connection_helper_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698