Index: runtime/bin/eventhandler_macos.cc |
diff --git a/runtime/bin/eventhandler_macos.cc b/runtime/bin/eventhandler_macos.cc |
index 1dff244c21b4018c1b2ff691c8e908b23251663b..14d67fceeccba0b3c0ac67e152086d9f213e52ba 100644 |
--- a/runtime/bin/eventhandler_macos.cc |
+++ b/runtime/bin/eventhandler_macos.cc |
@@ -5,10 +5,10 @@ |
#include "bin/eventhandler.h" |
#include <errno.h> |
-#include <poll.h> |
#include <pthread.h> |
#include <stdio.h> |
#include <string.h> |
+#include <sys/event.h> |
#include <sys/time.h> |
#include <unistd.h> |
@@ -33,21 +33,82 @@ static const int kInfinityTimeout = -1; |
static const int kTimerId = -1; |
-intptr_t SocketData::GetPollEvents() { |
- // Do not ask for POLLERR and POLLHUP explicitly as they are |
- // triggered anyway. |
- intptr_t events = 0; |
- if (!IsClosedRead()) { |
- if ((mask_ & (1 << kInEvent)) != 0) { |
- events |= POLLIN; |
+bool SocketData::HasReadEvent() { |
+ return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0); |
+} |
+ |
+ |
+bool SocketData::HasWriteEvent() { |
+ return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0); |
+} |
+ |
+ |
+// Unregister the file descriptor for a SocketData structure with kqueue. |
+static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
+ static const intptr_t kMaxChanges = 2; |
+ intptr_t changes = 0; |
+ struct kevent events[kMaxChanges]; |
+ if (sd->read_tracked_by_kqueue()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
+ ++changes; |
+ sd->set_read_tracked_by_kqueue(false); |
+ } |
+ if (sd->write_tracked_by_kqueue()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd); |
+ ++changes; |
+ sd->set_write_tracked_by_kqueue(false); |
+ } |
+ if (changes > 0) { |
+ ASSERT(changes < kMaxChanges); |
+ int status = |
+ TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
+ if (status == -1) { |
+ FATAL("Failed deleting events from kqueue"); |
} |
} |
- if (!IsClosedWrite()) { |
- if ((mask_ & (1 << kOutEvent)) != 0) { |
- events |= POLLOUT; |
+} |
+ |
+ |
+// Update the kqueue registration for SocketData structure to reflect |
+// the events currently of interest. |
+static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
+ static const intptr_t kMaxChanges = 2; |
+ intptr_t changes = 0; |
+ struct kevent events[kMaxChanges]; |
+ if (sd->port() != 0) { |
+ // Register or unregister READ filter if needed. |
+ if (sd->HasReadEvent()) { |
+ if (!sd->read_tracked_by_kqueue()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd); |
+ ++changes; |
+ sd->set_read_tracked_by_kqueue(true); |
+ } |
+ } else if (sd->read_tracked_by_kqueue()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
+ ++changes; |
+ sd->set_read_tracked_by_kqueue(false); |
+ } |
+ // Register or unregister WRITE filter if needed. |
+ if (sd->HasWriteEvent()) { |
+ if (!sd->write_tracked_by_kqueue()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd); |
+ ++changes; |
+ sd->set_write_tracked_by_kqueue(true); |
+ } |
+ } else if (sd->write_tracked_by_kqueue()) { |
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); |
+ ++changes; |
+ sd->set_write_tracked_by_kqueue(false); |
+ } |
+ } |
+ if (changes > 0) { |
+ ASSERT(changes < kMaxChanges); |
+ int status = |
+ TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
+ if (status == -1) { |
+ FATAL("Failed updating kqueue"); |
} |
} |
- return events; |
} |
@@ -61,6 +122,18 @@ EventHandlerImplementation::EventHandlerImplementation() |
FDUtils::SetNonBlocking(interrupt_fds_[0]); |
timeout_ = kInfinityTimeout; |
timeout_port_ = 0; |
+ |
+ kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue()); |
+ if (kqueue_fd_ == -1) { |
+ FATAL("Failed creating kqueue"); |
+ } |
+ // Register the interrupt_fd with the kqueue. |
+ struct kevent event; |
+ EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL); |
+ int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL)); |
+ if (status == -1) { |
+ FATAL("Failed adding interrupt fd to kqueue"); |
+ } |
} |
@@ -77,8 +150,8 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
ASSERT(entry != NULL); |
SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
if (sd == NULL) { |
- // If there is no data in the hash map for this file descriptor |
- // then this is inserting a new SocketData for the file descriptor. |
+ // If there is no data in the hash map for this file descriptor a |
+ // new SocketData for the file descriptor is inserted. |
sd = new SocketData(fd); |
entry->value = sd; |
} |
@@ -105,43 +178,6 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id, |
} |
-struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
- struct pollfd* pollfds; |
- |
- // Calculate the number of file descriptors to poll on. |
- intptr_t numPollfds = 1; |
- for (HashMap::Entry* entry = socket_map_.Start(); |
- entry != NULL; |
- entry = socket_map_.Next(entry)) { |
- SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
- if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
- } |
- |
- pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
- numPollfds)); |
- pollfds[0].fd = interrupt_fds_[0]; |
- pollfds[0].events |= POLLIN; |
- |
- int i = 1; |
- for (HashMap::Entry* entry = socket_map_.Start(); |
- entry != NULL; |
- entry = socket_map_.Next(entry)) { |
- SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
- intptr_t events = sd->GetPollEvents(); |
- if (sd->port() > 0 && events != 0) { |
- // Fd is added to the poll set. |
- pollfds[i].fd = sd->fd(); |
- pollfds[i].events = events; |
- i++; |
- } |
- } |
- ASSERT(numPollfds == i); |
- *pollfds_size = i; |
- |
- return pollfds; |
-} |
- |
- |
bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
int total_read = 0; |
int bytes_read = |
@@ -173,13 +209,16 @@ void EventHandlerImplementation::HandleInterruptFd() { |
ASSERT(msg.data == (1 << kShutdownReadCommand)); |
// Close the socket for reading. |
sd->ShutdownRead(); |
+ UpdateKqueue(kqueue_fd_, sd); |
} else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
// Close the socket for writing. |
sd->ShutdownWrite(); |
+ UpdateKqueue(kqueue_fd_, sd); |
} else if ((msg.data & (1 << kCloseCommand)) != 0) { |
ASSERT(msg.data == (1 << kCloseCommand)); |
// Close the socket and free system resources. |
+ RemoveFromKqueue(kqueue_fd_, sd); |
intptr_t fd = sd->fd(); |
sd->Close(); |
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
@@ -187,100 +226,64 @@ void EventHandlerImplementation::HandleInterruptFd() { |
} else { |
// Setup events to wait for. |
sd->SetPortAndMask(msg.dart_port, msg.data); |
+ UpdateKqueue(kqueue_fd_, sd); |
} |
} |
} |
} |
-#ifdef DEBUG_POLL |
-static void PrintEventMask(struct pollfd* pollfd) { |
- printf("%d ", pollfd->fd); |
- if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); |
- if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); |
- if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); |
- if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); |
- if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); |
- if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); |
- int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL; |
- if ((pollfd->revents & ~all_events) != 0) { |
- printf("(and %08x) ", pollfd->revents & ~all_events); |
- } |
- printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); |
+#ifdef DEBUG_KQUEUE |
+static void PrintEventMask(intptr_t fd, struct kevent* event) { |
+ printf("%d ", fd); |
+ if (event->filter == EVFILT_READ) printf("EVFILT_READ "); |
+ if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE "); |
+ printf("flags: %x: ", event->flags); |
+ if ((event->flags & EV_EOF) != 0) printf("EV_EOF "); |
+ if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR "); |
+ printf("(available %d) ", FDUtils::AvailableBytes(fd)); |
printf("\n"); |
} |
#endif |
-intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
-#ifdef DEBUG_POLL |
- if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); |
+ |
+intptr_t EventHandlerImplementation::GetEvents(struct kevent* event, |
+ SocketData* sd) { |
+#ifdef DEBUG_KQUEUE |
+ PrintEventMask(sd->fd(), event); |
#endif |
intptr_t event_mask = 0; |
- SocketData* sd = GetSocketData(pollfd->fd); |
if (sd->IsListeningSocket()) { |
- // For listening sockets the POLLIN event indicate that there are |
- // connections ready for accept unless accompanied with one of the |
- // other flags. |
- if ((pollfd->revents & POLLIN) != 0) { |
- if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); |
- if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); |
+ // On a listening socket the READ event means that there are |
+ // connections ready to be accepted. |
+ if (event->filter == EVFILT_READ) { |
+ if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent); |
+ if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent); |
if (event_mask == 0) event_mask |= (1 << kInEvent); |
} |
} else { |
- if ((pollfd->revents & POLLNVAL) != 0) { |
- return 0; |
- } |
- |
// Prioritize data events over close and error events. |
- if ((pollfd->revents & POLLIN) != 0) { |
- if (FDUtils::AvailableBytes(pollfd->fd) != 0) { |
- event_mask = (1 << kInEvent); |
- } else if (((pollfd->revents & POLLHUP) != 0)) { |
+ if (event->filter == EVFILT_READ) { |
+ if (FDUtils::AvailableBytes(sd->fd()) != 0) { |
+ event_mask = (1 << kInEvent); |
+ } else if ((event->flags & EV_EOF) != 0) { |
event_mask = (1 << kCloseEvent); |
sd->MarkClosedRead(); |
- } else if ((pollfd->revents & POLLERR) != 0) { |
+ } else if ((event->flags & EV_ERROR) != 0) { |
event_mask = (1 << kErrorEvent); |
- } else { |
- if (sd->IsPipe()) { |
- // When reading from stdin (either from a terminal or piped |
- // input) treat POLLIN with 0 available bytes as |
- // end-of-file. |
- if (sd->fd() == STDIN_FILENO) { |
- event_mask = (1 << kCloseEvent); |
- sd->MarkClosedRead(); |
- } |
- } else { |
- // If POLLIN is set with no available data and no POLLHUP use |
- // recv to peek for whether the other end of the socket |
- // actually closed. |
- char buffer; |
- ssize_t bytesPeeked = |
- TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK)); |
- ASSERT(EAGAIN == EWOULDBLOCK); |
- if (bytesPeeked == 0) { |
- event_mask = (1 << kCloseEvent); |
- sd->MarkClosedRead(); |
- } else if (errno != EWOULDBLOCK) { |
- fprintf(stderr, "Error recv: %s\n", strerror(errno)); |
- } |
- } |
- } |
- } |
- |
- // On pipes POLLHUP is reported without POLLIN when there is no |
- // more data to read. |
- if (sd->IsPipe()) { |
- if (((pollfd->revents & POLLIN) == 0) && |
- ((pollfd->revents & POLLHUP) != 0)) { |
- event_mask = (1 << kCloseEvent); |
- sd->MarkClosedRead(); |
} |
} |
- if ((pollfd->revents & POLLOUT) != 0) { |
- if ((pollfd->revents & POLLERR) != 0) { |
+ if (event->filter == EVFILT_WRITE) { |
+ if ((event->flags & EV_ERROR) != 0) { |
event_mask = (1 << kErrorEvent); |
sd->MarkClosedWrite(); |
+ } else if ((event->flags & EV_EOF) != 0) { |
+ // If the receiver closed for reading, close for writing, |
+ // update the registration with kqueue, and do not report a |
+ // write event. |
+ sd->MarkClosedWrite(); |
+ UpdateKqueue(kqueue_fd_, sd); |
} else { |
event_mask |= (1 << kOutEvent); |
} |
@@ -291,25 +294,19 @@ intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { |
} |
-void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, |
- int pollfds_size, |
- int result_size) { |
- if ((pollfds[0].revents & POLLIN) != 0) { |
- result_size -= 1; |
- } |
- if (result_size > 0) { |
- for (int i = 1; i < pollfds_size; i++) { |
- /* |
- * The fd is unregistered. It gets re-registered when the request |
- * was handled by dart. |
- */ |
- intptr_t event_mask = GetPollEvents(&pollfds[i]); |
+void EventHandlerImplementation::HandleEvents(struct kevent* events, |
+ int size) { |
+ for (int i = 0; i < size; i++) { |
+ if (events[i].udata != NULL) { |
+ SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata); |
+ intptr_t event_mask = GetEvents(events + i, sd); |
if (event_mask != 0) { |
- intptr_t fd = pollfds[i].fd; |
- SocketData* sd = GetSocketData(fd); |
+ // Unregister events for the file descriptor. Events will be |
+ // registered again when the current event has been handled in |
+ // Dart code. |
+ RemoveFromKqueue(kqueue_fd_, sd); |
Dart_Port port = sd->port(); |
ASSERT(port != 0); |
- sd->Unregister(); |
DartUtils::PostInt32(port, event_mask); |
} |
} |
@@ -339,33 +336,44 @@ void EventHandlerImplementation::HandleTimeout() { |
} |
-void EventHandlerImplementation::Poll(uword args) { |
- intptr_t pollfds_size; |
- struct pollfd* pollfds; |
+void EventHandlerImplementation::EventHandlerEntry(uword args) { |
+ static const intptr_t kMaxEvents = 16; |
+ struct kevent events[kMaxEvents]; |
EventHandlerImplementation* handler = |
reinterpret_cast<EventHandlerImplementation*>(args); |
ASSERT(handler != NULL); |
while (1) { |
- pollfds = handler->GetPollFds(&pollfds_size); |
intptr_t millis = handler->GetTimeout(); |
- intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis)); |
- ASSERT(EAGAIN == EWOULDBLOCK); |
+ struct timespec ts; |
+ int64_t secs = 0; |
+ int64_t nanos = 0; |
+ if (millis > 0) { |
+ secs = millis / 1000; |
+ nanos = (millis - (secs * 1000)) * 1000000; |
+ } |
+ ts.tv_sec = secs; |
+ ts.tv_nsec = nanos; |
+ intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_, |
+ NULL, |
+ 0, |
+ events, |
+ kMaxEvents, |
+ &ts)); |
if (result == -1) { |
- if (errno != EWOULDBLOCK) { |
- perror("Poll failed"); |
- } |
+ perror("kevent failed"); |
+ FATAL("kevent failed\n"); |
} else { |
handler->HandleTimeout(); |
- handler->HandleEvents(pollfds, pollfds_size, result); |
+ handler->HandleEvents(events, result); |
} |
- free(pollfds); |
} |
} |
void EventHandlerImplementation::StartEventHandler() { |
- int result = dart::Thread::Start(&EventHandlerImplementation::Poll, |
- reinterpret_cast<uword>(this)); |
+ int result = |
+ dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry, |
+ reinterpret_cast<uword>(this)); |
if (result != 0) { |
FATAL1("Failed to start event handler thread %d", result); |
} |