Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: net/udp/udp_socket_win.cc

Issue 861963002: UDP: Windows implementation using non-blocking IO (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: added perf test Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« net/udp/udp_socket_unittest.cc ('K') | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/udp/udp_socket_win.h" 5 #include "net/udp/udp_socket_win.h"
6 6
7 #include <mstcpip.h> 7 #include <mstcpip.h>
8 8
9 #include "base/basictypes.h" 9 #include "base/basictypes.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
(...skipping 26 matching lines...) Expand all
37 37
38 namespace net { 38 namespace net {
39 39
40 // This class encapsulates all the state that has to be preserved as long as 40 // This class encapsulates all the state that has to be preserved as long as
41 // there is a network IO operation in progress. If the owner UDPSocketWin 41 // there is a network IO operation in progress. If the owner UDPSocketWin
42 // is destroyed while an operation is in progress, the Core is detached and it 42 // is destroyed while an operation is in progress, the Core is detached and it
43 // lives until the operation completes and the OS doesn't reference any resource 43 // lives until the operation completes and the OS doesn't reference any resource
44 // declared on this class anymore. 44 // declared on this class anymore.
45 class UDPSocketWin::Core : public base::RefCounted<Core> { 45 class UDPSocketWin::Core : public base::RefCounted<Core> {
46 public: 46 public:
47 explicit Core(UDPSocketWin* socket); 47 explicit Core(UDPSocketWin* socket, bool use_overlapped_io);
48 48
49 // Start watching for the end of a read or write operation. 49 // Start watching for the end of a read or write operation.
rvargas (doing something else) 2015/01/22 23:45:13 nit: the comment is stale (for nb)
Alpha Left Google 2015/01/23 02:29:47 Done.
50 void WatchForRead(); 50 void WatchForRead();
51 void WatchForWrite(); 51 void WatchForWrite();
52 52
53 // The UDPSocketWin is going away. 53 // The UDPSocketWin is going away.
54 void Detach() { socket_ = NULL; } 54 void Detach() { socket_ = NULL; }
55 55
56 // The separate OVERLAPPED variables for asynchronous operation. 56 bool use_overlapped_io() const { return use_overlapped_io_; }
57 OVERLAPPED read_overlapped_;
58 OVERLAPPED write_overlapped_;
59 57
60 // The buffers used in Read() and Write(). 58 OVERLAPPED* read_overlapped() { return &read_overlapped_; }
61 scoped_refptr<IOBuffer> read_iobuffer_; 59 OVERLAPPED* write_overlapped() { return &write_overlapped_; }
62 scoped_refptr<IOBuffer> write_iobuffer_; 60 WSAEVENT read_event() const { return read_event_; }
rvargas (doing something else) 2015/01/22 23:45:14 nit: I think we need some empty lines somewhere he
Alpha Left Google 2015/01/23 02:29:47 Done.
61 WSAEVENT write_event() const { return write_event_; }
62 scoped_refptr<IOBuffer> read_iobuffer() const { return read_iobuffer_; }
63 scoped_refptr<IOBuffer> write_iobuffer() const { return write_iobuffer_; }
64 void set_read_iobuffer(scoped_refptr<IOBuffer> buf, int len) {
65 read_iobuffer_ = buf;
66 read_iobuffer_len_ = len;
67 }
68 void set_write_iobuffer(scoped_refptr<IOBuffer> buf, int len) {
69 write_iobuffer_ = buf;
70 write_iobuffer_len_ = len;
71 }
63 72
64 // The address storage passed to WSARecvFrom(). 73 int read_iobuffer_len() const { return read_iobuffer_len_; }
65 SockaddrStorage recv_addr_storage_; 74 int write_iobuffer_len() const { return write_iobuffer_len_; }
75
76 SockaddrStorage* recv_addr_storage() { return &recv_addr_storage_; }
66 77
67 private: 78 private:
68 friend class base::RefCounted<Core>; 79 friend class base::RefCounted<Core>;
69 80
70 class ReadDelegate : public base::win::ObjectWatcher::Delegate { 81 class ReadDelegate : public base::win::ObjectWatcher::Delegate {
71 public: 82 public:
72 explicit ReadDelegate(Core* core) : core_(core) {} 83 explicit ReadDelegate(Core* core) : core_(core) {}
73 virtual ~ReadDelegate() {} 84 virtual ~ReadDelegate() {}
74 85
75 // base::ObjectWatcher::Delegate methods: 86 // base::ObjectWatcher::Delegate methods:
(...skipping 12 matching lines...) Expand all
88 virtual void OnObjectSignaled(HANDLE object); 99 virtual void OnObjectSignaled(HANDLE object);
89 100
90 private: 101 private:
91 Core* const core_; 102 Core* const core_;
92 }; 103 };
93 104
94 ~Core(); 105 ~Core();
95 106
96 // The socket that created this object. 107 // The socket that created this object.
97 UDPSocketWin* socket_; 108 UDPSocketWin* socket_;
109 const bool use_overlapped_io_;
98 110
99 // |reader_| handles the signals from |read_watcher_|. 111 // |reader_| handles the signals from |read_watcher_|.
100 ReadDelegate reader_; 112 ReadDelegate reader_;
101 // |writer_| handles the signals from |write_watcher_|. 113 // |writer_| handles the signals from |write_watcher_|.
102 WriteDelegate writer_; 114 WriteDelegate writer_;
103 115
104 // |read_watcher_| watches for events from Read(). 116 // |read_watcher_| watches for events from Read().
105 base::win::ObjectWatcher read_watcher_; 117 base::win::ObjectWatcher read_watcher_;
106 // |write_watcher_| watches for events from Write(); 118 // |write_watcher_| watches for events from Write();
107 base::win::ObjectWatcher write_watcher_; 119 base::win::ObjectWatcher write_watcher_;
108 120
121 // The separate OVERLAPPED variables for asynchronous operation.
122 OVERLAPPED read_overlapped_;
123 OVERLAPPED write_overlapped_;
124
125 // Separate events for non-blocking IO operations.
126 WSAEVENT read_event_;
rvargas (doing something else) 2015/01/22 23:45:14 Don't do manual handle management. Use a version o
Alpha Left Google 2015/01/23 02:29:47 I don't see one for WSAEvent. I also think it's st
rvargas (doing something else) 2015/01/23 20:05:24 You can use a ScopedHandle directly, or define ty
Alpha Left Google 2015/01/27 01:52:26 In this case I'll leave the handle management in C
127 WSAEVENT write_event_;
128
129 // The buffers used in Read() and Write().
130 scoped_refptr<IOBuffer> read_iobuffer_;
131 scoped_refptr<IOBuffer> write_iobuffer_;
132 int read_iobuffer_len_;
133 int write_iobuffer_len_;
134
135 // The address storage passed to WSARecvFrom().
136 SockaddrStorage recv_addr_storage_;
137
109 DISALLOW_COPY_AND_ASSIGN(Core); 138 DISALLOW_COPY_AND_ASSIGN(Core);
110 }; 139 };
111 140
112 UDPSocketWin::Core::Core(UDPSocketWin* socket) 141 UDPSocketWin::Core::Core(UDPSocketWin* socket, bool use_overlapped_io)
113 : socket_(socket), 142 : socket_(socket),
143 use_overlapped_io_(use_overlapped_io),
114 reader_(this), 144 reader_(this),
115 writer_(this) { 145 writer_(this),
146 read_iobuffer_len_(0),
147 write_iobuffer_len_(0) {
116 memset(&read_overlapped_, 0, sizeof(read_overlapped_)); 148 memset(&read_overlapped_, 0, sizeof(read_overlapped_));
117 memset(&write_overlapped_, 0, sizeof(write_overlapped_)); 149 memset(&write_overlapped_, 0, sizeof(write_overlapped_));
118 150
119 read_overlapped_.hEvent = WSACreateEvent(); 151 if (use_overlapped_io) {
120 write_overlapped_.hEvent = WSACreateEvent(); 152 read_overlapped_.hEvent = WSACreateEvent();
153 write_overlapped_.hEvent = WSACreateEvent();
154 } else {
155 read_event_ = WSACreateEvent();
156 write_event_ = WSACreateEvent();
157 }
121 } 158 }
122 159
123 UDPSocketWin::Core::~Core() { 160 UDPSocketWin::Core::~Core() {
124 // Make sure the message loop is not watching this object anymore. 161 // Make sure the message loop is not watching this object anymore.
125 read_watcher_.StopWatching(); 162 read_watcher_.StopWatching();
126 write_watcher_.StopWatching(); 163 write_watcher_.StopWatching();
127 164
128 WSACloseEvent(read_overlapped_.hEvent); 165 if (use_overlapped_io_) {
129 memset(&read_overlapped_, 0xaf, sizeof(read_overlapped_)); 166 WSACloseEvent(read_overlapped_.hEvent);
130 WSACloseEvent(write_overlapped_.hEvent); 167 memset(&read_overlapped_, 0xaf, sizeof(read_overlapped_));
rvargas (doing something else) 2015/01/22 23:45:14 I know this is the old behavior, but we should not
Alpha Left Google 2015/01/23 02:29:47 Done.
131 memset(&write_overlapped_, 0xaf, sizeof(write_overlapped_)); 168 WSACloseEvent(write_overlapped_.hEvent);
169 memset(&write_overlapped_, 0xaf, sizeof(write_overlapped_));
170 } else {
171 WSACloseEvent(read_event_);
172 WSACloseEvent(write_event_);
173 }
132 } 174 }
133 175
134 void UDPSocketWin::Core::WatchForRead() { 176 void UDPSocketWin::Core::WatchForRead() {
135 // We grab an extra reference because there is an IO operation in progress. 177 // We grab an extra reference because there is an IO operation in progress.
136 // Balanced in ReadDelegate::OnObjectSignaled(). 178 // Balanced in ReadDelegate::OnObjectSignaled().
137 AddRef(); 179 AddRef();
rvargas (doing something else) 2015/01/22 23:45:14 I don't think this is needed for non-blocking. In
Alpha Left Google 2015/01/23 02:29:47 Core contains a large amount of member variables t
rvargas (doing something else) 2015/01/23 20:05:24 That's true, but it makes the post-experiment clea
Alpha Left Google 2015/01/27 01:52:26 Okay I'll keep Core untouched but duplicate needed
138 read_watcher_.StartWatching(read_overlapped_.hEvent, &reader_); 180 if (use_overlapped_io_)
181 read_watcher_.StartWatching(read_overlapped_.hEvent, &reader_);
182 else
183 read_watcher_.StartWatching(read_event_, &reader_);
139 } 184 }
140 185
141 void UDPSocketWin::Core::WatchForWrite() { 186 void UDPSocketWin::Core::WatchForWrite() {
142 // We grab an extra reference because there is an IO operation in progress. 187 // We grab an extra reference because there is an IO operation in progress.
143 // Balanced in WriteDelegate::OnObjectSignaled(). 188 // Balanced in WriteDelegate::OnObjectSignaled().
144 AddRef(); 189 AddRef();
145 write_watcher_.StartWatching(write_overlapped_.hEvent, &writer_); 190 if (use_overlapped_io_)
191 write_watcher_.StartWatching(write_overlapped_.hEvent, &writer_);
192 else
193 write_watcher_.StartWatching(write_event_, &writer_);
146 } 194 }
147 195
148 void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) { 196 void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) {
149 // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed. 197 // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed.
150 tracked_objects::ScopedTracker tracking_profile( 198 tracked_objects::ScopedTracker tracking_profile(
151 FROM_HERE_WITH_EXPLICIT_FUNCTION( 199 FROM_HERE_WITH_EXPLICIT_FUNCTION(
152 "UDPSocketWin_Core_ReadDelegate_OnObjectSignaled")); 200 "UDPSocketWin_Core_ReadDelegate_OnObjectSignaled"));
153 201
154 DCHECK_EQ(object, core_->read_overlapped_.hEvent); 202 if (core_->use_overlapped_io()) {
155 if (core_->socket_) 203 DCHECK_EQ(object, core_->read_overlapped()->hEvent);
156 core_->socket_->DidCompleteRead(); 204 if (core_->socket_)
205 core_->socket_->DidCompleteReadOverlapped();
206 } else {
207 DCHECK_EQ(object, core_->read_event());
208 if (core_->socket_)
209 core_->socket_->DidCompleteReadNonBlocking();
rvargas (doing something else) 2015/01/22 23:45:14 There is no pending read (no read is completing)
Alpha Left Google 2015/01/23 02:29:47 Changed the naming to OnReadSignaledNonBlocking.
210 }
157 211
158 core_->Release(); 212 core_->Release();
159 } 213 }
160 214
161 void UDPSocketWin::Core::WriteDelegate::OnObjectSignaled(HANDLE object) { 215 void UDPSocketWin::Core::WriteDelegate::OnObjectSignaled(HANDLE object) {
162 // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed. 216 // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed.
163 tracked_objects::ScopedTracker tracking_profile( 217 tracked_objects::ScopedTracker tracking_profile(
164 FROM_HERE_WITH_EXPLICIT_FUNCTION( 218 FROM_HERE_WITH_EXPLICIT_FUNCTION(
165 "UDPSocketWin_Core_WriteDelegate_OnObjectSignaled")); 219 "UDPSocketWin_Core_WriteDelegate_OnObjectSignaled"));
166 220
167 DCHECK_EQ(object, core_->write_overlapped_.hEvent); 221 if (core_->use_overlapped_io()) {
168 if (core_->socket_) 222 DCHECK_EQ(object, core_->write_overlapped()->hEvent);
169 core_->socket_->DidCompleteWrite(); 223 if (core_->socket_)
224 core_->socket_->DidCompleteWriteOverlapped();
225 } else {
226 DCHECK_EQ(object, core_->write_event());
227 if (core_->socket_)
228 core_->socket_->DidCompleteWriteNonBlocking();
229 }
170 230
171 core_->Release(); 231 core_->Release();
172 } 232 }
173 //----------------------------------------------------------------------------- 233 //-----------------------------------------------------------------------------
174 234
175 QwaveAPI::QwaveAPI() : qwave_supported_(false) { 235 QwaveAPI::QwaveAPI() : qwave_supported_(false) {
176 HMODULE qwave = LoadLibrary(L"qwave.dll"); 236 HMODULE qwave = LoadLibrary(L"qwave.dll");
177 if (!qwave) 237 if (!qwave)
178 return; 238 return;
179 create_handle_func_ = 239 create_handle_func_ =
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 net::NetLog* net_log, 314 net::NetLog* net_log,
255 const net::NetLog::Source& source) 315 const net::NetLog::Source& source)
256 : socket_(INVALID_SOCKET), 316 : socket_(INVALID_SOCKET),
257 addr_family_(0), 317 addr_family_(0),
258 is_connected_(false), 318 is_connected_(false),
259 socket_options_(SOCKET_OPTION_MULTICAST_LOOP), 319 socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
260 multicast_interface_(0), 320 multicast_interface_(0),
261 multicast_time_to_live_(1), 321 multicast_time_to_live_(1),
262 bind_type_(bind_type), 322 bind_type_(bind_type),
263 rand_int_cb_(rand_int_cb), 323 rand_int_cb_(rand_int_cb),
324 non_blocking_reads_initialized_(false),
325 non_blocking_writes_initialized_(false),
264 recv_from_address_(NULL), 326 recv_from_address_(NULL),
265 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)), 327 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)),
266 qos_handle_(NULL), 328 qos_handle_(NULL),
267 qos_flow_id_(0) { 329 qos_flow_id_(0),
330 use_overlapped_io_(true) {
268 EnsureWinsockInit(); 331 EnsureWinsockInit();
269 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE, 332 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
270 source.ToEventParametersCallback()); 333 source.ToEventParametersCallback());
271 if (bind_type == DatagramSocket::RANDOM_BIND) 334 if (bind_type == DatagramSocket::RANDOM_BIND)
272 DCHECK(!rand_int_cb.is_null()); 335 DCHECK(!rand_int_cb.is_null());
273 } 336 }
274 337
275 UDPSocketWin::~UDPSocketWin() { 338 UDPSocketWin::~UDPSocketWin() {
276 Close(); 339 Close();
277 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE); 340 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
278 } 341 }
279 342
280 int UDPSocketWin::Open(AddressFamily address_family) { 343 int UDPSocketWin::Open(AddressFamily address_family) {
281 DCHECK(CalledOnValidThread()); 344 DCHECK(CalledOnValidThread());
282 DCHECK_EQ(socket_, INVALID_SOCKET); 345 DCHECK_EQ(socket_, INVALID_SOCKET);
283 346
284 addr_family_ = ConvertAddressFamily(address_family); 347 addr_family_ = ConvertAddressFamily(address_family);
285 socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, IPPROTO_UDP); 348 socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, IPPROTO_UDP);
286 if (socket_ == INVALID_SOCKET) 349 if (socket_ == INVALID_SOCKET)
287 return MapSystemError(WSAGetLastError()); 350 return MapSystemError(WSAGetLastError());
288 core_ = new Core(this); 351 core_ = new Core(this, use_overlapped_io_);
289 return OK; 352 return OK;
290 } 353 }
291 354
292 void UDPSocketWin::Close() { 355 void UDPSocketWin::Close() {
293 DCHECK(CalledOnValidThread()); 356 DCHECK(CalledOnValidThread());
294 357
295 if (socket_ == INVALID_SOCKET) 358 if (socket_ == INVALID_SOCKET)
296 return; 359 return;
297 360
298 if (qos_handle_) { 361 if (qos_handle_) {
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
370 int buf_len, 433 int buf_len,
371 IPEndPoint* address, 434 IPEndPoint* address,
372 const CompletionCallback& callback) { 435 const CompletionCallback& callback) {
373 DCHECK(CalledOnValidThread()); 436 DCHECK(CalledOnValidThread());
374 DCHECK_NE(INVALID_SOCKET, socket_); 437 DCHECK_NE(INVALID_SOCKET, socket_);
375 CHECK(read_callback_.is_null()); 438 CHECK(read_callback_.is_null());
376 DCHECK(!recv_from_address_); 439 DCHECK(!recv_from_address_);
377 DCHECK(!callback.is_null()); // Synchronous operation not supported. 440 DCHECK(!callback.is_null()); // Synchronous operation not supported.
378 DCHECK_GT(buf_len, 0); 441 DCHECK_GT(buf_len, 0);
379 442
380 int nread = InternalRecvFrom(buf, buf_len, address); 443 int nread = core_->use_overlapped_io()
rvargas (doing something else) 2015/01/22 23:45:14 use_overlapped_io_ ? I mean, why ask the core abo
Alpha Left Google 2015/01/23 02:29:47 Done.
444 ? InternalRecvFromOverlapped(buf, buf_len, address)
445 : InternalRecvFromNonBlocking(buf, buf_len, address);
381 if (nread != ERR_IO_PENDING) 446 if (nread != ERR_IO_PENDING)
382 return nread; 447 return nread;
383 448
384 read_callback_ = callback; 449 read_callback_ = callback;
385 recv_from_address_ = address; 450 recv_from_address_ = address;
386 return ERR_IO_PENDING; 451 return ERR_IO_PENDING;
387 } 452 }
388 453
389 int UDPSocketWin::Write(IOBuffer* buf, 454 int UDPSocketWin::Write(IOBuffer* buf,
390 int buf_len, 455 int buf_len,
(...skipping 12 matching lines...) Expand all
403 int buf_len, 468 int buf_len,
404 const IPEndPoint* address, 469 const IPEndPoint* address,
405 const CompletionCallback& callback) { 470 const CompletionCallback& callback) {
406 DCHECK(CalledOnValidThread()); 471 DCHECK(CalledOnValidThread());
407 DCHECK_NE(INVALID_SOCKET, socket_); 472 DCHECK_NE(INVALID_SOCKET, socket_);
408 CHECK(write_callback_.is_null()); 473 CHECK(write_callback_.is_null());
409 DCHECK(!callback.is_null()); // Synchronous operation not supported. 474 DCHECK(!callback.is_null()); // Synchronous operation not supported.
410 DCHECK_GT(buf_len, 0); 475 DCHECK_GT(buf_len, 0);
411 DCHECK(!send_to_address_.get()); 476 DCHECK(!send_to_address_.get());
412 477
413 int nwrite = InternalSendTo(buf, buf_len, address); 478 int nwrite = core_->use_overlapped_io()
479 ? InternalSendToOverlapped(buf, buf_len, address)
480 : InternalSendToNonBlocking(buf, buf_len, address);
414 if (nwrite != ERR_IO_PENDING) 481 if (nwrite != ERR_IO_PENDING)
415 return nwrite; 482 return nwrite;
416 483
417 if (address) 484 if (address)
418 send_to_address_.reset(new IPEndPoint(*address)); 485 send_to_address_.reset(new IPEndPoint(*address));
419 write_callback_ = callback; 486 write_callback_ = callback;
420 return ERR_IO_PENDING; 487 return ERR_IO_PENDING;
421 } 488 }
422 489
423 int UDPSocketWin::Connect(const IPEndPoint& address) { 490 int UDPSocketWin::Connect(const IPEndPoint& address) {
(...skipping 135 matching lines...) Expand 10 before | Expand all | Expand 10 after
559 void UDPSocketWin::DoWriteCallback(int rv) { 626 void UDPSocketWin::DoWriteCallback(int rv) {
560 DCHECK_NE(rv, ERR_IO_PENDING); 627 DCHECK_NE(rv, ERR_IO_PENDING);
561 DCHECK(!write_callback_.is_null()); 628 DCHECK(!write_callback_.is_null());
562 629
563 // since Run may result in Write being called, clear write_callback_ up front. 630 // since Run may result in Write being called, clear write_callback_ up front.
564 CompletionCallback c = write_callback_; 631 CompletionCallback c = write_callback_;
565 write_callback_.Reset(); 632 write_callback_.Reset();
566 c.Run(rv); 633 c.Run(rv);
567 } 634 }
568 635
569 void UDPSocketWin::DidCompleteRead() { 636 void UDPSocketWin::DidCompleteReadOverlapped() {
570 DWORD num_bytes, flags; 637 DWORD num_bytes, flags;
571 BOOL ok = WSAGetOverlappedResult(socket_, &core_->read_overlapped_, 638 BOOL ok = WSAGetOverlappedResult(socket_, core_->read_overlapped(),
572 &num_bytes, FALSE, &flags); 639 &num_bytes, FALSE, &flags);
573 WSAResetEvent(core_->read_overlapped_.hEvent); 640 WSAResetEvent(core_->read_overlapped()->hEvent);
574 int result = ok ? num_bytes : MapSystemError(WSAGetLastError()); 641 int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
575 // Convert address. 642 // Convert address.
576 if (recv_from_address_ && result >= 0) { 643 if (recv_from_address_ && result >= 0) {
577 if (!ReceiveAddressToIPEndpoint(recv_from_address_)) 644 if (!ReceiveAddressToIPEndpoint(recv_from_address_))
578 result = ERR_ADDRESS_INVALID; 645 result = ERR_ADDRESS_INVALID;
579 } 646 }
580 LogRead(result, core_->read_iobuffer_->data()); 647 LogRead(result, core_->read_iobuffer()->data());
581 core_->read_iobuffer_ = NULL; 648 core_->set_read_iobuffer(NULL, 0);
582 recv_from_address_ = NULL; 649 recv_from_address_ = NULL;
583 DoReadCallback(result); 650 DoReadCallback(result);
584 } 651 }
585 652
653 void UDPSocketWin::DidCompleteReadNonBlocking() {
rvargas (doing something else) 2015/01/22 23:45:13 follow declaration order.
rvargas (doing something else) 2015/01/22 23:45:14 DCHECK(buffer)? And we should dcheck(!buffer) on R
Alpha Left Google 2015/01/23 02:29:47 Done.
Alpha Left Google 2015/01/23 02:29:47 Done for the first one. InternalRecvFromNonBlocki
rvargas (doing something else) 2015/01/23 20:05:24 Right. That's why I suggested adding a dcehck on R
Alpha Left Google 2015/01/27 01:52:26 Acknowledged.
654 WSANETWORKEVENTS network_events;
655 int os_error = 0;
656 int rv = WSAEnumNetworkEvents(socket_, core_->read_event(), &network_events);
657 if (rv == SOCKET_ERROR) {
658 os_error = WSAGetLastError();
659 rv = MapSystemError(os_error);
660 DoReadCallback(rv);
661 return;
662 }
663 if (network_events.lNetworkEvents) {
664 DCHECK(network_events.lNetworkEvents & FD_READ);
665 // If network_events.iErrorCode[FD_READ_BIT] is nonzero, still call
rvargas (doing something else) 2015/01/22 23:45:14 I don't really follow this comment.
Alpha Left Google 2015/01/23 02:29:47 This is taken from tcp_socket_win but I updated th
rvargas (doing something else) 2015/01/23 20:05:24 ok. It still seems somewhat red-herringish as the
Alpha Left Google 2015/01/27 01:52:26 It sounds to me you think the comment is bogus. In
666 // InternalRecvFromNonBlocking() because it reports a more accurate error
667 // code.
668 rv = InternalRecvFromNonBlocking(core_->read_iobuffer().get(),
669 core_->read_iobuffer_len(),
670 recv_from_address_);
671 if (rv == ERR_IO_PENDING)
672 return;
673 } else {
674 core_->WatchForRead();
675 return;
676 }
677 // Convert address if there's data received.
678 if (recv_from_address_ && rv >= 0) {
rvargas (doing something else) 2015/01/22 23:45:14 Wasn't this (and logging the read), already done i
Alpha Left Google 2015/01/23 02:29:47 Done.
679 if (!ReceiveAddressToIPEndpoint(recv_from_address_))
680 rv = ERR_ADDRESS_INVALID;
681 }
682 LogRead(rv, core_->read_iobuffer()->data());
683 core_->set_read_iobuffer(NULL, 0);
684 recv_from_address_ = NULL;
685 DoReadCallback(rv);
686 }
687
586 void UDPSocketWin::LogRead(int result, const char* bytes) const { 688 void UDPSocketWin::LogRead(int result, const char* bytes) const {
587 if (result < 0) { 689 if (result < 0) {
588 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result); 690 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
589 return; 691 return;
590 } 692 }
591 693
592 if (net_log_.IsLogging()) { 694 if (net_log_.IsLogging()) {
593 // Get address for logging, if |address| is NULL. 695 // Get address for logging, if |address| is NULL.
594 IPEndPoint address; 696 IPEndPoint address;
595 bool is_address_valid = ReceiveAddressToIPEndpoint(&address); 697 bool is_address_valid = ReceiveAddressToIPEndpoint(&address);
596 net_log_.AddEvent( 698 net_log_.AddEvent(
597 NetLog::TYPE_UDP_BYTES_RECEIVED, 699 NetLog::TYPE_UDP_BYTES_RECEIVED,
598 CreateNetLogUDPDataTranferCallback( 700 CreateNetLogUDPDataTranferCallback(
599 result, bytes, 701 result, bytes,
600 is_address_valid ? &address : NULL)); 702 is_address_valid ? &address : NULL));
601 } 703 }
602 704
603 base::StatsCounter read_bytes("udp.read_bytes"); 705 base::StatsCounter read_bytes("udp.read_bytes");
604 read_bytes.Add(result); 706 read_bytes.Add(result);
605 NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result); 707 NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
606 } 708 }
607 709
608 void UDPSocketWin::DidCompleteWrite() { 710 void UDPSocketWin::DidCompleteWriteOverlapped() {
609 DWORD num_bytes, flags; 711 DWORD num_bytes, flags;
610 BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_, 712 BOOL ok = WSAGetOverlappedResult(socket_, core_->write_overlapped(),
611 &num_bytes, FALSE, &flags); 713 &num_bytes, FALSE, &flags);
612 WSAResetEvent(core_->write_overlapped_.hEvent); 714 WSAResetEvent(core_->write_overlapped()->hEvent);
613 int result = ok ? num_bytes : MapSystemError(WSAGetLastError()); 715 int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
614 LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get()); 716 LogWrite(result, core_->write_iobuffer()->data(), send_to_address_.get());
615 717
616 send_to_address_.reset(); 718 send_to_address_.reset();
617 core_->write_iobuffer_ = NULL; 719 core_->set_write_iobuffer(NULL, 0);
618 DoWriteCallback(result); 720 DoWriteCallback(result);
619 } 721 }
620 722
723 void UDPSocketWin::DidCompleteWriteNonBlocking() {
724 WSANETWORKEVENTS network_events;
725 int os_error = 0;
726 int rv = WSAEnumNetworkEvents(socket_, core_->write_event(), &network_events);
727 if (rv == SOCKET_ERROR) {
728 os_error = WSAGetLastError();
729 rv = MapSystemError(os_error);
730 DoWriteCallback(rv);
731 return;
732 } else if (network_events.lNetworkEvents) {
733 DCHECK(network_events.lNetworkEvents & FD_WRITE);
734 // If network_events.iErrorCode[FD_WRITE_BIT] is nonzero, still call
735 // InternalSendtoNonBlocking() because it reports a more accurate error
736 // code.
737 rv = InternalSendToNonBlocking(core_->write_iobuffer().get(),
738 core_->write_iobuffer_len(),
739 send_to_address_.get());
740 if (rv == ERR_IO_PENDING)
741 return;
742 } else {
743 core_->WatchForWrite();
744 return;
745 }
746 LogWrite(rv, core_->write_iobuffer()->data(), send_to_address_.get());
747 core_->set_write_iobuffer(NULL, 0);
748 send_to_address_.reset();
749 DoWriteCallback(rv);
750 }
751
621 void UDPSocketWin::LogWrite(int result, 752 void UDPSocketWin::LogWrite(int result,
622 const char* bytes, 753 const char* bytes,
623 const IPEndPoint* address) const { 754 const IPEndPoint* address) const {
624 if (result < 0) { 755 if (result < 0) {
625 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result); 756 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result);
626 return; 757 return;
627 } 758 }
628 759
629 if (net_log_.IsLogging()) { 760 if (net_log_.IsLogging()) {
630 net_log_.AddEvent( 761 net_log_.AddEvent(
631 NetLog::TYPE_UDP_BYTES_SENT, 762 NetLog::TYPE_UDP_BYTES_SENT,
632 CreateNetLogUDPDataTranferCallback(result, bytes, address)); 763 CreateNetLogUDPDataTranferCallback(result, bytes, address));
633 } 764 }
634 765
635 base::StatsCounter write_bytes("udp.write_bytes"); 766 base::StatsCounter write_bytes("udp.write_bytes");
636 write_bytes.Add(result); 767 write_bytes.Add(result);
637 NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result); 768 NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
638 } 769 }
639 770
640 int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len, 771 int UDPSocketWin::InternalRecvFromOverlapped(IOBuffer* buf,
641 IPEndPoint* address) { 772 int buf_len,
642 DCHECK(!core_->read_iobuffer_.get()); 773 IPEndPoint* address) {
643 SockaddrStorage& storage = core_->recv_addr_storage_; 774 DCHECK(!core_->read_iobuffer());
644 storage.addr_len = sizeof(storage.addr_storage); 775 SockaddrStorage* storage = core_->recv_addr_storage();
776 storage->addr_len = sizeof(storage->addr_storage);
645 777
646 WSABUF read_buffer; 778 WSABUF read_buffer;
647 read_buffer.buf = buf->data(); 779 read_buffer.buf = buf->data();
648 read_buffer.len = buf_len; 780 read_buffer.len = buf_len;
649 781
650 DWORD flags = 0; 782 DWORD flags = 0;
651 DWORD num; 783 DWORD num;
652 CHECK_NE(INVALID_SOCKET, socket_); 784 CHECK_NE(INVALID_SOCKET, socket_);
653 AssertEventNotSignaled(core_->read_overlapped_.hEvent); 785 AssertEventNotSignaled(core_->read_overlapped()->hEvent);
654 int rv = WSARecvFrom(socket_, &read_buffer, 1, &num, &flags, storage.addr, 786 int rv = WSARecvFrom(socket_, &read_buffer, 1, &num, &flags, storage->addr,
655 &storage.addr_len, &core_->read_overlapped_, NULL); 787 &storage->addr_len, core_->read_overlapped(), NULL);
656 if (rv == 0) { 788 if (rv == 0) {
657 if (ResetEventIfSignaled(core_->read_overlapped_.hEvent)) { 789 if (ResetEventIfSignaled(core_->read_overlapped()->hEvent)) {
658 int result = num; 790 int result = num;
659 // Convert address. 791 // Convert address.
660 if (address && result >= 0) { 792 if (address && result >= 0) {
661 if (!ReceiveAddressToIPEndpoint(address)) 793 if (!ReceiveAddressToIPEndpoint(address))
662 result = ERR_ADDRESS_INVALID; 794 result = ERR_ADDRESS_INVALID;
663 } 795 }
664 LogRead(result, buf->data()); 796 LogRead(result, buf->data());
665 return result; 797 return result;
666 } 798 }
667 } else { 799 } else {
668 int os_error = WSAGetLastError(); 800 int os_error = WSAGetLastError();
669 if (os_error != WSA_IO_PENDING) { 801 if (os_error != WSA_IO_PENDING) {
670 int result = MapSystemError(os_error); 802 int result = MapSystemError(os_error);
671 LogRead(result, NULL); 803 LogRead(result, NULL);
672 return result; 804 return result;
673 } 805 }
674 } 806 }
675 core_->WatchForRead(); 807 core_->WatchForRead();
676 core_->read_iobuffer_ = buf; 808 core_->set_read_iobuffer(buf, buf_len);
677 return ERR_IO_PENDING; 809 return ERR_IO_PENDING;
678 } 810 }
679 811
680 int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len, 812 int UDPSocketWin::InternalSendToOverlapped(IOBuffer* buf,
681 const IPEndPoint* address) { 813 int buf_len,
682 DCHECK(!core_->write_iobuffer_.get()); 814 const IPEndPoint* address) {
815 DCHECK(!core_->write_iobuffer());
683 SockaddrStorage storage; 816 SockaddrStorage storage;
684 struct sockaddr* addr = storage.addr; 817 struct sockaddr* addr = storage.addr;
685 // Convert address. 818 // Convert address.
686 if (!address) { 819 if (!address) {
687 addr = NULL; 820 addr = NULL;
688 storage.addr_len = 0; 821 storage.addr_len = 0;
689 } else { 822 } else {
690 if (!address->ToSockAddr(addr, &storage.addr_len)) { 823 if (!address->ToSockAddr(addr, &storage.addr_len)) {
691 int result = ERR_ADDRESS_INVALID; 824 int result = ERR_ADDRESS_INVALID;
692 LogWrite(result, NULL, NULL); 825 LogWrite(result, NULL, NULL);
693 return result; 826 return result;
694 } 827 }
695 } 828 }
696 829
697 WSABUF write_buffer; 830 WSABUF write_buffer;
698 write_buffer.buf = buf->data(); 831 write_buffer.buf = buf->data();
699 write_buffer.len = buf_len; 832 write_buffer.len = buf_len;
700 833
701 DWORD flags = 0; 834 DWORD flags = 0;
702 DWORD num; 835 DWORD num;
703 AssertEventNotSignaled(core_->write_overlapped_.hEvent); 836 AssertEventNotSignaled(core_->write_overlapped()->hEvent);
704 int rv = WSASendTo(socket_, &write_buffer, 1, &num, flags, 837 int rv = WSASendTo(socket_, &write_buffer, 1, &num, flags, addr,
705 addr, storage.addr_len, &core_->write_overlapped_, NULL); 838 storage.addr_len, core_->write_overlapped(), NULL);
706 if (rv == 0) { 839 if (rv == 0) {
707 if (ResetEventIfSignaled(core_->write_overlapped_.hEvent)) { 840 if (ResetEventIfSignaled(core_->write_overlapped()->hEvent)) {
708 int result = num; 841 int result = num;
709 LogWrite(result, buf->data(), address); 842 LogWrite(result, buf->data(), address);
710 return result; 843 return result;
711 } 844 }
712 } else { 845 } else {
713 int os_error = WSAGetLastError(); 846 int os_error = WSAGetLastError();
714 if (os_error != WSA_IO_PENDING) { 847 if (os_error != WSA_IO_PENDING) {
715 int result = MapSystemError(os_error); 848 int result = MapSystemError(os_error);
716 LogWrite(result, NULL, NULL); 849 LogWrite(result, NULL, NULL);
717 return result; 850 return result;
718 } 851 }
719 } 852 }
720 853
721 core_->WatchForWrite(); 854 core_->WatchForWrite();
722 core_->write_iobuffer_ = buf; 855 core_->set_write_iobuffer(buf, 0);
723 return ERR_IO_PENDING; 856 return ERR_IO_PENDING;
724 } 857 }
725 858
859 int UDPSocketWin::InternalRecvFromNonBlocking(IOBuffer* buf,
860 int buf_len,
861 IPEndPoint* address) {
862 if (!non_blocking_reads_initialized_) {
863 // After this call the event will be signaled when asynchronous read is
864 // completed.
865 WSAEventSelect(socket_, core_->read_event(), FD_READ);
866 non_blocking_reads_initialized_ = true;
867 }
868 SockaddrStorage* storage = core_->recv_addr_storage();
869 storage->addr_len = sizeof(storage->addr_storage);
870
871 WSABUF read_buffer;
rvargas (doing something else) 2015/01/22 23:45:14 Not needed
Alpha Left Google 2015/01/23 02:29:47 Done.
872 read_buffer.buf = buf->data();
873 read_buffer.len = buf_len;
874
875 CHECK_NE(INVALID_SOCKET, socket_);
876 int rv = recvfrom(socket_, buf->data(), buf_len, 0, storage->addr,
877 &storage->addr_len);
878 if (rv == SOCKET_ERROR) {
879 int os_error = WSAGetLastError();
880 if (os_error != WSAEWOULDBLOCK) {
rvargas (doing something else) 2015/01/22 23:45:13 The code will flow better if processing of wouldbl
Alpha Left Google 2015/01/23 02:29:47 Done.
881 rv = MapSystemError(os_error);
882 LogRead(rv, NULL);
883 return rv;
884 }
885 } else {
886 // Convert address.
887 if (address && rv >= 0 && !ReceiveAddressToIPEndpoint(address)) {
888 rv = ERR_ADDRESS_INVALID;
889 }
890 LogRead(rv, buf->data());
891 return rv;
892 }
893 core_->WatchForRead();
894 core_->set_read_iobuffer(buf, buf_len);
895 return ERR_IO_PENDING;
896 }
897
898 int UDPSocketWin::InternalSendToNonBlocking(IOBuffer* buf,
899 int buf_len,
900 const IPEndPoint* address) {
901 if (!non_blocking_writes_initialized_) {
902 // After this call the event will be signaled when asynchronous write is
903 // completed.
904 WSAEventSelect(socket_, core_->write_event(), FD_WRITE);
905 non_blocking_writes_initialized_ = true;
906 }
907 SockaddrStorage storage;
908 struct sockaddr* addr = storage.addr;
909 // Convert address.
910 if (!address) {
rvargas (doing something else) 2015/01/22 23:45:14 nit: if (address)
Alpha Left Google 2015/01/23 02:29:47 Done.
911 addr = NULL;
912 storage.addr_len = 0;
913 } else {
914 if (!address->ToSockAddr(addr, &storage.addr_len)) {
915 int result = ERR_ADDRESS_INVALID;
916 LogWrite(result, NULL, NULL);
917 return result;
918 }
919 }
920
921 WSABUF write_buffer;
rvargas (doing something else) 2015/01/22 23:45:13 not needed
Alpha Left Google 2015/01/23 02:29:47 Done.
922 write_buffer.buf = buf->data();
923 write_buffer.len = buf_len;
924
925 int rv = sendto(socket_, buf->data(), buf_len, 0, addr, storage.addr_len);
926 if (rv == SOCKET_ERROR) {
927 int os_error = WSAGetLastError();
928 if (os_error != WSAEWOULDBLOCK) {
rvargas (doing something else) 2015/01/22 23:45:13 ditto
Alpha Left Google 2015/01/23 02:29:46 Done.
929 rv = MapSystemError(os_error);
930 LogWrite(rv, NULL, NULL);
931 return rv;
932 }
933 } else {
934 LogWrite(rv, buf->data(), address);
935 return rv;
936 }
937 core_->WatchForWrite();
938 core_->set_write_iobuffer(buf, buf_len);
939 return ERR_IO_PENDING;
940 }
941
726 int UDPSocketWin::SetMulticastOptions() { 942 int UDPSocketWin::SetMulticastOptions() {
727 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) { 943 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
728 DWORD loop = 0; 944 DWORD loop = 0;
729 int protocol_level = 945 int protocol_level =
730 addr_family_ == AF_INET ? IPPROTO_IP : IPPROTO_IPV6; 946 addr_family_ == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
731 int option = 947 int option =
732 addr_family_ == AF_INET ? IP_MULTICAST_LOOP: IPV6_MULTICAST_LOOP; 948 addr_family_ == AF_INET ? IP_MULTICAST_LOOP: IPV6_MULTICAST_LOOP;
733 int rv = setsockopt(socket_, protocol_level, option, 949 int rv = setsockopt(socket_, protocol_level, option,
734 reinterpret_cast<const char*>(&loop), sizeof(loop)); 950 reinterpret_cast<const char*>(&loop), sizeof(loop));
735 if (rv < 0) 951 if (rv < 0)
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
801 for (int i = 0; i < kBindRetries; ++i) { 1017 for (int i = 0; i < kBindRetries; ++i) {
802 int rv = DoBind(IPEndPoint( 1018 int rv = DoBind(IPEndPoint(
803 address, static_cast<uint16>(rand_int_cb_.Run(kPortStart, kPortEnd)))); 1019 address, static_cast<uint16>(rand_int_cb_.Run(kPortStart, kPortEnd))));
804 if (rv == OK || rv != ERR_ADDRESS_IN_USE) 1020 if (rv == OK || rv != ERR_ADDRESS_IN_USE)
805 return rv; 1021 return rv;
806 } 1022 }
807 return DoBind(IPEndPoint(address, 0)); 1023 return DoBind(IPEndPoint(address, 0));
808 } 1024 }
809 1025
810 bool UDPSocketWin::ReceiveAddressToIPEndpoint(IPEndPoint* address) const { 1026 bool UDPSocketWin::ReceiveAddressToIPEndpoint(IPEndPoint* address) const {
811 SockaddrStorage& storage = core_->recv_addr_storage_; 1027 SockaddrStorage* storage = core_->recv_addr_storage();
812 return address->FromSockAddr(storage.addr, storage.addr_len); 1028 return address->FromSockAddr(storage->addr, storage->addr_len);
813 } 1029 }
814 1030
815 int UDPSocketWin::JoinGroup( 1031 int UDPSocketWin::JoinGroup(
816 const IPAddressNumber& group_address) const { 1032 const IPAddressNumber& group_address) const {
817 DCHECK(CalledOnValidThread()); 1033 DCHECK(CalledOnValidThread());
818 if (!is_connected()) 1034 if (!is_connected())
819 return ERR_SOCKET_NOT_CONNECTED; 1035 return ERR_SOCKET_NOT_CONNECTED;
820 1036
821 switch (group_address.size()) { 1037 switch (group_address.size()) {
822 case kIPv4AddressSize: { 1038 case kIPv4AddressSize: {
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
1009 0, 1225 0,
1010 NULL); 1226 NULL);
1011 1227
1012 return OK; 1228 return OK;
1013 } 1229 }
1014 1230
1015 void UDPSocketWin::DetachFromThread() { 1231 void UDPSocketWin::DetachFromThread() {
1016 base::NonThreadSafe::DetachFromThread(); 1232 base::NonThreadSafe::DetachFromThread();
1017 } 1233 }
1018 1234
1235 void UDPSocketWin::UseNonBlockingIO() {
1236 DCHECK(!core_);
1237 use_overlapped_io_ = false;
1238 }
1239
1019 } // namespace net 1240 } // namespace net
OLDNEW
« net/udp/udp_socket_unittest.cc ('K') | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698