Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(964)

Unified Diff: net/udp/udp_socket_win.cc

Issue 861963002: UDP: Windows implementation using non-blocking IO (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: use IsWatching() Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/udp/udp_socket_win.cc
diff --git a/net/udp/udp_socket_win.cc b/net/udp/udp_socket_win.cc
index 3c121b2bd91a6beabbb86f6e6764eee11f06135c..3ef3e864b45a435f76828884f536ef47486dfbae 100644
--- a/net/udp/udp_socket_win.cc
+++ b/net/udp/udp_socket_win.cc
@@ -261,6 +261,9 @@ UDPSocketWin::UDPSocketWin(DatagramSocket::BindType bind_type,
multicast_time_to_live_(1),
bind_type_(bind_type),
rand_int_cb_(rand_int_cb),
+ use_non_blocking_io_(false),
+ read_iobuffer_len_(0),
+ write_iobuffer_len_(0),
recv_from_address_(NULL),
net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)),
qos_handle_(NULL),
@@ -285,7 +288,12 @@ int UDPSocketWin::Open(AddressFamily address_family) {
socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, IPPROTO_UDP);
if (socket_ == INVALID_SOCKET)
return MapSystemError(WSAGetLastError());
- core_ = new Core(this);
+ if (!use_non_blocking_io_) {
+ core_ = new Core(this);
+ } else {
+ read_write_event_.Set(WSACreateEvent());
+ WSAEventSelect(socket_, read_write_event_.Get(), FD_READ | FD_WRITE);
+ }
return OK;
}
@@ -312,8 +320,13 @@ void UDPSocketWin::Close() {
addr_family_ = 0;
is_connected_ = false;
- core_->Detach();
- core_ = NULL;
+ read_write_watcher_.StopWatching();
+ read_write_event_.Close();
+
+ if (core_) {
+ core_->Detach();
+ core_ = NULL;
+ }
}
int UDPSocketWin::GetPeerAddress(IPEndPoint* address) const {
@@ -377,7 +390,8 @@ int UDPSocketWin::RecvFrom(IOBuffer* buf,
DCHECK(!callback.is_null()); // Synchronous operation not supported.
DCHECK_GT(buf_len, 0);
- int nread = InternalRecvFrom(buf, buf_len, address);
+ int nread = core_ ? InternalRecvFromOverlapped(buf, buf_len, address)
+ : InternalRecvFromNonBlocking(buf, buf_len, address);
if (nread != ERR_IO_PENDING)
return nread;
@@ -410,7 +424,8 @@ int UDPSocketWin::SendToOrWrite(IOBuffer* buf,
DCHECK_GT(buf_len, 0);
DCHECK(!send_to_address_.get());
- int nwrite = InternalSendTo(buf, buf_len, address);
+ int nwrite = core_ ? InternalSendToOverlapped(buf, buf_len, address)
+ : InternalSendToNonBlocking(buf, buf_len, address);
if (nwrite != ERR_IO_PENDING)
return nwrite;
@@ -573,31 +588,115 @@ void UDPSocketWin::DidCompleteRead() {
WSAResetEvent(core_->read_overlapped_.hEvent);
int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
// Convert address.
- if (recv_from_address_ && result >= 0) {
- if (!ReceiveAddressToIPEndpoint(recv_from_address_))
+ IPEndPoint address;
+ IPEndPoint* address_to_log = NULL;
+ if (result >= 0) {
+ if (address.FromSockAddr(core_->recv_addr_storage_.addr,
+ core_->recv_addr_storage_.addr_len)) {
+ if (recv_from_address_)
+ *recv_from_address_ = address;
+ address_to_log = &address;
+ } else {
result = ERR_ADDRESS_INVALID;
+ }
}
- LogRead(result, core_->read_iobuffer_->data());
+ LogRead(result, core_->read_iobuffer_->data(), address_to_log);
core_->read_iobuffer_ = NULL;
recv_from_address_ = NULL;
DoReadCallback(result);
}
-void UDPSocketWin::LogRead(int result, const char* bytes) const {
+void UDPSocketWin::DidCompleteWrite() {
+ DWORD num_bytes, flags;
+ BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
+ &num_bytes, FALSE, &flags);
+ WSAResetEvent(core_->write_overlapped_.hEvent);
+ int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
+ LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
+
+ send_to_address_.reset();
+ core_->write_iobuffer_ = NULL;
+ DoWriteCallback(result);
+}
+
+void UDPSocketWin::OnObjectSignaled(HANDLE object) {
+ DCHECK(object == read_write_event_.Get());
+ WSANETWORKEVENTS network_events;
+ int os_error = 0;
+ int rv =
+ WSAEnumNetworkEvents(socket_, read_write_event_.Get(), &network_events);
+ if (rv == SOCKET_ERROR) {
+ os_error = WSAGetLastError();
+ rv = MapSystemError(os_error);
+ if (read_iobuffer_) {
+ read_iobuffer_ = NULL;
+ read_iobuffer_len_ = 0;
+ recv_from_address_ = NULL;
+ DoReadCallback(rv);
+ }
+ if (write_iobuffer_) {
+ write_iobuffer_ = NULL;
+ write_iobuffer_len_ = 0;
+ send_to_address_.reset();
+ DoWriteCallback(rv);
+ }
+ return;
+ }
+ if ((network_events.lNetworkEvents & FD_READ) && read_iobuffer_) {
+ OnReadSignaled();
+ }
+ if ((network_events.lNetworkEvents & FD_WRITE) && write_iobuffer_) {
+ OnWriteSignaled();
+ }
+
+ // There's still pending read / write. Watch for further events.
+ if (read_iobuffer_ || write_iobuffer_) {
+ WatchForReadWrite();
+ }
+}
+
+void UDPSocketWin::OnReadSignaled() {
+ int rv = InternalRecvFromNonBlocking(read_iobuffer_.get(), read_iobuffer_len_,
+ recv_from_address_);
+ if (rv == ERR_IO_PENDING)
+ return;
+ read_iobuffer_ = NULL;
+ read_iobuffer_len_ = 0;
+ recv_from_address_ = NULL;
+ DoReadCallback(rv);
+}
+
+void UDPSocketWin::OnWriteSignaled() {
+ int rv = InternalSendToNonBlocking(write_iobuffer_.get(), write_iobuffer_len_,
+ send_to_address_.get());
+ if (rv == ERR_IO_PENDING)
+ return;
+ write_iobuffer_ = NULL;
+ write_iobuffer_len_ = 0;
+ send_to_address_.reset();
+ DoWriteCallback(rv);
+}
+
+void UDPSocketWin::WatchForReadWrite() {
+ if (read_write_watcher_.IsWatching())
+ return;
+ bool watched =
+ read_write_watcher_.StartWatching(read_write_event_.Get(), this);
+ DCHECK(watched);
+}
+
+void UDPSocketWin::LogRead(int result,
+ const char* bytes,
+ const IPEndPoint* address) const {
if (result < 0) {
net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
return;
}
if (net_log_.IsLogging()) {
- // Get address for logging, if |address| is NULL.
- IPEndPoint address;
- bool is_address_valid = ReceiveAddressToIPEndpoint(&address);
net_log_.AddEvent(
NetLog::TYPE_UDP_BYTES_RECEIVED,
- CreateNetLogUDPDataTranferCallback(
- result, bytes,
- is_address_valid ? &address : NULL));
+ CreateNetLogUDPDataTranferCallback(result, bytes, address));
}
base::StatsCounter read_bytes("udp.read_bytes");
@@ -605,19 +704,6 @@ void UDPSocketWin::LogRead(int result, const char* bytes) const {
NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(result);
}
-void UDPSocketWin::DidCompleteWrite() {
- DWORD num_bytes, flags;
- BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
- &num_bytes, FALSE, &flags);
- WSAResetEvent(core_->write_overlapped_.hEvent);
- int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
- LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
-
- send_to_address_.reset();
- core_->write_iobuffer_ = NULL;
- DoWriteCallback(result);
-}
-
void UDPSocketWin::LogWrite(int result,
const char* bytes,
const IPEndPoint* address) const {
@@ -637,8 +723,9 @@ void UDPSocketWin::LogWrite(int result,
NetworkActivityMonitor::GetInstance()->IncrementBytesSent(result);
}
-int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
- IPEndPoint* address) {
+int UDPSocketWin::InternalRecvFromOverlapped(IOBuffer* buf,
+ int buf_len,
+ IPEndPoint* address) {
DCHECK(!core_->read_iobuffer_.get());
SockaddrStorage& storage = core_->recv_addr_storage_;
storage.addr_len = sizeof(storage.addr_storage);
@@ -657,18 +744,26 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
if (ResetEventIfSignaled(core_->read_overlapped_.hEvent)) {
int result = num;
// Convert address.
- if (address && result >= 0) {
- if (!ReceiveAddressToIPEndpoint(address))
+ IPEndPoint address_storage;
+ IPEndPoint* address_to_log = NULL;
+ if (result >= 0) {
+ if (address_storage.FromSockAddr(core_->recv_addr_storage_.addr,
+ core_->recv_addr_storage_.addr_len)) {
+ if (address)
+ *address = address_storage;
+ address_to_log = &address_storage;
+ } else {
result = ERR_ADDRESS_INVALID;
+ }
}
- LogRead(result, buf->data());
+ LogRead(result, buf->data(), address_to_log);
return result;
}
} else {
int os_error = WSAGetLastError();
if (os_error != WSA_IO_PENDING) {
int result = MapSystemError(os_error);
- LogRead(result, NULL);
+ LogRead(result, NULL, NULL);
return result;
}
}
@@ -677,8 +772,9 @@ int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
return ERR_IO_PENDING;
}
-int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
- const IPEndPoint* address) {
+int UDPSocketWin::InternalSendToOverlapped(IOBuffer* buf,
+ int buf_len,
+ const IPEndPoint* address) {
DCHECK(!core_->write_iobuffer_.get());
SockaddrStorage storage;
struct sockaddr* addr = storage.addr;
@@ -723,6 +819,78 @@ int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
return ERR_IO_PENDING;
}
+int UDPSocketWin::InternalRecvFromNonBlocking(IOBuffer* buf,
+ int buf_len,
+ IPEndPoint* address) {
+ DCHECK(!read_iobuffer_ || read_iobuffer_.get() == buf);
+ SockaddrStorage storage;
+ storage.addr_len = sizeof(storage.addr_storage);
+
+ CHECK_NE(INVALID_SOCKET, socket_);
+ int rv = recvfrom(socket_, buf->data(), buf_len, 0, storage.addr,
+ &storage.addr_len);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error == WSAEWOULDBLOCK) {
+ read_iobuffer_ = buf;
+ read_iobuffer_len_ = buf_len;
+ WatchForReadWrite();
+ return ERR_IO_PENDING;
+ }
+ rv = MapSystemError(os_error);
+ LogRead(rv, NULL, NULL);
+ return rv;
+ }
+ IPEndPoint address_storage;
+ IPEndPoint* address_to_log = NULL;
+ if (rv >= 0) {
+ if (address_storage.FromSockAddr(storage.addr, storage.addr_len)) {
+ if (address)
+ *address = address_storage;
+ address_to_log = &address_storage;
+ } else {
+ rv = ERR_ADDRESS_INVALID;
+ }
+ }
+ LogRead(rv, buf->data(), address_to_log);
+ return rv;
+}
+
+int UDPSocketWin::InternalSendToNonBlocking(IOBuffer* buf,
+ int buf_len,
+ const IPEndPoint* address) {
+ DCHECK(!write_iobuffer_ || write_iobuffer_.get() == buf);
+ SockaddrStorage storage;
+ struct sockaddr* addr = storage.addr;
+ // Convert address.
+ if (address) {
+ if (!address->ToSockAddr(addr, &storage.addr_len)) {
+ int result = ERR_ADDRESS_INVALID;
+ LogWrite(result, NULL, NULL);
+ return result;
+ }
+ } else {
+ addr = NULL;
+ storage.addr_len = 0;
+ }
+
+ int rv = sendto(socket_, buf->data(), buf_len, 0, addr, storage.addr_len);
+ if (rv == SOCKET_ERROR) {
+ int os_error = WSAGetLastError();
+ if (os_error == WSAEWOULDBLOCK) {
+ write_iobuffer_ = buf;
+ write_iobuffer_len_ = buf_len;
+ WatchForReadWrite();
+ return ERR_IO_PENDING;
+ }
+ rv = MapSystemError(os_error);
+ LogWrite(rv, NULL, NULL);
+ return rv;
+ }
+ LogWrite(rv, buf->data(), address);
+ return rv;
+}
+
int UDPSocketWin::SetMulticastOptions() {
if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
DWORD loop = 0;
@@ -807,11 +975,6 @@ int UDPSocketWin::RandomBind(const IPAddressNumber& address) {
return DoBind(IPEndPoint(address, 0));
}
-bool UDPSocketWin::ReceiveAddressToIPEndpoint(IPEndPoint* address) const {
- SockaddrStorage& storage = core_->recv_addr_storage_;
- return address->FromSockAddr(storage.addr, storage.addr_len);
-}
-
int UDPSocketWin::JoinGroup(
const IPAddressNumber& group_address) const {
DCHECK(CalledOnValidThread());
@@ -1016,4 +1179,9 @@ void UDPSocketWin::DetachFromThread() {
base::NonThreadSafe::DetachFromThread();
}
+void UDPSocketWin::UseNonBlockingIO() {
+ DCHECK(!core_);
+ use_non_blocking_io_ = true;
+}
+
} // namespace net
« no previous file with comments | « net/udp/udp_socket_win.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698