Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(229)

Side by Side Diff: net/quic/quic_connection.cc

Issue 12334063: Land recent QUIC changes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: more EXPECT_FALSE Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/quic_connection.h ('k') | net/quic/quic_connection_helper_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/quic_connection.h" 5 #include "net/quic/quic_connection.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/stl_util.h" 10 #include "base/stl_util.h"
11 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
12 #include "net/quic/quic_utils.h" 12 #include "net/quic/quic_utils.h"
13 13
14 using base::hash_map; 14 using base::hash_map;
15 using base::hash_set; 15 using base::hash_set;
16 using base::StringPiece; 16 using base::StringPiece;
17 using std::list; 17 using std::list;
18 using std::make_pair; 18 using std::make_pair;
19 using std::min; 19 using std::min;
20 using std::max;
20 using std::vector; 21 using std::vector;
21 using std::set; 22 using std::set;
22 using std::string; 23 using std::string;
23 24
24 namespace net { 25 namespace net {
25 26
26 // TODO(pwestin): kDefaultTimeoutUs is in int64. 27 // TODO(pwestin): kDefaultTimeoutUs is in int64.
27 int32 kNegotiatedTimeoutUs = kDefaultTimeoutUs; 28 int32 kNegotiatedTimeoutUs = kDefaultTimeoutUs;
28 29
29 namespace { 30 namespace {
30 31
31 // The largest gap in packets we'll accept without closing the connection. 32 // The largest gap in packets we'll accept without closing the connection.
32 // This will likely have to be tuned. 33 // This will likely have to be tuned.
33 const QuicPacketSequenceNumber kMaxPacketGap = 5000; 34 const QuicPacketSequenceNumber kMaxPacketGap = 5000;
34 35
35 // The maximum number of nacks which can be transmitted in a single ack packet 36 // The maximum number of nacks which can be transmitted in a single ack packet
36 // without exceeding kMaxPacketSize. 37 // without exceeding kMaxPacketSize.
37 const QuicPacketSequenceNumber kMaxUnackedPackets = 192u; 38 // TODO(satyamshekhar): Get rid of magic numbers and move this to protocol.h
39 // 16 - Min ack frame size.
40 // 16 - Crypto hash for integrity. Not a static value. Use
41 // QuicEncrypter::GetMaxPlaintextSize.
42 const QuicPacketSequenceNumber kMaxUnackedPackets =
43 (kMaxPacketSize - kPacketHeaderSize - 16 - 16) / kSequenceNumberSize;
38 44
39 // We want to make sure if we get a large nack packet, we don't queue up too 45 // We want to make sure if we get a large nack packet, we don't queue up too
40 // many packets at once. 10 is arbitrary. 46 // many packets at once. 10 is arbitrary.
41 const int kMaxRetransmissionsPerAck = 10; 47 const int kMaxRetransmissionsPerAck = 10;
42 48
43 // TCP retransmits after 2 nacks. We allow for a third in case of out-of-order 49 // TCP retransmits after 2 nacks. We allow for a third in case of out-of-order
44 // delivery. 50 // delivery.
45 // TODO(ianswett): Change to match TCP's rule of retransmitting once an ack 51 // TODO(ianswett): Change to match TCP's rule of retransmitting once an ack
46 // at least 3 sequence numbers larger arrives. 52 // at least 3 sequence numbers larger arrives.
47 const size_t kNumberOfNacksBeforeRetransmission = 3; 53 const size_t kNumberOfNacksBeforeRetransmission = 3;
48 54
49 // The maxiumum number of packets we'd like to queue. We may end up queueing 55 // The maxiumum number of packets we'd like to queue. We may end up queueing
50 // more in the case of many control frames. 56 // more in the case of many control frames.
51 // 6 is arbitrary. 57 // 6 is arbitrary.
52 const int kMaxPacketsToSerializeAtOnce = 6; 58 const int kMaxPacketsToSerializeAtOnce = 6;
53 59
54 // Limit the number of packets we send per retransmission-alarm so we 60 // Limit the number of packets we send per retransmission-alarm so we
55 // eventually cede. 10 is arbitrary. 61 // eventually cede. 10 is arbitrary.
56 const int kMaxPacketsPerRetransmissionAlarm = 10; 62 const int kMaxPacketsPerRetransmissionAlarm = 10;
57 63
58 // Named constant for WriteQueuedData(). 64 // Named constant for WritePacket()
59 const bool kFlush = true;
60 // Named constant for WritePacket(), SendOrQueuePacket().
61 const bool kForce = true; 65 const bool kForce = true;
62 // Named constant for CanWrite(). 66 // Named constant for CanWrite().
63 const bool kIsRetransmission = true; 67 const bool kIsRetransmission = true;
64 68
65 bool Near(QuicPacketSequenceNumber a, QuicPacketSequenceNumber b) { 69 bool Near(QuicPacketSequenceNumber a, QuicPacketSequenceNumber b) {
66 QuicPacketSequenceNumber delta = (a > b) ? a - b : b - a; 70 QuicPacketSequenceNumber delta = (a > b) ? a - b : b - a;
67 return delta <= kMaxPacketGap; 71 return delta <= kMaxPacketGap;
68 } 72 }
69 73
70 } // namespace 74 } // namespace
71 75
72 QuicConnection::UnackedPacket::UnackedPacket(QuicFrames unacked_frames)
73 : frames(unacked_frames) {
74 }
75
76 QuicConnection::UnackedPacket::UnackedPacket(QuicFrames unacked_frames,
77 std::string data)
78 : frames(unacked_frames),
79 data(data) {
80 }
81
82 QuicConnection::UnackedPacket::~UnackedPacket() {
83 }
84
85 QuicConnection::QuicConnection(QuicGuid guid, 76 QuicConnection::QuicConnection(QuicGuid guid,
86 IPEndPoint address, 77 IPEndPoint address,
87 QuicConnectionHelperInterface* helper) 78 QuicConnectionHelperInterface* helper)
88 : helper_(helper), 79 : helper_(helper),
89 framer_(QuicDecrypter::Create(kNULL), QuicEncrypter::Create(kNULL)), 80 framer_(QuicDecrypter::Create(kNULL), QuicEncrypter::Create(kNULL)),
90 clock_(helper->GetClock()), 81 clock_(helper->GetClock()),
91 random_generator_(helper->GetRandomGenerator()), 82 random_generator_(helper->GetRandomGenerator()),
92 guid_(guid), 83 guid_(guid),
93 peer_address_(address), 84 peer_address_(address),
94 should_send_ack_(false),
95 should_send_congestion_feedback_(false),
96 largest_seen_packet_with_ack_(0), 85 largest_seen_packet_with_ack_(0),
97 peer_largest_observed_packet_(0), 86 peer_largest_observed_packet_(0),
87 least_packet_awaited_by_peer_(1),
98 peer_least_packet_awaiting_ack_(0), 88 peer_least_packet_awaiting_ack_(0),
99 handling_retransmission_timeout_(false), 89 handling_retransmission_timeout_(false),
100 write_blocked_(false), 90 write_blocked_(false),
101 debug_visitor_(NULL), 91 debug_visitor_(NULL),
102 packet_creator_(guid_, &framer_), 92 packet_creator_(guid_, &framer_, random_generator_),
93 packet_generator_(this, &packet_creator_),
103 timeout_(QuicTime::Delta::FromMicroseconds(kDefaultTimeoutUs)), 94 timeout_(QuicTime::Delta::FromMicroseconds(kDefaultTimeoutUs)),
104 time_of_last_packet_(clock_->Now()), 95 time_of_last_received_packet_(clock_->ApproximateNow()),
96 time_of_last_sent_packet_(clock_->ApproximateNow()),
105 congestion_manager_(clock_, kTCP), 97 congestion_manager_(clock_, kTCP),
106 connected_(true), 98 connected_(true),
107 received_truncated_ack_(false), 99 received_truncated_ack_(false),
108 send_ack_in_response_to_packet_(false) { 100 send_ack_in_response_to_packet_(false) {
109 helper_->SetConnection(this); 101 helper_->SetConnection(this);
110 helper_->SetTimeoutAlarm(timeout_); 102 helper_->SetTimeoutAlarm(timeout_);
111 framer_.set_visitor(this); 103 framer_.set_visitor(this);
104 framer_.set_entropy_calculator(&entropy_manager_);
112 memset(&last_header_, 0, sizeof(last_header_)); 105 memset(&last_header_, 0, sizeof(last_header_));
113 outgoing_ack_.sent_info.least_unacked = 0; 106 outgoing_ack_.sent_info.least_unacked = 0;
107 outgoing_ack_.sent_info.entropy_hash = 0;
114 outgoing_ack_.received_info.largest_observed = 0; 108 outgoing_ack_.received_info.largest_observed = 0;
109 outgoing_ack_.received_info.entropy_hash = 0;
115 110
116 /* 111 /*
117 if (FLAGS_fake_packet_loss_percentage > 0) { 112 if (FLAGS_fake_packet_loss_percentage > 0) {
118 int32 seed = RandomBase::WeakSeed32(); 113 int32 seed = RandomBase::WeakSeed32();
119 LOG(INFO) << "Seeding packet loss with " << seed; 114 LOG(INFO) << "Seeding packet loss with " << seed;
120 random_.reset(new MTRandom(seed)); 115 random_.reset(new MTRandom(seed));
121 } 116 }
122 */ 117 */
123 } 118 }
124 119
125 QuicConnection::~QuicConnection() { 120 QuicConnection::~QuicConnection() {
126 // Call DeleteEnclosedFrames on each QuicPacket because the destructor does
127 // not delete enclosed frames.
128 for (UnackedPacketMap::iterator it = unacked_packets_.begin();
129 it != unacked_packets_.end(); ++it) {
130 DeleteEnclosedFrames(it->second);
131 }
132 STLDeleteValues(&unacked_packets_); 121 STLDeleteValues(&unacked_packets_);
133 STLDeleteValues(&group_map_); 122 STLDeleteValues(&group_map_);
134 for (QueuedPacketList::iterator it = queued_packets_.begin(); 123 for (QueuedPacketList::iterator it = queued_packets_.begin();
135 it != queued_packets_.end(); ++it) { 124 it != queued_packets_.end(); ++it) {
136 delete it->packet; 125 delete it->packet;
137 } 126 }
138 } 127 }
139 128
140 void QuicConnection::DeleteEnclosedFrame(QuicFrame* frame) {
141 switch (frame->type) {
142 case PADDING_FRAME:
143 delete frame->padding_frame;
144 break;
145 case STREAM_FRAME:
146 delete frame->stream_frame;
147 break;
148 case ACK_FRAME:
149 delete frame->ack_frame;
150 break;
151 case CONGESTION_FEEDBACK_FRAME:
152 delete frame->congestion_feedback_frame;
153 break;
154 case RST_STREAM_FRAME:
155 delete frame->rst_stream_frame;
156 break;
157 case CONNECTION_CLOSE_FRAME:
158 delete frame->connection_close_frame;
159 break;
160 case NUM_FRAME_TYPES:
161 DCHECK(false) << "Cannot delete type: " << frame->type;
162 }
163 }
164
165 void QuicConnection::DeleteEnclosedFrames(UnackedPacket* unacked) {
166 for (QuicFrames::iterator it = unacked->frames.begin();
167 it != unacked->frames.end(); ++it) {
168 DeleteEnclosedFrame(&(*it));
169 }
170 }
171
172 void QuicConnection::OnError(QuicFramer* framer) { 129 void QuicConnection::OnError(QuicFramer* framer) {
173 SendConnectionClose(framer->error()); 130 SendConnectionClose(framer->error());
174 } 131 }
175 132
176 void QuicConnection::OnPacket() { 133 void QuicConnection::OnPacket() {
177 time_of_last_packet_ = clock_->Now(); 134 time_of_last_received_packet_ = clock_->Now();
178 DVLOG(1) << "last packet: " << time_of_last_packet_.ToMicroseconds(); 135 DVLOG(1) << "time of last received packet: "
136 << time_of_last_received_packet_.ToMicroseconds();
179 137
180 // TODO(alyssar, rch) handle migration! 138 // TODO(alyssar, rch) handle migration!
181 self_address_ = last_self_address_; 139 self_address_ = last_self_address_;
182 peer_address_ = last_peer_address_; 140 peer_address_ = last_peer_address_;
183 } 141 }
184 142
185 void QuicConnection::OnPublicResetPacket( 143 void QuicConnection::OnPublicResetPacket(
186 const QuicPublicResetPacket& packet) { 144 const QuicPublicResetPacket& packet) {
187 if (debug_visitor_) { 145 if (debug_visitor_) {
188 debug_visitor_->OnPublicResetPacket(packet); 146 debug_visitor_->OnPublicResetPacket(packet);
(...skipping 17 matching lines...) Expand all
206 if (!Near(header.packet_sequence_number, 164 if (!Near(header.packet_sequence_number,
207 last_header_.packet_sequence_number)) { 165 last_header_.packet_sequence_number)) {
208 DLOG(INFO) << "Packet " << header.packet_sequence_number 166 DLOG(INFO) << "Packet " << header.packet_sequence_number
209 << " out of bounds. Discarding"; 167 << " out of bounds. Discarding";
210 // TODO(alyssar) close the connection entirely. 168 // TODO(alyssar) close the connection entirely.
211 return false; 169 return false;
212 } 170 }
213 171
214 // If this packet has already been seen, or that the sender 172 // If this packet has already been seen, or that the sender
215 // has told us will not be retransmitted, then stop processing the packet. 173 // has told us will not be retransmitted, then stop processing the packet.
216 if (!outgoing_ack_.received_info.IsAwaitingPacket( 174 if (!IsAwaitingPacket(outgoing_ack_.received_info,
217 header.packet_sequence_number)) { 175 header.packet_sequence_number)) {
218 return false; 176 return false;
219 } 177 }
220 178
179 DVLOG(1) << "Received packet header: " << header;
221 last_header_ = header; 180 last_header_ = header;
222 return true; 181 return true;
223 } 182 }
224 183
225 void QuicConnection::OnFecProtectedPayload(StringPiece payload) { 184 void QuicConnection::OnFecProtectedPayload(StringPiece payload) {
226 DCHECK_NE(0u, last_header_.fec_group); 185 DCHECK_NE(0u, last_header_.fec_group);
227 QuicFecGroup* group = GetFecGroup(); 186 QuicFecGroup* group = GetFecGroup();
228 group->Update(last_header_, payload); 187 group->Update(last_header_, payload);
229 } 188 }
230 189
231 void QuicConnection::OnStreamFrame(const QuicStreamFrame& frame) { 190 void QuicConnection::OnStreamFrame(const QuicStreamFrame& frame) {
232 if (debug_visitor_) { 191 if (debug_visitor_) {
233 debug_visitor_->OnStreamFrame(frame); 192 debug_visitor_->OnStreamFrame(frame);
234 } 193 }
235 last_stream_frames_.push_back(frame); 194 last_stream_frames_.push_back(frame);
236 } 195 }
237 196
238 void QuicConnection::OnAckFrame(const QuicAckFrame& incoming_ack) { 197 void QuicConnection::OnAckFrame(const QuicAckFrame& incoming_ack) {
239 if (debug_visitor_) { 198 if (debug_visitor_) {
240 debug_visitor_->OnAckFrame(incoming_ack); 199 debug_visitor_->OnAckFrame(incoming_ack);
241 } 200 }
242 DVLOG(1) << "Ack packet: " << incoming_ack; 201 DVLOG(1) << "OnAckFrame: " << incoming_ack;
243 202
244 if (last_header_.packet_sequence_number <= largest_seen_packet_with_ack_) { 203 if (last_header_.packet_sequence_number <= largest_seen_packet_with_ack_) {
245 DLOG(INFO) << "Received an old ack frame: ignoring"; 204 DLOG(INFO) << "Received an old ack frame: ignoring";
246 return; 205 return;
247 } 206 }
248 largest_seen_packet_with_ack_ = last_header_.packet_sequence_number; 207 largest_seen_packet_with_ack_ = last_header_.packet_sequence_number;
249 208
250 if (!ValidateAckFrame(incoming_ack)) { 209 if (!ValidateAckFrame(incoming_ack)) {
251 SendConnectionClose(QUIC_INVALID_ACK_DATA); 210 SendConnectionClose(QUIC_INVALID_ACK_DATA);
252 return; 211 return;
253 } 212 }
254 213
214 // TODO(satyamshekhar): Not true if missing_packets.size() was actually
215 // kMaxUnackedPackets. This can result in a dead connection if all the
216 // missing packets get lost during retransmission. Now the new packets(or the
217 // older packets) will not be retransmitted due to RTO
218 // since received_truncated_ack_ is true and their sequence_number is >
219 // peer_largest_observed_packet. Fix either by resetting it in
220 // MaybeRetransmitPacketForRTO or keeping an explicit flag for ack truncation.
255 received_truncated_ack_ = 221 received_truncated_ack_ =
256 incoming_ack.received_info.missing_packets.size() >= kMaxUnackedPackets; 222 incoming_ack.received_info.missing_packets.size() >= kMaxUnackedPackets;
257 223
258 UpdatePacketInformationReceivedByPeer(incoming_ack); 224 UpdatePacketInformationReceivedByPeer(incoming_ack);
259 UpdatePacketInformationSentByPeer(incoming_ack); 225 UpdatePacketInformationSentByPeer(incoming_ack);
260 congestion_manager_.OnIncomingAckFrame(incoming_ack); 226 congestion_manager_.OnIncomingAckFrame(incoming_ack,
227 time_of_last_received_packet_);
261 228
262 // Now the we have received an ack, we might be able to send queued packets. 229 // Now the we have received an ack, we might be able to send queued packets.
263 if (queued_packets_.empty()) { 230 if (queued_packets_.empty()) {
264 return; 231 return;
265 } 232 }
266 233
267 QuicTime::Delta delay = congestion_manager_.TimeUntilSend(false); 234 QuicTime::Delta delay = congestion_manager_.TimeUntilSend(
235 time_of_last_received_packet_, false);
268 if (delay.IsZero()) { 236 if (delay.IsZero()) {
269 helper_->UnregisterSendAlarmIfRegistered(); 237 helper_->UnregisterSendAlarmIfRegistered();
270 if (!write_blocked_) { 238 if (!write_blocked_) {
271 OnCanWrite(); 239 OnCanWrite();
272 } 240 }
273 } else { 241 } else {
274 helper_->SetSendAlarm(delay); 242 helper_->SetSendAlarm(delay);
275 } 243 }
276 } 244 }
277 245
278 void QuicConnection::OnCongestionFeedbackFrame( 246 void QuicConnection::OnCongestionFeedbackFrame(
279 const QuicCongestionFeedbackFrame& feedback) { 247 const QuicCongestionFeedbackFrame& feedback) {
280 if (debug_visitor_) { 248 if (debug_visitor_) {
281 debug_visitor_->OnCongestionFeedbackFrame(feedback); 249 debug_visitor_->OnCongestionFeedbackFrame(feedback);
282 } 250 }
283 congestion_manager_.OnIncomingQuicCongestionFeedbackFrame(feedback); 251 congestion_manager_.OnIncomingQuicCongestionFeedbackFrame(
252 feedback, time_of_last_received_packet_);
284 } 253 }
285 254
286 bool QuicConnection::ValidateAckFrame(const QuicAckFrame& incoming_ack) { 255 bool QuicConnection::ValidateAckFrame(const QuicAckFrame& incoming_ack) {
287 if (incoming_ack.received_info.largest_observed > 256 if (incoming_ack.received_info.largest_observed >
288 packet_creator_.sequence_number()) { 257 packet_creator_.sequence_number()) {
289 DLOG(ERROR) << "Client observed unsent packet:" 258 DLOG(ERROR) << "Client observed unsent packet:"
290 << incoming_ack.received_info.largest_observed << " vs " 259 << incoming_ack.received_info.largest_observed << " vs "
291 << packet_creator_.sequence_number(); 260 << packet_creator_.sequence_number();
292 // We got an error for data we have not sent. Error out. 261 // We got an error for data we have not sent. Error out.
293 return false; 262 return false;
294 } 263 }
295 264
296 if (incoming_ack.received_info.largest_observed < 265 if (incoming_ack.received_info.largest_observed <
297 peer_largest_observed_packet_) { 266 peer_largest_observed_packet_) {
298 DLOG(ERROR) << "Client's largest_observed packet decreased:" 267 DLOG(ERROR) << "Client's largest_observed packet decreased:"
299 << incoming_ack.received_info.largest_observed << " vs " 268 << incoming_ack.received_info.largest_observed << " vs "
300 << peer_largest_observed_packet_; 269 << peer_largest_observed_packet_;
301 // We got an error for data we have not sent. Error out. 270 // We got an error for data we have not sent. Error out.
302 return false; 271 return false;
303 } 272 }
304 273
305 // We can't have too many unacked packets, or our ack frames go over 274 // We can't have too many unacked packets, or our ack frames go over
306 // kMaxPacketSize. 275 // kMaxPacketSize.
307 DCHECK_LE(incoming_ack.received_info.missing_packets.size(), 276 DCHECK_LE(incoming_ack.received_info.missing_packets.size(),
308 kMaxUnackedPackets); 277 kMaxUnackedPackets);
309 278
310 if (incoming_ack.sent_info.least_unacked < peer_least_packet_awaiting_ack_) { 279 if (incoming_ack.sent_info.least_unacked < peer_least_packet_awaiting_ack_) {
311 DLOG(INFO) << "Client sent low least_unacked: " 280 DLOG(ERROR) << "Client sent low least_unacked: "
312 << incoming_ack.sent_info.least_unacked 281 << incoming_ack.sent_info.least_unacked
313 << " vs " << peer_least_packet_awaiting_ack_; 282 << " vs " << peer_least_packet_awaiting_ack_;
314 // We never process old ack frames, so this number should only increase. 283 // We never process old ack frames, so this number should only increase.
315 return false; 284 return false;
316 } 285 }
317 286
318 if (incoming_ack.sent_info.least_unacked > 287 if (incoming_ack.sent_info.least_unacked >
319 last_header_.packet_sequence_number) { 288 last_header_.packet_sequence_number) {
320 DLOG(INFO) << "Client sent least_unacked:" 289 DLOG(ERROR) << "Client sent least_unacked:"
321 << incoming_ack.sent_info.least_unacked 290 << incoming_ack.sent_info.least_unacked
322 << " greater than the enclosing packet sequence number:" 291 << " greater than the enclosing packet sequence number:"
323 << last_header_.packet_sequence_number; 292 << last_header_.packet_sequence_number;
324 return false; 293 return false;
325 } 294 }
326 295
296 if (!incoming_ack.received_info.missing_packets.empty() &&
297 *incoming_ack.received_info.missing_packets.rbegin() >
298 incoming_ack.received_info.largest_observed) {
299 DLOG(ERROR) << "Client sent missing packet: "
300 << *incoming_ack.received_info.missing_packets.rbegin()
301 << " greater than largest observed: "
302 << incoming_ack.received_info.largest_observed;
303 return false;
304 }
305
306 if (!incoming_ack.received_info.missing_packets.empty() &&
307 *incoming_ack.received_info.missing_packets.begin() <
308 least_packet_awaited_by_peer_) {
309 DLOG(ERROR) << "Client sent missing packet: "
310 << *incoming_ack.received_info.missing_packets.begin()
311 << "smaller than least_packet_awaited_by_peer_: "
312 << least_packet_awaited_by_peer_;
313 return false;
314 }
315
316 if (!entropy_manager_.IsValidEntropy(
317 incoming_ack.received_info.largest_observed,
318 incoming_ack.received_info.missing_packets,
319 incoming_ack.received_info.entropy_hash)) {
320 DLOG(ERROR) << "Client sent invalid entropy.";
321 return false;
322 }
323
327 return true; 324 return true;
328 } 325 }
329 326
330 void QuicConnection::UpdatePacketInformationReceivedByPeer( 327 void QuicConnection::UpdatePacketInformationReceivedByPeer(
331 const QuicAckFrame& incoming_ack) { 328 const QuicAckFrame& incoming_ack) {
332 QuicConnectionVisitorInterface::AckedPackets acked_packets; 329 SequenceNumberSet acked_packets;
333 330
334 // ValidateAck should fail if largest_observed ever shrinks. 331 // ValidateAck should fail if largest_observed ever shrinks.
335 DCHECK_LE(peer_largest_observed_packet_, 332 DCHECK_LE(peer_largest_observed_packet_,
336 incoming_ack.received_info.largest_observed); 333 incoming_ack.received_info.largest_observed);
337 peer_largest_observed_packet_ = incoming_ack.received_info.largest_observed; 334 peer_largest_observed_packet_ = incoming_ack.received_info.largest_observed;
338 335
339 // Pick an upper bound for the lowest_unacked; we'll then loop through the 336 if (incoming_ack.received_info.missing_packets.empty()) {
340 // unacked packets and lower it if necessary. 337 least_packet_awaited_by_peer_ = peer_largest_observed_packet_ + 1;
341 QuicPacketSequenceNumber lowest_unacked = min( 338 } else {
342 packet_creator_.sequence_number() + 1, 339 least_packet_awaited_by_peer_ =
343 peer_largest_observed_packet_ + 1); 340 *(incoming_ack.received_info.missing_packets.begin());
341 }
342
343 entropy_manager_.ClearSentEntropyBefore(least_packet_awaited_by_peer_ - 1);
344 344
345 int retransmitted_packets = 0; 345 int retransmitted_packets = 0;
346
347 // Go through the packets we have not received an ack for and see if this 346 // Go through the packets we have not received an ack for and see if this
348 // incoming_ack shows they've been seen by the peer. 347 // incoming_ack shows they've been seen by the peer.
349 UnackedPacketMap::iterator it = unacked_packets_.begin(); 348 UnackedPacketMap::iterator it = unacked_packets_.begin();
350 while (it != unacked_packets_.end()) { 349 while (it != unacked_packets_.end()) {
351 QuicPacketSequenceNumber sequence_number = it->first; 350 QuicPacketSequenceNumber sequence_number = it->first;
352 UnackedPacket* unacked = it->second; 351 if (sequence_number > peer_largest_observed_packet_) {
353 if (!incoming_ack.received_info.IsAwaitingPacket(sequence_number)) { 352 break;
353 }
354 RetransmittableFrames* unacked = it->second;
355 if (!IsAwaitingPacket(incoming_ack.received_info, sequence_number)) {
354 // Packet was acked, so remove it from our unacked packet list. 356 // Packet was acked, so remove it from our unacked packet list.
355 DVLOG(1) << "Got an ack for " << sequence_number; 357 DVLOG(1) << "Got an ack for " << sequence_number;
356 // TODO(rch): This is inefficient and should be sped up.
357 // TODO(ianswett): Ensure this inner loop is applicable now that we're
358 // always sending packets with new sequence numbers. I believe it may
359 // only be relevant for the first crypto connect packet, which doesn't
360 // get a new packet sequence number.
361 // The acked packet might be queued (if a retransmission had been
362 // attempted).
363 for (QueuedPacketList::iterator q = queued_packets_.begin();
364 q != queued_packets_.end(); ++q) {
365 if (q->sequence_number == sequence_number) {
366 queued_packets_.erase(q);
367 break;
368 }
369 }
370 acked_packets.insert(sequence_number); 358 acked_packets.insert(sequence_number);
371 DeleteEnclosedFrames(unacked);
372 delete unacked; 359 delete unacked;
373 UnackedPacketMap::iterator it_tmp = it; 360 UnackedPacketMap::iterator it_tmp = it;
374 ++it; 361 ++it;
375 unacked_packets_.erase(it_tmp); 362 unacked_packets_.erase(it_tmp);
376 retransmission_map_.erase(sequence_number); 363 retransmission_map_.erase(sequence_number);
377 } else { 364 } else {
378 // This is a packet which we planned on retransmitting and has not been 365 // This is a packet which we planned on retransmitting and has not been
379 // seen at the time of this ack being sent out. See if it's our new 366 // seen at the time of this ack being sent out. See if it's our new
380 // lowest unacked packet. 367 // lowest unacked packet.
381 DVLOG(1) << "still missing " << sequence_number; 368 DVLOG(1) << "still missing " << sequence_number;
382 if (sequence_number < lowest_unacked) {
383 lowest_unacked = sequence_number;
384 }
385 ++it; 369 ++it;
386 // Determine if this packet is being explicitly nacked and, if so, if it 370 // The peer got packets after this sequence number. This is an explicit
387 // is worth retransmitting. 371 // nack.
388 if (sequence_number <= peer_largest_observed_packet_) { 372 RetransmissionMap::iterator retransmission_it =
389 // The peer got packets after this sequence number. This is an explicit 373 retransmission_map_.find(sequence_number);
390 // nack. 374 ++(retransmission_it->second.number_nacks);
391 RetransmissionMap::iterator retransmission_it = 375 if (retransmission_it->second.number_nacks >=
392 retransmission_map_.find(sequence_number); 376 kNumberOfNacksBeforeRetransmission &&
393 ++(retransmission_it->second.number_nacks); 377 retransmitted_packets < kMaxRetransmissionsPerAck) {
394 if (retransmission_it->second.number_nacks >= 378 ++retransmitted_packets;
395 kNumberOfNacksBeforeRetransmission && 379 DVLOG(1) << "Trying to retransmit packet " << sequence_number
396 retransmitted_packets < kMaxRetransmissionsPerAck) { 380 << " as it has been nacked 3 or more times.";
397 ++retransmitted_packets; 381 // TODO(satyamshekhar): save in a vector and retransmit after the
398 DVLOG(1) << "Trying to retransmit packet " << sequence_number 382 // loop.
399 << " as it has been nacked 3 or more times."; 383 RetransmitPacket(sequence_number);
400 // TODO(satyamshekhar): save in a vector and retransmit after the
401 // loop.
402 RetransmitPacket(sequence_number);
403 }
404 } 384 }
405 } 385 }
406 } 386 }
407 if (acked_packets.size() > 0) { 387 if (acked_packets.size() > 0) {
408 visitor_->OnAck(acked_packets); 388 visitor_->OnAck(acked_packets);
409 } 389 }
410 SetLeastUnacked(lowest_unacked);
411 } 390 }
412 391
413 void QuicConnection::SetLeastUnacked(QuicPacketSequenceNumber lowest_unacked) { 392 bool QuicConnection::DontWaitForPacketsBefore(
414 // If we've gotten an ack for the lowest packet we were waiting on, 393 QuicPacketSequenceNumber least_unacked) {
415 // update that and the list of packets we advertise we will not retransmit. 394 size_t missing_packets_count =
416 if (lowest_unacked > outgoing_ack_.sent_info.least_unacked) { 395 outgoing_ack_.received_info.missing_packets.size();
417 outgoing_ack_.sent_info.least_unacked = lowest_unacked; 396 outgoing_ack_.received_info.missing_packets.erase(
418 } 397 outgoing_ack_.received_info.missing_packets.begin(),
419 } 398 outgoing_ack_.received_info.missing_packets.lower_bound(least_unacked));
420 399 return missing_packets_count !=
421 void QuicConnection::UpdateLeastUnacked( 400 outgoing_ack_.received_info.missing_packets.size();
422 QuicPacketSequenceNumber acked_sequence_number) {
423 if (acked_sequence_number != outgoing_ack_.sent_info.least_unacked) {
424 return;
425 }
426 QuicPacketSequenceNumber least_unacked =
427 packet_creator_.sequence_number() + 1;
428 for (UnackedPacketMap::iterator it = unacked_packets_.begin();
429 it != unacked_packets_.end(); ++it) {
430 least_unacked = min<int>(least_unacked, it->first);
431 }
432
433 SetLeastUnacked(least_unacked);
434 } 401 }
435 402
436 void QuicConnection::UpdatePacketInformationSentByPeer( 403 void QuicConnection::UpdatePacketInformationSentByPeer(
437 const QuicAckFrame& incoming_ack) { 404 const QuicAckFrame& incoming_ack) {
438 // Make sure we also don't ack any packets lower than the peer's 405 // ValidateAck() should fail if peer_least_packet_awaiting_ack_ shrinks.
439 // last-packet-awaiting-ack. 406 DCHECK_LE(peer_least_packet_awaiting_ack_,
407 incoming_ack.sent_info.least_unacked);
440 if (incoming_ack.sent_info.least_unacked > peer_least_packet_awaiting_ack_) { 408 if (incoming_ack.sent_info.least_unacked > peer_least_packet_awaiting_ack_) {
441 outgoing_ack_.received_info.ClearMissingBefore( 409 bool missed_packets =
442 incoming_ack.sent_info.least_unacked); 410 DontWaitForPacketsBefore(incoming_ack.sent_info.least_unacked);
411 if (missed_packets || incoming_ack.sent_info.least_unacked >
412 outgoing_ack_.received_info.largest_observed + 1) {
413 DVLOG(1) << "Updating entropy hashed since we missed packets";
414 // There were some missing packets that we won't ever get now. Recalculate
415 // the received entropy hash.
416 entropy_manager_.RecalculateReceivedEntropyHash(
417 incoming_ack.sent_info.least_unacked,
418 incoming_ack.sent_info.entropy_hash);
419 }
443 peer_least_packet_awaiting_ack_ = incoming_ack.sent_info.least_unacked; 420 peer_least_packet_awaiting_ack_ = incoming_ack.sent_info.least_unacked;
421 // TODO(satyamshekhar): We get this iterator O(logN) in
422 // RecalculateReceivedEntropyHash also.
423 entropy_manager_.ClearReceivedEntropyBefore(
424 peer_least_packet_awaiting_ack_);
444 } 425 }
445 426 DCHECK(outgoing_ack_.received_info.missing_packets.empty() ||
427 *outgoing_ack_.received_info.missing_packets.begin() >=
428 peer_least_packet_awaiting_ack_);
446 // Possibly close any FecGroups which are now irrelevant 429 // Possibly close any FecGroups which are now irrelevant
447 CloseFecGroupsBefore(incoming_ack.sent_info.least_unacked + 1); 430 CloseFecGroupsBefore(incoming_ack.sent_info.least_unacked + 1);
448 } 431 }
449 432
450 void QuicConnection::OnFecData(const QuicFecData& fec) { 433 void QuicConnection::OnFecData(const QuicFecData& fec) {
451 DCHECK_NE(0u, last_header_.fec_group); 434 DCHECK_NE(0u, last_header_.fec_group);
452 QuicFecGroup* group = GetFecGroup(); 435 QuicFecGroup* group = GetFecGroup();
453 group->UpdateFec(last_header_.packet_sequence_number, fec); 436 group->UpdateFec(last_header_.packet_sequence_number,
437 last_header_.fec_entropy_flag, fec);
454 } 438 }
455 439
456 void QuicConnection::OnRstStreamFrame(const QuicRstStreamFrame& frame) { 440 void QuicConnection::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
457 if (debug_visitor_) { 441 if (debug_visitor_) {
458 debug_visitor_->OnRstStreamFrame(frame); 442 debug_visitor_->OnRstStreamFrame(frame);
459 } 443 }
460 DLOG(INFO) << "Stream reset with error " 444 DLOG(INFO) << "Stream reset with error "
461 << QuicUtils::ErrorToString(frame.error_code); 445 << QuicUtils::ErrorToString(frame.error_code);
462 visitor_->OnRstStream(frame); 446 visitor_->OnRstStream(frame);
463 } 447 }
464 448
465 void QuicConnection::OnConnectionCloseFrame( 449 void QuicConnection::OnConnectionCloseFrame(
466 const QuicConnectionCloseFrame& frame) { 450 const QuicConnectionCloseFrame& frame) {
467 if (debug_visitor_) { 451 if (debug_visitor_) {
468 debug_visitor_->OnConnectionCloseFrame(frame); 452 debug_visitor_->OnConnectionCloseFrame(frame);
469 } 453 }
470 DLOG(INFO) << "Connection closed with error " 454 DLOG(INFO) << "Connection closed with error "
471 << QuicUtils::ErrorToString(frame.error_code); 455 << QuicUtils::ErrorToString(frame.error_code);
472 CloseConnection(frame.error_code, true); 456 CloseConnection(frame.error_code, true);
473 } 457 }
474 458
459 void QuicConnection::OnGoAwayFrame(const QuicGoAwayFrame& frame) {
460 DLOG(INFO) << "Go away received with error "
461 << QuicUtils::ErrorToString(frame.error_code)
462 << " and reason:" << frame.reason_phrase;
463 visitor_->OnGoAway(frame);
464 }
465
475 void QuicConnection::OnPacketComplete() { 466 void QuicConnection::OnPacketComplete() {
467 // TODO(satyamshekhar): Don't do anything if this packet closed the
468 // connection.
476 if (!last_packet_revived_) { 469 if (!last_packet_revived_) {
477 DLOG(INFO) << "Got packet " << last_header_.packet_sequence_number 470 DLOG(INFO) << "Got packet " << last_header_.packet_sequence_number
478 << " with " << last_stream_frames_.size() 471 << " with " << last_stream_frames_.size()
479 << " stream frames for " << last_header_.public_header.guid; 472 << " stream frames for " << last_header_.public_header.guid;
480 congestion_manager_.RecordIncomingPacket( 473 congestion_manager_.RecordIncomingPacket(
481 last_size_, last_header_.packet_sequence_number, 474 last_size_, last_header_.packet_sequence_number,
482 clock_->Now(), last_packet_revived_); 475 time_of_last_received_packet_, last_packet_revived_);
483 } else { 476 } else {
484 DLOG(INFO) << "Got revived packet with " << last_stream_frames_.size() 477 DLOG(INFO) << "Got revived packet with " << last_stream_frames_.size()
485 << " frames."; 478 << " frames.";
486 } 479 }
487 480
488 if (last_stream_frames_.empty() || 481 if ((last_stream_frames_.empty() ||
489 visitor_->OnPacket(self_address_, peer_address_, 482 visitor_->OnPacket(self_address_, peer_address_,
490 last_header_, last_stream_frames_)) { 483 last_header_, last_stream_frames_))) {
491 RecordPacketReceived(last_header_); 484 RecordPacketReceived(last_header_);
492 } 485 }
493 486
494 MaybeSendAckInResponseToPacket(); 487 MaybeSendAckInResponseToPacket();
495 last_stream_frames_.clear(); 488 last_stream_frames_.clear();
496 } 489 }
497 490
491 QuicAckFrame* QuicConnection::CreateAckFrame() {
492 return new QuicAckFrame(outgoing_ack_);
493 }
494
495 QuicCongestionFeedbackFrame* QuicConnection::CreateFeedbackFrame() {
496 return new QuicCongestionFeedbackFrame(outgoing_congestion_feedback_);
497 }
498
498 void QuicConnection::MaybeSendAckInResponseToPacket() { 499 void QuicConnection::MaybeSendAckInResponseToPacket() {
499 if (send_ack_in_response_to_packet_) { 500 if (send_ack_in_response_to_packet_) {
500 SendAck(); 501 SendAck();
501 } else if (!last_stream_frames_.empty()) { 502 } else if (!last_stream_frames_.empty()) {
502 // TODO(alyssar) this case should really be "if the packet contained any 503 // TODO(alyssar) this case should really be "if the packet contained any
503 // non-ack frame", rather than "if the packet contained a stream frame" 504 // non-ack frame", rather than "if the packet contained a stream frame"
504 helper_->SetAckAlarm(congestion_manager_.DefaultRetransmissionTime()); 505 helper_->SetAckAlarm(congestion_manager_.DefaultRetransmissionTime());
505 } 506 }
506 send_ack_in_response_to_packet_ = !send_ack_in_response_to_packet_; 507 send_ack_in_response_to_packet_ = !send_ack_in_response_to_packet_;
507 } 508 }
508 509
509 QuicConsumedData QuicConnection::SendStreamData(QuicStreamId id, 510 QuicConsumedData QuicConnection::SendStreamData(QuicStreamId id,
510 StringPiece data, 511 base::StringPiece data,
511 QuicStreamOffset offset, 512 QuicStreamOffset offset,
512 bool fin) { 513 bool fin) {
513 size_t total_bytes_consumed = 0; 514 return packet_generator_.ConsumeData(id, data, offset, fin);
514 bool fin_consumed = false;
515
516 while (queued_packets_.empty()) {
517 packet_creator_.MaybeStartFEC();
518 QuicFrame frame;
519 size_t bytes_consumed = packet_creator_.CreateStreamFrame(
520 id, data, offset, fin, &frame);
521 bool success = packet_creator_.AddFrame(frame);
522 DCHECK(success);
523
524 total_bytes_consumed += bytes_consumed;
525 offset += bytes_consumed;
526 fin_consumed = fin && bytes_consumed == data.size();
527 data.remove_prefix(bytes_consumed);
528
529 // TODO(ianswett): Currently this does not pack stream data together,
530 // because SendStreamData does not know if there are more streams to write.
531 // TODO(ianswett): Restore packet reordering.
532 SendOrQueueCurrentPacket();
533
534 if (packet_creator_.ShouldSendFec(false)) {
535 PacketPair fec_pair = packet_creator_.SerializeFec();
536 // Never retransmit FEC packets.
537 SendOrQueuePacket(fec_pair.first, fec_pair.second, !kForce);
538 }
539
540 if (data.empty()) {
541 // We're done writing the data. Exit the loop.
542 // We don't make this a precondition because we could have 0 bytes of data
543 // if we're simply writing a fin.
544 break;
545 }
546 }
547 // Ensure the FEC group is closed at the end of this method.
548 if (packet_creator_.ShouldSendFec(true)) {
549 PacketPair fec_pair = packet_creator_.SerializeFec();
550 // Never retransmit FEC packets.
551 SendOrQueuePacket(fec_pair.first, fec_pair.second, !kForce);
552 }
553 return QuicConsumedData(total_bytes_consumed, fin_consumed);
554 } 515 }
555 516
556 void QuicConnection::SendRstStream(QuicStreamId id, 517 void QuicConnection::SendRstStream(QuicStreamId id,
557 QuicErrorCode error, 518 QuicErrorCode error) {
558 QuicStreamOffset offset) { 519 packet_generator_.AddControlFrame(
559 queued_control_frames_.push_back(QuicFrame( 520 QuicFrame(new QuicRstStreamFrame(id, error)));
560 new QuicRstStreamFrame(id, offset, error)));
561
562 // Try to write immediately if possible.
563 if (CanWrite(!kIsRetransmission)) {
564 WriteQueuedData(kFlush);
565 }
566 } 521 }
567 522
568 void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address, 523 void QuicConnection::ProcessUdpPacket(const IPEndPoint& self_address,
569 const IPEndPoint& peer_address, 524 const IPEndPoint& peer_address,
570 const QuicEncryptedPacket& packet) { 525 const QuicEncryptedPacket& packet) {
571 if (debug_visitor_) { 526 if (debug_visitor_) {
572 debug_visitor_->OnPacketReceived(self_address, peer_address, packet); 527 debug_visitor_->OnPacketReceived(self_address, peer_address, packet);
573 } 528 }
574 last_packet_revived_ = false; 529 last_packet_revived_ = false;
575 last_size_ = packet.length(); 530 last_size_ = packet.length();
576 last_self_address_ = self_address; 531 last_self_address_ = self_address;
577 last_peer_address_ = peer_address; 532 last_peer_address_ = peer_address;
578 framer_.ProcessPacket(packet); 533 framer_.ProcessPacket(packet);
579 MaybeProcessRevivedPacket(); 534 MaybeProcessRevivedPacket();
580 } 535 }
581 536
582 bool QuicConnection::OnCanWrite() { 537 bool QuicConnection::OnCanWrite() {
538 LOG(INFO) << "here!!!";
583 write_blocked_ = false; 539 write_blocked_ = false;
584 540
585 WriteQueuedData(!kFlush); 541 WriteQueuedPackets();
586 542
587 // Ensure there's enough room for a StreamFrame before calling the visitor. 543 // Sending queued packets may have caused the socket to become write blocked,
588 if (packet_creator_.BytesFree() <= kMinStreamFrameLength) { 544 // or the congestion manager to prohibit sending. If we've sent everything
589 SendOrQueueCurrentPacket(); 545 // we had queued and we're still not blocked, let the visitor know it can
590 } 546 // write more.
547 // TODO(rch): shouldn't this be "if (CanWrite(false))"
548 if (!write_blocked_) {
549 packet_generator_.StartBatchOperations();
550 bool all_bytes_written = visitor_->OnCanWrite();
551 packet_generator_.FinishBatchOperations();
591 552
592 // If we've sent everything we had queued and we're still not blocked, let the 553 // After the visitor writes, it may have caused the socket to become write
593 // visitor know it can write more. 554 // blocked or the congestion manager to prohibit sending, so check again.
594 if (!write_blocked_) { 555 if (!write_blocked_ && !all_bytes_written && !helper_->IsSendAlarmSet()) {
595 bool all_bytes_written = visitor_->OnCanWrite();
596 // If the latest write caused a socket-level blockage, return false: we will
597 // be rescheduled by the kernel.
598 if (write_blocked_) {
599 return false;
600 }
601 if (!all_bytes_written && !helper_->IsSendAlarmSet()) {
602 // We're not write blocked, but some stream didn't write out all of its 556 // We're not write blocked, but some stream didn't write out all of its
603 // bytes. Register for 'immediate' resumption so we'll keep writing after 557 // bytes. Register for 'immediate' resumption so we'll keep writing after
604 // other quic connections have had a chance to use the socket. 558 // other quic connections have had a chance to use the socket.
605 helper_->SetSendAlarm(QuicTime::Delta::Zero()); 559 helper_->SetSendAlarm(QuicTime::Delta::Zero());
606 } 560 }
607 } 561 }
608 562
609 // If a write can still be performed, ensure there are no pending frames,
610 // even if they didn't fill a packet.
611 if (packet_creator_.HasPendingFrames() && CanWrite(!kIsRetransmission)) {
612 SendOrQueueCurrentPacket();
613 }
614
615 return !write_blocked_; 563 return !write_blocked_;
616 } 564 }
617 565
618 bool QuicConnection::WriteQueuedData(bool flush) { 566 bool QuicConnection::WriteQueuedPackets() {
619 DCHECK(!write_blocked_); 567 DCHECK(!write_blocked_);
620 DCHECK(!packet_creator_.HasPendingFrames());
621 568
622 // Send all queued packets first.
623 size_t num_queued_packets = queued_packets_.size() + 1; 569 size_t num_queued_packets = queued_packets_.size() + 1;
624 QueuedPacketList::iterator packet_iterator = queued_packets_.begin(); 570 QueuedPacketList::iterator packet_iterator = queued_packets_.begin();
625 while (!write_blocked_ && !helper_->IsSendAlarmSet() && 571 while (!write_blocked_ && !helper_->IsSendAlarmSet() &&
626 packet_iterator != queued_packets_.end()) { 572 packet_iterator != queued_packets_.end()) {
627 // Ensure that from one iteration of this loop to the next we 573 // Ensure that from one iteration of this loop to the next we
628 // succeeded in sending a packet so we don't infinitely loop. 574 // succeeded in sending a packet so we don't infinitely loop.
629 // TODO(rch): clean up and close the connection if we really hit this. 575 // TODO(rch): clean up and close the connection if we really hit this.
630 DCHECK_LT(queued_packets_.size(), num_queued_packets); 576 DCHECK_LT(queued_packets_.size(), num_queued_packets);
631 num_queued_packets = queued_packets_.size(); 577 num_queued_packets = queued_packets_.size();
632 if (WritePacket(packet_iterator->sequence_number, 578 if (WritePacket(packet_iterator->sequence_number,
633 packet_iterator->packet, !kForce)) { 579 packet_iterator->packet, !kForce)) {
634 packet_iterator = queued_packets_.erase(packet_iterator); 580 packet_iterator = queued_packets_.erase(packet_iterator);
635 } else { 581 } else {
636 // TODO(ianswett): Why not break or return false here? 582 // Continue, because some queued packets may still be writable.
637 ++packet_iterator; 583 ++packet_iterator;
638 } 584 }
639 } 585 }
640 586
641 if (write_blocked_) {
642 return false;
643 }
644
645 while ((!queued_control_frames_.empty() || should_send_ack_ ||
646 should_send_congestion_feedback_) && CanWrite(!kIsRetransmission)) {
647 bool full_packet = false;
648 if (!queued_control_frames_.empty()) {
649 full_packet = !packet_creator_.AddFrame(queued_control_frames_.back());
650 if (!full_packet) {
651 queued_control_frames_.pop_back();
652 }
653 } else if (should_send_ack_) {
654 full_packet = !packet_creator_.AddFrame(QuicFrame(&outgoing_ack_));
655 if (!full_packet) {
656 should_send_ack_ = false;
657 }
658 } else if (should_send_congestion_feedback_) {
659 full_packet = !packet_creator_.AddFrame(
660 QuicFrame(&outgoing_congestion_feedback_));
661 if (!full_packet) {
662 should_send_congestion_feedback_ = false;
663 }
664 }
665
666 if (full_packet) {
667 SendOrQueueCurrentPacket();
668 }
669 }
670
671 if (flush && packet_creator_.HasPendingFrames()) {
672 SendOrQueueCurrentPacket();
673 }
674
675 return !write_blocked_; 587 return !write_blocked_;
676 } 588 }
677 589
678 void QuicConnection::RecordPacketReceived(const QuicPacketHeader& header) { 590 void QuicConnection::RecordPacketReceived(const QuicPacketHeader& header) {
591 DLOG(INFO) << "Recording received packet: " << header.packet_sequence_number;
679 QuicPacketSequenceNumber sequence_number = header.packet_sequence_number; 592 QuicPacketSequenceNumber sequence_number = header.packet_sequence_number;
680 DCHECK(outgoing_ack_.received_info.IsAwaitingPacket(sequence_number)); 593 DCHECK(IsAwaitingPacket(outgoing_ack_.received_info, sequence_number));
681 outgoing_ack_.received_info.RecordReceived(sequence_number); 594
595 InsertMissingPacketsBetween(
596 &outgoing_ack_.received_info,
597 max(outgoing_ack_.received_info.largest_observed + 1,
598 peer_least_packet_awaiting_ack_),
599 header.packet_sequence_number);
600
601 if (outgoing_ack_.received_info.largest_observed >
602 header.packet_sequence_number) {
603 // We've gotten one of the out of order packets - remove it from our
604 // "missing packets" list.
605 DVLOG(1) << "Removing " << sequence_number << " from missing list";
606 outgoing_ack_.received_info.missing_packets.erase(sequence_number);
607 }
608 outgoing_ack_.received_info.largest_observed = max(
609 outgoing_ack_.received_info.largest_observed,
610 header.packet_sequence_number);
611 entropy_manager_.RecordReceivedPacketEntropyHash(sequence_number,
612 header.entropy_hash);
682 } 613 }
683 614
684 bool QuicConnection::MaybeRetransmitPacketForRTO( 615 bool QuicConnection::MaybeRetransmitPacketForRTO(
685 QuicPacketSequenceNumber sequence_number) { 616 QuicPacketSequenceNumber sequence_number) {
686 DCHECK_EQ(ContainsKey(unacked_packets_, sequence_number), 617 DCHECK_EQ(ContainsKey(unacked_packets_, sequence_number),
687 ContainsKey(retransmission_map_, sequence_number)); 618 ContainsKey(retransmission_map_, sequence_number));
688 619
689 if (!ContainsKey(unacked_packets_, sequence_number)) { 620 if (!ContainsKey(unacked_packets_, sequence_number)) {
690 DVLOG(2) << "alarm fired for " << sequence_number 621 DVLOG(2) << "alarm fired for " << sequence_number
691 << " but it has been acked or already retransmitted with " 622 << " but it has been acked or already retransmitted with"
692 << " different sequence number."; 623 << " different sequence number.";
693 // So no extra delay is added for this packet. 624 // So no extra delay is added for this packet.
694 return true; 625 return true;
695 } 626 }
696 627
697 // If the packet hasn't been acked and we're getting truncated acks, ignore 628 // If the packet hasn't been acked and we're getting truncated acks, ignore
698 // any RTO for packets larger than the peer's largest observed packet; it may 629 // any RTO for packets larger than the peer's largest observed packet; it may
699 // have been received by the peer and just wasn't acked due to the ack frame 630 // have been received by the peer and just wasn't acked due to the ack frame
700 // running out of space. 631 // running out of space.
701 if (received_truncated_ack_ && 632 if (received_truncated_ack_ &&
(...skipping 10 matching lines...) Expand all
712 UnackedPacketMap::iterator unacked_it = 643 UnackedPacketMap::iterator unacked_it =
713 unacked_packets_.find(sequence_number); 644 unacked_packets_.find(sequence_number);
714 RetransmissionMap::iterator retransmission_it = 645 RetransmissionMap::iterator retransmission_it =
715 retransmission_map_.find(sequence_number); 646 retransmission_map_.find(sequence_number);
716 // There should always be an entry corresponding to |sequence_number| in 647 // There should always be an entry corresponding to |sequence_number| in
717 // both |retransmission_map_| and |unacked_packets_|. Retransmissions due to 648 // both |retransmission_map_| and |unacked_packets_|. Retransmissions due to
718 // RTO for sequence numbers that are already acked or retransmitted are 649 // RTO for sequence numbers that are already acked or retransmitted are
719 // ignored by MaybeRetransmitPacketForRTO. 650 // ignored by MaybeRetransmitPacketForRTO.
720 DCHECK(unacked_it != unacked_packets_.end()); 651 DCHECK(unacked_it != unacked_packets_.end());
721 DCHECK(retransmission_it != retransmission_map_.end()); 652 DCHECK(retransmission_it != retransmission_map_.end());
722 UnackedPacket* unacked = unacked_it->second; 653 RetransmittableFrames* unacked = unacked_it->second;
723 // TODO(ianswett): Never change the sequence number of the connect packet. 654 // TODO(ianswett): Never change the sequence number of the connect packet.
724 // Re-packetize the frames with a new sequence number for retransmission. 655 // Re-packetize the frames with a new sequence number for retransmission.
725 // Retransmitted data packets do not use FEC, even when it's enabled. 656 // Retransmitted data packets do not use FEC, even when it's enabled.
726 PacketPair packetpair = packet_creator_.SerializeAllFrames(unacked->frames); 657 SerializedPacket serialized_packet =
727 RetransmissionInfo retransmission_info(packetpair.first); 658 packet_creator_.SerializeAllFrames(unacked->frames());
659 RetransmissionInfo retransmission_info(serialized_packet.sequence_number);
728 retransmission_info.number_retransmissions = 660 retransmission_info.number_retransmissions =
729 retransmission_it->second.number_retransmissions + 1; 661 retransmission_it->second.number_retransmissions + 1;
730 retransmission_map_.insert(make_pair(packetpair.first, retransmission_info)); 662 retransmission_map_.insert(make_pair(serialized_packet.sequence_number,
663 retransmission_info));
731 // Remove info with old sequence number. 664 // Remove info with old sequence number.
732 unacked_packets_.erase(unacked_it); 665 unacked_packets_.erase(unacked_it);
733 retransmission_map_.erase(retransmission_it); 666 retransmission_map_.erase(retransmission_it);
734 DVLOG(1) << "Retransmitting unacked packet " << sequence_number << " as " 667 DVLOG(1) << "Retransmitting unacked packet " << sequence_number << " as "
735 << packetpair.first; 668 << serialized_packet.sequence_number;
736 unacked_packets_.insert(make_pair(packetpair.first, unacked)); 669 DCHECK(unacked_packets_.empty() ||
737 // Make sure if this was our least unacked packet, that we update our 670 unacked_packets_.rbegin()->first < serialized_packet.sequence_number);
738 // outgoing ack. If this wasn't the least unacked, this is a no-op. 671 unacked_packets_.insert(make_pair(serialized_packet.sequence_number,
739 UpdateLeastUnacked(sequence_number); 672 unacked));
740 SendOrQueuePacket(packetpair.first, packetpair.second, !kForce); 673 SendOrQueuePacket(serialized_packet.sequence_number,
674 serialized_packet.packet,
675 serialized_packet.entropy_hash);
741 } 676 }
742 677
743 bool QuicConnection::CanWrite(bool is_retransmission) { 678 bool QuicConnection::CanWrite(bool is_retransmission) {
744 // TODO(ianswett): If the packet is a retransmit, the current send alarm may 679 // TODO(ianswett): If the packet is a retransmit, the current send alarm may
745 // be too long. 680 // be too long.
746 if (write_blocked_ || helper_->IsSendAlarmSet()) { 681 if (write_blocked_ || helper_->IsSendAlarmSet()) {
747 return false; 682 return false;
748 } 683 }
749 QuicTime::Delta delay = congestion_manager_.TimeUntilSend(is_retransmission); 684 QuicTime::Delta delay = congestion_manager_.TimeUntilSend(clock_->Now(),
685 is_retransmission);
750 // If the scheduler requires a delay, then we can not send this packet now. 686 // If the scheduler requires a delay, then we can not send this packet now.
751 if (!delay.IsZero() && !delay.IsInfinite()) { 687 if (!delay.IsZero() && !delay.IsInfinite()) {
752 // TODO(pwestin): we need to handle delay.IsInfinite() separately. 688 // TODO(pwestin): we need to handle delay.IsInfinite() separately.
753 helper_->SetSendAlarm(delay); 689 helper_->SetSendAlarm(delay);
754 return false; 690 return false;
755 } 691 }
756 return true; 692 return true;
757 } 693 }
758 694
759 bool QuicConnection::IsRetransmission( 695 bool QuicConnection::IsRetransmission(
760 QuicPacketSequenceNumber sequence_number) { 696 QuicPacketSequenceNumber sequence_number) {
761 RetransmissionMap::iterator it = retransmission_map_.find(sequence_number); 697 RetransmissionMap::iterator it = retransmission_map_.find(sequence_number);
762 return it != retransmission_map_.end() && 698 return it != retransmission_map_.end() &&
763 it->second.number_retransmissions > 0; 699 it->second.number_retransmissions > 0;
764 } 700 }
765 701
766 void QuicConnection::MaybeSetupRetransmission( 702 void QuicConnection::MaybeSetupRetransmission(
767 QuicPacketSequenceNumber sequence_number) { 703 QuicPacketSequenceNumber sequence_number) {
768 RetransmissionMap::iterator it = retransmission_map_.find(sequence_number); 704 RetransmissionMap::iterator it = retransmission_map_.find(sequence_number);
769 if (it == retransmission_map_.end()) { 705 if (it == retransmission_map_.end()) {
770 DVLOG(1) << "Will not retransmit packet " << sequence_number; 706 DVLOG(1) << "Will not retransmit packet " << sequence_number;
771 return; 707 return;
772 } 708 }
773 709
774 RetransmissionInfo retransmission_info = it->second; 710 RetransmissionInfo retransmission_info = it->second;
775 QuicTime::Delta retransmission_delay = 711 QuicTime::Delta retransmission_delay =
776 congestion_manager_.GetRetransmissionDelay( 712 congestion_manager_.GetRetransmissionDelay(
713 unacked_packets_.size(),
777 retransmission_info.number_retransmissions); 714 retransmission_info.number_retransmissions);
778 retransmission_info.scheduled_time = clock_->Now().Add(retransmission_delay); 715 retransmission_info.scheduled_time =
716 clock_->ApproximateNow().Add(retransmission_delay);
779 retransmission_timeouts_.push(retransmission_info); 717 retransmission_timeouts_.push(retransmission_info);
780 718
781 // Do not set the retransmisson alarm if we're already handling the 719 // Do not set the retransmisson alarm if we're already handling the
782 // retransmission alarm because the retransmission alarm will be reset when 720 // retransmission alarm because the retransmission alarm will be reset when
783 // OnRetransmissionTimeout completes. 721 // OnRetransmissionTimeout completes.
784 if (!handling_retransmission_timeout_) { 722 if (!handling_retransmission_timeout_) {
785 helper_->SetRetransmissionAlarm(retransmission_delay); 723 helper_->SetRetransmissionAlarm(retransmission_delay);
786 } 724 }
787 725 // TODO(satyamshekhar): restore pacekt reordering with Ian's TODO in
788 // The second case should never happen in the real world, but does here 726 // SendStreamData().
789 // because we sometimes send out of order to validate corner cases.
790 if (outgoing_ack_.sent_info.least_unacked == 0 ||
791 sequence_number < outgoing_ack_.sent_info.least_unacked) {
792 outgoing_ack_.sent_info.least_unacked = sequence_number;
793 }
794 } 727 }
795 728
796 bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number, 729 bool QuicConnection::WritePacket(QuicPacketSequenceNumber sequence_number,
797 QuicPacket* packet, 730 QuicPacket* packet,
798 bool forced) { 731 bool forced) {
799 if (!connected_) { 732 if (!connected_) {
800 DLOG(INFO) 733 DLOG(INFO)
801 << "Dropping packet to be sent since connection is disconnected."; 734 << "Dropping packet to be sent since connection is disconnected.";
802 delete packet; 735 delete packet;
803 // Returning true because we deleted the packet and the caller shouldn't 736 // Returning true because we deleted the packet and the caller shouldn't
804 // delete it again. 737 // delete it again.
805 return true; 738 return true;
806 } 739 }
807 740
808 bool is_retransmission = IsRetransmission(sequence_number); 741 bool is_retransmission = IsRetransmission(sequence_number);
809 // If we are not forced and we can't write, then simply return false; 742 // If we are not forced and we can't write, then simply return false;
810 if (!forced && !CanWrite(is_retransmission)) { 743 if (!forced && !CanWrite(is_retransmission)) {
811 return false; 744 return false;
812 } 745 }
813 746
814 scoped_ptr<QuicEncryptedPacket> encrypted(framer_.EncryptPacket(*packet)); 747 scoped_ptr<QuicEncryptedPacket> encrypted(
815 DLOG(INFO) << "Sending packet : " 748 framer_.EncryptPacket(sequence_number, *packet));
749 DLOG(INFO) << "Sending packet number " << sequence_number << " : "
816 << (packet->is_fec_packet() ? "FEC " : 750 << (packet->is_fec_packet() ? "FEC " :
817 (ContainsKey(retransmission_map_, sequence_number) ? 751 (ContainsKey(retransmission_map_, sequence_number) ?
818 "data bearing " : " ack only ")) 752 "data bearing " : " ack only "));
819 << "packet " << sequence_number; 753
820 DCHECK(encrypted->length() <= kMaxPacketSize) 754 DCHECK(encrypted->length() <= kMaxPacketSize)
821 << "Packet " << sequence_number << " will not be read; too large: " 755 << "Packet " << sequence_number << " will not be read; too large: "
822 << packet->length() << " " << encrypted->length() << " " 756 << packet->length() << " " << encrypted->length() << " "
823 << outgoing_ack_; 757 << outgoing_ack_;
824 758
825 int error; 759 int error;
760 QuicTime now = clock_->Now();
826 int rv = helper_->WritePacketToWire(*encrypted, &error); 761 int rv = helper_->WritePacketToWire(*encrypted, &error);
827 if (rv == -1 && error == ERR_IO_PENDING) { 762 if (rv == -1 && error == ERR_IO_PENDING) {
763 // TODO(satyashekhar): It might be more efficient (fewer system calls), if
764 // all connections share this variable i.e this becomes a part of
765 // PacketWriterInterface.
828 write_blocked_ = true; 766 write_blocked_ = true;
829 return false; 767 return false;
830 } 768 }
769 time_of_last_sent_packet_ = now;
770 DVLOG(1) << "time of last sent packet: " << now.ToMicroseconds();
831 // TODO(wtc): Is it correct to continue if the write failed. 771 // TODO(wtc): Is it correct to continue if the write failed.
832 772
833 // Set the retransmit alarm only when we have sent the packet to the client 773 // Set the retransmit alarm only when we have sent the packet to the client
834 // and not when it goes to the pending queue, otherwise we will end up adding 774 // and not when it goes to the pending queue, otherwise we will end up adding
835 // an entry to retransmission_timeout_ every time we attempt a write. 775 // an entry to retransmission_timeout_ every time we attempt a write.
836 MaybeSetupRetransmission(sequence_number); 776 MaybeSetupRetransmission(sequence_number);
837 777
838 time_of_last_packet_ = clock_->Now(); 778 congestion_manager_.SentPacket(sequence_number, now, packet->length(),
839 DVLOG(1) << "last packet: " << time_of_last_packet_.ToMicroseconds();
840
841 congestion_manager_.SentPacket(sequence_number, packet->length(),
842 is_retransmission); 779 is_retransmission);
843 delete packet; 780 delete packet;
844 return true; 781 return true;
845 } 782 }
846 783
847 void QuicConnection::SendOrQueueCurrentPacket() { 784 bool QuicConnection::OnSerializedPacket(
848 QuicFrames retransmittable_frames; 785 const SerializedPacket& serialized_packet) {
849 PacketPair pair = packet_creator_.SerializePacket(&retransmittable_frames); 786 if (serialized_packet.retransmittable_frames != NULL) {
850 const bool should_retransmit = !retransmittable_frames.empty(); 787 DCHECK(unacked_packets_.empty() ||
851 if (should_retransmit) { 788 unacked_packets_.rbegin()->first <
852 UnackedPacket* unacked = new UnackedPacket(retransmittable_frames); 789 serialized_packet.sequence_number);
853 for (size_t i = 0; i < retransmittable_frames.size(); ++i) { 790 unacked_packets_.insert(
854 if (retransmittable_frames[i].type == STREAM_FRAME) { 791 make_pair(serialized_packet.sequence_number,
855 DCHECK(unacked->data.empty()); 792 serialized_packet.retransmittable_frames));
856 // Make an owned copy of the StringPiece.
857 unacked->data =
858 retransmittable_frames[i].stream_frame->data.as_string();
859 // Ensure the frame's StringPiece points to the owned copy of the data.
860 retransmittable_frames[i].stream_frame->data =
861 StringPiece(unacked->data);
862 }
863 }
864 unacked_packets_.insert(make_pair(pair.first, unacked));
865 // All unacked packets might be retransmitted. 793 // All unacked packets might be retransmitted.
866 retransmission_map_.insert(make_pair(pair.first, 794 retransmission_map_.insert(
867 RetransmissionInfo(pair.first))); 795 make_pair(serialized_packet.sequence_number,
796 RetransmissionInfo(serialized_packet.sequence_number)));
868 } 797 }
869 SendOrQueuePacket(pair.first, pair.second, !kForce); 798 return SendOrQueuePacket(serialized_packet.sequence_number,
799 serialized_packet.packet,
800 serialized_packet.entropy_hash);
870 } 801 }
871 802
872 bool QuicConnection::SendOrQueuePacket(QuicPacketSequenceNumber sequence_number, 803 bool QuicConnection::SendOrQueuePacket(QuicPacketSequenceNumber sequence_number,
873 QuicPacket* packet, 804 QuicPacket* packet,
874 bool force) { 805 QuicPacketEntropyHash entropy_hash) {
875 if (!WritePacket(sequence_number, packet, force)) { 806 entropy_manager_.RecordSentPacketEntropyHash(sequence_number, entropy_hash);
807 if (!WritePacket(sequence_number, packet, !kForce)) {
876 queued_packets_.push_back(QueuedPacket(sequence_number, packet)); 808 queued_packets_.push_back(QueuedPacket(sequence_number, packet));
877 return false; 809 return false;
878 } 810 }
879 return true; 811 return true;
880 } 812 }
881 813
882 bool QuicConnection::ShouldSimulateLostPacket() { 814 bool QuicConnection::ShouldSimulateLostPacket() {
883 // TODO(rch): enable this 815 // TODO(rch): enable this
884 return false; 816 return false;
885 /* 817 /*
886 return FLAGS_fake_packet_loss_percentage > 0 && 818 return FLAGS_fake_packet_loss_percentage > 0 &&
887 random_->Rand32() % 100 < FLAGS_fake_packet_loss_percentage; 819 random_->Rand32() % 100 < FLAGS_fake_packet_loss_percentage;
888 */ 820 */
889 } 821 }
890 822
823 void QuicConnection::UpdateOutgoingAck() {
824 if (!unacked_packets_.empty()) {
825 outgoing_ack_.sent_info.least_unacked = unacked_packets_.begin()->first;
826 } else {
827 // If there are no unacked packets, set the least unacked packet to
828 // sequence_number() + 1 since that will be the sequence number of this
829 // ack packet whenever it is sent.
830 outgoing_ack_.sent_info.least_unacked =
831 packet_creator_.sequence_number() + 1;
832 }
833 outgoing_ack_.sent_info.entropy_hash = entropy_manager_.SentEntropyHash(
834 outgoing_ack_.sent_info.least_unacked - 1);
835 outgoing_ack_.received_info.entropy_hash =
836 entropy_manager_.ReceivedEntropyHash(
837 outgoing_ack_.received_info.largest_observed);
838 }
839
891 void QuicConnection::SendAck() { 840 void QuicConnection::SendAck() {
892 helper_->ClearAckAlarm(); 841 helper_->ClearAckAlarm();
842 UpdateOutgoingAck();
843 DVLOG(1) << "Sending ack: " << outgoing_ack_;
893 844
894 if (!ContainsKey(unacked_packets_, outgoing_ack_.sent_info.least_unacked)) { 845 // TODO(rch): delay this until the CreateFeedbackFrame
895 // At some point, all packets were acked, and we set least_unacked to a 846 // method is invoked. This requires changes SetShouldSendAck
896 // packet we will not retransmit. Make sure we update it. 847 // to be a no-arg method, and re-jiggering its implementation.
897 UpdateLeastUnacked(outgoing_ack_.sent_info.least_unacked); 848 bool send_feedback = false;
898 }
899
900 DVLOG(1) << "Sending ack " << outgoing_ack_;
901
902 should_send_ack_ = true;
903
904 if (congestion_manager_.GenerateCongestionFeedback( 849 if (congestion_manager_.GenerateCongestionFeedback(
905 &outgoing_congestion_feedback_)) { 850 &outgoing_congestion_feedback_)) {
906 DVLOG(1) << "Sending feedback " << outgoing_congestion_feedback_; 851 DVLOG(1) << "Sending feedback " << outgoing_congestion_feedback_;
907 should_send_congestion_feedback_ = true; 852 send_feedback = true;
908 } 853 }
909 // Try to write immediately if possible. 854
910 if (CanWrite(!kIsRetransmission)) { 855 packet_generator_.SetShouldSendAck(send_feedback);
911 WriteQueuedData(kFlush);
912 }
913 } 856 }
914 857
915 QuicTime QuicConnection::OnRetransmissionTimeout() { 858 QuicTime QuicConnection::OnRetransmissionTimeout() {
916 // This guards against registering the alarm later than we should. 859 // This guards against registering the alarm later than we should.
917 // 860 //
918 // If we have packet A and B in the list and we call 861 // If we have packet A and B in the list and we call
919 // MaybeRetransmitPacketForRTO on A, that may trigger a call to 862 // MaybeRetransmitPacketForRTO on A, that may trigger a call to
920 // SetRetransmissionAlarm if A is retransmitted as C. In that case we 863 // SetRetransmissionAlarm if A is retransmitted as C. In that case we
921 // don't want to register the alarm under SetRetransmissionAlarm; we 864 // don't want to register the alarm under SetRetransmissionAlarm; we
922 // want to set it to the RTO of B when we return from this function. 865 // want to set it to the RTO of B when we return from this function.
923 handling_retransmission_timeout_ = true; 866 handling_retransmission_timeout_ = true;
924 867
925 for (int i = 0; i < kMaxPacketsPerRetransmissionAlarm && 868 for (int i = 0; i < kMaxPacketsPerRetransmissionAlarm &&
926 !retransmission_timeouts_.empty(); ++i) { 869 !retransmission_timeouts_.empty(); ++i) {
927 RetransmissionInfo retransmission_info = retransmission_timeouts_.top(); 870 RetransmissionInfo retransmission_info = retransmission_timeouts_.top();
928 DCHECK(retransmission_info.scheduled_time.IsInitialized()); 871 DCHECK(retransmission_info.scheduled_time.IsInitialized());
929 if (retransmission_info.scheduled_time > clock_->Now()) { 872 if (retransmission_info.scheduled_time > clock_->ApproximateNow()) {
930 break; 873 break;
931 } 874 }
932 retransmission_timeouts_.pop(); 875 retransmission_timeouts_.pop();
933 if (!MaybeRetransmitPacketForRTO(retransmission_info.sequence_number)) { 876 if (!MaybeRetransmitPacketForRTO(retransmission_info.sequence_number)) {
934 DLOG(INFO) << "MaybeRetransmitPacketForRTO failed: " 877 DLOG(INFO) << "MaybeRetransmitPacketForRTO failed: "
935 << "adding an extra delay for " 878 << "adding an extra delay for "
936 << retransmission_info.sequence_number; 879 << retransmission_info.sequence_number;
937 retransmission_info.scheduled_time = clock_->Now().Add( 880 retransmission_info.scheduled_time = clock_->ApproximateNow().Add(
938 congestion_manager_.DefaultRetransmissionTime()); 881 congestion_manager_.DefaultRetransmissionTime());
939 retransmission_timeouts_.push(retransmission_info); 882 retransmission_timeouts_.push(retransmission_info);
940 } 883 }
941 } 884 }
942 885
943 handling_retransmission_timeout_ = false; 886 handling_retransmission_timeout_ = false;
944 887
945 if (retransmission_timeouts_.empty()) { 888 if (retransmission_timeouts_.empty()) {
946 return QuicTime::FromMilliseconds(0); 889 return QuicTime::FromMilliseconds(0);
947 } 890 }
948 891
949 // We have packets remaining. Return the absolute RTO of the oldest packet 892 // We have packets remaining. Return the absolute RTO of the oldest packet
950 // on the list. 893 // on the list.
951 return retransmission_timeouts_.top().scheduled_time; 894 return retransmission_timeouts_.top().scheduled_time;
952 } 895 }
953 896
954 void QuicConnection::MaybeProcessRevivedPacket() { 897 void QuicConnection::MaybeProcessRevivedPacket() {
955 QuicFecGroup* group = GetFecGroup(); 898 QuicFecGroup* group = GetFecGroup();
956 if (group == NULL || !group->CanRevive()) { 899 if (group == NULL || !group->CanRevive()) {
957 return; 900 return;
958 } 901 }
959 QuicPacketHeader revived_header; 902 QuicPacketHeader revived_header;
960 char revived_payload[kMaxPacketSize]; 903 char revived_payload[kMaxPacketSize];
961 size_t len = group->Revive(&revived_header, revived_payload, kMaxPacketSize); 904 size_t len = group->Revive(&revived_header, revived_payload, kMaxPacketSize);
962 revived_header.public_header.guid = guid_; 905 revived_header.public_header.guid = guid_;
963 revived_header.public_header.flags = PACKET_PUBLIC_FLAGS_NONE; 906 revived_header.public_header.version_flag = false;
964 revived_header.private_flags = PACKET_PRIVATE_FLAGS_NONE; 907 revived_header.public_header.reset_flag = false;
908 revived_header.fec_flag = false;
965 revived_header.fec_group = kNoFecOffset; 909 revived_header.fec_group = kNoFecOffset;
966 group_map_.erase(last_header_.fec_group); 910 group_map_.erase(last_header_.fec_group);
967 delete group; 911 delete group;
968 912
969 last_packet_revived_ = true; 913 last_packet_revived_ = true;
970 if (debug_visitor_) { 914 if (debug_visitor_) {
971 debug_visitor_->OnRevivedPacket(revived_header, 915 debug_visitor_->OnRevivedPacket(revived_header,
972 StringPiece(revived_payload, len)); 916 StringPiece(revived_payload, len));
973 } 917 }
974 framer_.ProcessRevivedPacket(revived_header, 918 framer_.ProcessRevivedPacket(&revived_header,
975 StringPiece(revived_payload, len)); 919 StringPiece(revived_payload, len));
976 } 920 }
977 921
978 QuicFecGroup* QuicConnection::GetFecGroup() { 922 QuicFecGroup* QuicConnection::GetFecGroup() {
979 QuicFecGroupNumber fec_group_num = last_header_.fec_group; 923 QuicFecGroupNumber fec_group_num = last_header_.fec_group;
980 if (fec_group_num == 0) { 924 if (fec_group_num == 0) {
981 return NULL; 925 return NULL;
982 } 926 }
983 if (group_map_.count(fec_group_num) == 0) { 927 if (group_map_.count(fec_group_num) == 0) {
984 // TODO(rch): limit the number of active FEC groups. 928 // TODO(rch): limit the number of active FEC groups.
985 group_map_[fec_group_num] = new QuicFecGroup(); 929 group_map_[fec_group_num] = new QuicFecGroup();
986 } 930 }
987 return group_map_[fec_group_num]; 931 return group_map_[fec_group_num];
988 } 932 }
989 933
990 void QuicConnection::SendConnectionClose(QuicErrorCode error) { 934 void QuicConnection::SendConnectionClose(QuicErrorCode error) {
991 SendConnectionCloseWithDetails(error, string()); 935 SendConnectionCloseWithDetails(error, string());
992 } 936 }
993 937
994 void QuicConnection::SendConnectionCloseWithDetails(QuicErrorCode error, 938 void QuicConnection::SendConnectionClosePacket(QuicErrorCode error,
995 const string& details) { 939 const string& details) {
996 DLOG(INFO) << "Force closing with error " << QuicUtils::ErrorToString(error) 940 DLOG(INFO) << "Force closing with error " << QuicUtils::ErrorToString(error)
997 << " (" << error << ")"; 941 << " (" << error << ")";
998 QuicConnectionCloseFrame frame; 942 QuicConnectionCloseFrame frame;
999 frame.error_code = error; 943 frame.error_code = error;
1000 frame.error_details = details; 944 frame.error_details = details;
945 UpdateOutgoingAck();
1001 frame.ack_frame = outgoing_ack_; 946 frame.ack_frame = outgoing_ack_;
1002 947
1003 PacketPair packetpair = packet_creator_.CloseConnection(&frame); 948 SerializedPacket serialized_packet =
1004 // There's no point in retransmitting/queueing this: we're closing the 949 packet_creator_.SerializeConnectionClose(&frame);
1005 // connection. 950 SendOrQueuePacket(serialized_packet.sequence_number, serialized_packet.packet,
1006 WritePacket(packetpair.first, packetpair.second, kForce); 951 serialized_packet.entropy_hash);
952 }
953
954 void QuicConnection::SendConnectionCloseWithDetails(QuicErrorCode error,
955 const string& details) {
956 SendConnectionClosePacket(error, details);
1007 CloseConnection(error, false); 957 CloseConnection(error, false);
1008 } 958 }
1009 959
1010 void QuicConnection::CloseConnection(QuicErrorCode error, bool from_peer) { 960 void QuicConnection::CloseConnection(QuicErrorCode error, bool from_peer) {
1011 // TODO(satyamshekhar): Ask the dispatcher to delete visitor and hence self 961 // TODO(satyamshekhar): Ask the dispatcher to delete visitor and hence self
1012 // if the visitor will always be deleted by closing the connection. 962 // if the visitor will always be deleted by closing the connection.
1013 connected_ = false; 963 connected_ = false;
1014 visitor_->ConnectionClose(error, from_peer); 964 visitor_->ConnectionClose(error, from_peer);
1015 } 965 }
1016 966
967 void QuicConnection::SendGoAway(QuicErrorCode error,
968 QuicStreamId last_good_stream_id,
969 const string& reason) {
970 DLOG(INFO) << "Going away with error " << QuicUtils::ErrorToString(error)
971 << " (" << error << ")";
972 packet_generator_.AddControlFrame(
973 QuicFrame(new QuicGoAwayFrame(error, last_good_stream_id, reason)));
974 }
975
1017 void QuicConnection::CloseFecGroupsBefore( 976 void QuicConnection::CloseFecGroupsBefore(
1018 QuicPacketSequenceNumber sequence_number) { 977 QuicPacketSequenceNumber sequence_number) {
1019 FecGroupMap::iterator it = group_map_.begin(); 978 FecGroupMap::iterator it = group_map_.begin();
1020 while (it != group_map_.end()) { 979 while (it != group_map_.end()) {
1021 // If this is the current group or the group doesn't protect this packet 980 // If this is the current group or the group doesn't protect this packet
1022 // we can ignore it. 981 // we can ignore it.
1023 if (last_header_.fec_group == it->first || 982 if (last_header_.fec_group == it->first ||
1024 !it->second->ProtectsPacketsBefore(sequence_number)) { 983 !it->second->ProtectsPacketsBefore(sequence_number)) {
1025 ++it; 984 ++it;
1026 continue; 985 continue;
1027 } 986 }
1028 QuicFecGroup* fec_group = it->second; 987 QuicFecGroup* fec_group = it->second;
1029 DCHECK(!fec_group->CanRevive()); 988 DCHECK(!fec_group->CanRevive());
1030 FecGroupMap::iterator next = it; 989 FecGroupMap::iterator next = it;
1031 ++next; 990 ++next;
1032 group_map_.erase(it); 991 group_map_.erase(it);
1033 delete fec_group; 992 delete fec_group;
1034 it = next; 993 it = next;
1035 } 994 }
1036 } 995 }
1037 996
1038 bool QuicConnection::HasQueuedData() const { 997 bool QuicConnection::HasQueuedData() const {
1039 return !queued_packets_.empty() || should_send_ack_ || 998 return !queued_packets_.empty() || packet_generator_.HasQueuedData();
1040 should_send_congestion_feedback_;
1041 } 999 }
1042 1000
1043 bool QuicConnection::CheckForTimeout() { 1001 bool QuicConnection::CheckForTimeout() {
1044 QuicTime now = clock_->Now(); 1002 QuicTime now = clock_->ApproximateNow();
1045 QuicTime::Delta delta = now.Subtract(time_of_last_packet_); 1003 QuicTime time_of_last_packet = std::max(time_of_last_received_packet_,
1046 DVLOG(1) << "last_packet " << time_of_last_packet_.ToMicroseconds() 1004 time_of_last_sent_packet_);
1005
1006 QuicTime::Delta delta = now.Subtract(time_of_last_packet);
1007 DVLOG(1) << "last packet " << time_of_last_packet.ToMicroseconds()
1047 << " now:" << now.ToMicroseconds() 1008 << " now:" << now.ToMicroseconds()
1048 << " delta:" << delta.ToMicroseconds(); 1009 << " delta:" << delta.ToMicroseconds();
1049 if (delta >= timeout_) { 1010 if (delta >= timeout_) {
1050 SendConnectionClose(QUIC_CONNECTION_TIMED_OUT); 1011 SendConnectionClose(QUIC_CONNECTION_TIMED_OUT);
1051 return true; 1012 return true;
1052 } 1013 }
1053 helper_->SetTimeoutAlarm(timeout_.Subtract(delta)); 1014 helper_->SetTimeoutAlarm(timeout_.Subtract(delta));
1054 return false; 1015 return false;
1055 } 1016 }
1056 1017
1057 } // namespace net 1018 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/quic_connection.h ('k') | net/quic/quic_connection_helper_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698