Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(418)

Unified Diff: runtime/bin/eventhandler_macos.cc

Issue 9365010: Move the event handler on macos from poll to kqueue. Simpler and faster. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « runtime/bin/eventhandler_macos.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: runtime/bin/eventhandler_macos.cc
diff --git a/runtime/bin/eventhandler_macos.cc b/runtime/bin/eventhandler_macos.cc
index 1dff244c21b4018c1b2ff691c8e908b23251663b..14d67fceeccba0b3c0ac67e152086d9f213e52ba 100644
--- a/runtime/bin/eventhandler_macos.cc
+++ b/runtime/bin/eventhandler_macos.cc
@@ -5,10 +5,10 @@
#include "bin/eventhandler.h"
#include <errno.h>
-#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
+#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
@@ -33,21 +33,82 @@ static const int kInfinityTimeout = -1;
static const int kTimerId = -1;
-intptr_t SocketData::GetPollEvents() {
- // Do not ask for POLLERR and POLLHUP explicitly as they are
- // triggered anyway.
- intptr_t events = 0;
- if (!IsClosedRead()) {
- if ((mask_ & (1 << kInEvent)) != 0) {
- events |= POLLIN;
+bool SocketData::HasReadEvent() {
+ return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0);
+}
+
+
+bool SocketData::HasWriteEvent() {
+ return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0);
+}
+
+
+// Unregister the file descriptor for a SocketData structure with kqueue.
+static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
+ static const intptr_t kMaxChanges = 2;
+ intptr_t changes = 0;
+ struct kevent events[kMaxChanges];
+ if (sd->read_tracked_by_kqueue()) {
+ EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ ++changes;
+ sd->set_read_tracked_by_kqueue(false);
+ }
+ if (sd->write_tracked_by_kqueue()) {
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd);
+ ++changes;
+ sd->set_write_tracked_by_kqueue(false);
+ }
+ if (changes > 0) {
+ ASSERT(changes < kMaxChanges);
+ int status =
+ TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
+ if (status == -1) {
+ FATAL("Failed deleting events from kqueue");
}
}
- if (!IsClosedWrite()) {
- if ((mask_ & (1 << kOutEvent)) != 0) {
- events |= POLLOUT;
+}
+
+
+// Update the kqueue registration for SocketData structure to reflect
+// the events currently of interest.
+static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) {
+ static const intptr_t kMaxChanges = 2;
+ intptr_t changes = 0;
+ struct kevent events[kMaxChanges];
+ if (sd->port() != 0) {
+ // Register or unregister READ filter if needed.
+ if (sd->HasReadEvent()) {
+ if (!sd->read_tracked_by_kqueue()) {
+ EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd);
+ ++changes;
+ sd->set_read_tracked_by_kqueue(true);
+ }
+ } else if (sd->read_tracked_by_kqueue()) {
+ EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ ++changes;
+ sd->set_read_tracked_by_kqueue(false);
+ }
+ // Register or unregister WRITE filter if needed.
+ if (sd->HasWriteEvent()) {
+ if (!sd->write_tracked_by_kqueue()) {
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd);
+ ++changes;
+ sd->set_write_tracked_by_kqueue(true);
+ }
+ } else if (sd->write_tracked_by_kqueue()) {
+ EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ ++changes;
+ sd->set_write_tracked_by_kqueue(false);
+ }
+ }
+ if (changes > 0) {
+ ASSERT(changes < kMaxChanges);
+ int status =
+ TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
+ if (status == -1) {
+ FATAL("Failed updating kqueue");
}
}
- return events;
}
@@ -61,6 +122,18 @@ EventHandlerImplementation::EventHandlerImplementation()
FDUtils::SetNonBlocking(interrupt_fds_[0]);
timeout_ = kInfinityTimeout;
timeout_port_ = 0;
+
+ kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue());
+ if (kqueue_fd_ == -1) {
+ FATAL("Failed creating kqueue");
+ }
+ // Register the interrupt_fd with the kqueue.
+ struct kevent event;
+ EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
+ int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL));
+ if (status == -1) {
+ FATAL("Failed adding interrupt fd to kqueue");
+ }
}
@@ -77,8 +150,8 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(entry != NULL);
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
- // If there is no data in the hash map for this file descriptor
- // then this is inserting a new SocketData for the file descriptor.
+ // If there is no data in the hash map for this file descriptor a
+ // new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
}
@@ -105,43 +178,6 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id,
}
-struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) {
- struct pollfd* pollfds;
-
- // Calculate the number of file descriptors to poll on.
- intptr_t numPollfds = 1;
- for (HashMap::Entry* entry = socket_map_.Start();
- entry != NULL;
- entry = socket_map_.Next(entry)) {
- SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
- if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++;
- }
-
- pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd),
- numPollfds));
- pollfds[0].fd = interrupt_fds_[0];
- pollfds[0].events |= POLLIN;
-
- int i = 1;
- for (HashMap::Entry* entry = socket_map_.Start();
- entry != NULL;
- entry = socket_map_.Next(entry)) {
- SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
- intptr_t events = sd->GetPollEvents();
- if (sd->port() > 0 && events != 0) {
- // Fd is added to the poll set.
- pollfds[i].fd = sd->fd();
- pollfds[i].events = events;
- i++;
- }
- }
- ASSERT(numPollfds == i);
- *pollfds_size = i;
-
- return pollfds;
-}
-
-
bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
int total_read = 0;
int bytes_read =
@@ -173,13 +209,16 @@ void EventHandlerImplementation::HandleInterruptFd() {
ASSERT(msg.data == (1 << kShutdownReadCommand));
// Close the socket for reading.
sd->ShutdownRead();
+ UpdateKqueue(kqueue_fd_, sd);
} else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(msg.data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
sd->ShutdownWrite();
+ UpdateKqueue(kqueue_fd_, sd);
} else if ((msg.data & (1 << kCloseCommand)) != 0) {
ASSERT(msg.data == (1 << kCloseCommand));
// Close the socket and free system resources.
+ RemoveFromKqueue(kqueue_fd_, sd);
intptr_t fd = sd->fd();
sd->Close();
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
@@ -187,100 +226,64 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else {
// Setup events to wait for.
sd->SetPortAndMask(msg.dart_port, msg.data);
+ UpdateKqueue(kqueue_fd_, sd);
}
}
}
}
-#ifdef DEBUG_POLL
-static void PrintEventMask(struct pollfd* pollfd) {
- printf("%d ", pollfd->fd);
- if ((pollfd->revents & POLLIN) != 0) printf("POLLIN ");
- if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI ");
- if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT ");
- if ((pollfd->revents & POLLERR) != 0) printf("POLLERR ");
- if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP ");
- if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL ");
- int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL;
- if ((pollfd->revents & ~all_events) != 0) {
- printf("(and %08x) ", pollfd->revents & ~all_events);
- }
- printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd));
+#ifdef DEBUG_KQUEUE
+static void PrintEventMask(intptr_t fd, struct kevent* event) {
+ printf("%d ", fd);
+ if (event->filter == EVFILT_READ) printf("EVFILT_READ ");
+ if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE ");
+ printf("flags: %x: ", event->flags);
+ if ((event->flags & EV_EOF) != 0) printf("EV_EOF ");
+ if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR ");
+ printf("(available %d) ", FDUtils::AvailableBytes(fd));
printf("\n");
}
#endif
-intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) {
-#ifdef DEBUG_POLL
- if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd);
+
+intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
+ SocketData* sd) {
+#ifdef DEBUG_KQUEUE
+ PrintEventMask(sd->fd(), event);
#endif
intptr_t event_mask = 0;
- SocketData* sd = GetSocketData(pollfd->fd);
if (sd->IsListeningSocket()) {
- // For listening sockets the POLLIN event indicate that there are
- // connections ready for accept unless accompanied with one of the
- // other flags.
- if ((pollfd->revents & POLLIN) != 0) {
- if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent);
- if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent);
+ // On a listening socket the READ event means that there are
+ // connections ready to be accepted.
+ if (event->filter == EVFILT_READ) {
+ if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent);
+ if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent);
if (event_mask == 0) event_mask |= (1 << kInEvent);
}
} else {
- if ((pollfd->revents & POLLNVAL) != 0) {
- return 0;
- }
-
// Prioritize data events over close and error events.
- if ((pollfd->revents & POLLIN) != 0) {
- if (FDUtils::AvailableBytes(pollfd->fd) != 0) {
- event_mask = (1 << kInEvent);
- } else if (((pollfd->revents & POLLHUP) != 0)) {
+ if (event->filter == EVFILT_READ) {
+ if (FDUtils::AvailableBytes(sd->fd()) != 0) {
+ event_mask = (1 << kInEvent);
+ } else if ((event->flags & EV_EOF) != 0) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
- } else if ((pollfd->revents & POLLERR) != 0) {
+ } else if ((event->flags & EV_ERROR) != 0) {
event_mask = (1 << kErrorEvent);
- } else {
- if (sd->IsPipe()) {
- // When reading from stdin (either from a terminal or piped
- // input) treat POLLIN with 0 available bytes as
- // end-of-file.
- if (sd->fd() == STDIN_FILENO) {
- event_mask = (1 << kCloseEvent);
- sd->MarkClosedRead();
- }
- } else {
- // If POLLIN is set with no available data and no POLLHUP use
- // recv to peek for whether the other end of the socket
- // actually closed.
- char buffer;
- ssize_t bytesPeeked =
- TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK));
- ASSERT(EAGAIN == EWOULDBLOCK);
- if (bytesPeeked == 0) {
- event_mask = (1 << kCloseEvent);
- sd->MarkClosedRead();
- } else if (errno != EWOULDBLOCK) {
- fprintf(stderr, "Error recv: %s\n", strerror(errno));
- }
- }
- }
- }
-
- // On pipes POLLHUP is reported without POLLIN when there is no
- // more data to read.
- if (sd->IsPipe()) {
- if (((pollfd->revents & POLLIN) == 0) &&
- ((pollfd->revents & POLLHUP) != 0)) {
- event_mask = (1 << kCloseEvent);
- sd->MarkClosedRead();
}
}
- if ((pollfd->revents & POLLOUT) != 0) {
- if ((pollfd->revents & POLLERR) != 0) {
+ if (event->filter == EVFILT_WRITE) {
+ if ((event->flags & EV_ERROR) != 0) {
event_mask = (1 << kErrorEvent);
sd->MarkClosedWrite();
+ } else if ((event->flags & EV_EOF) != 0) {
+ // If the receiver closed for reading, close for writing,
+ // update the registration with kqueue, and do not report a
+ // write event.
+ sd->MarkClosedWrite();
+ UpdateKqueue(kqueue_fd_, sd);
} else {
event_mask |= (1 << kOutEvent);
}
@@ -291,25 +294,19 @@ intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) {
}
-void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds,
- int pollfds_size,
- int result_size) {
- if ((pollfds[0].revents & POLLIN) != 0) {
- result_size -= 1;
- }
- if (result_size > 0) {
- for (int i = 1; i < pollfds_size; i++) {
- /*
- * The fd is unregistered. It gets re-registered when the request
- * was handled by dart.
- */
- intptr_t event_mask = GetPollEvents(&pollfds[i]);
+void EventHandlerImplementation::HandleEvents(struct kevent* events,
+ int size) {
+ for (int i = 0; i < size; i++) {
+ if (events[i].udata != NULL) {
+ SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
+ intptr_t event_mask = GetEvents(events + i, sd);
if (event_mask != 0) {
- intptr_t fd = pollfds[i].fd;
- SocketData* sd = GetSocketData(fd);
+ // Unregister events for the file descriptor. Events will be
+ // registered again when the current event has been handled in
+ // Dart code.
+ RemoveFromKqueue(kqueue_fd_, sd);
Dart_Port port = sd->port();
ASSERT(port != 0);
- sd->Unregister();
DartUtils::PostInt32(port, event_mask);
}
}
@@ -339,33 +336,44 @@ void EventHandlerImplementation::HandleTimeout() {
}
-void EventHandlerImplementation::Poll(uword args) {
- intptr_t pollfds_size;
- struct pollfd* pollfds;
+void EventHandlerImplementation::EventHandlerEntry(uword args) {
+ static const intptr_t kMaxEvents = 16;
+ struct kevent events[kMaxEvents];
EventHandlerImplementation* handler =
reinterpret_cast<EventHandlerImplementation*>(args);
ASSERT(handler != NULL);
while (1) {
- pollfds = handler->GetPollFds(&pollfds_size);
intptr_t millis = handler->GetTimeout();
- intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis));
- ASSERT(EAGAIN == EWOULDBLOCK);
+ struct timespec ts;
+ int64_t secs = 0;
+ int64_t nanos = 0;
+ if (millis > 0) {
+ secs = millis / 1000;
+ nanos = (millis - (secs * 1000)) * 1000000;
+ }
+ ts.tv_sec = secs;
+ ts.tv_nsec = nanos;
+ intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_,
+ NULL,
+ 0,
+ events,
+ kMaxEvents,
+ &ts));
if (result == -1) {
- if (errno != EWOULDBLOCK) {
- perror("Poll failed");
- }
+ perror("kevent failed");
+ FATAL("kevent failed\n");
} else {
handler->HandleTimeout();
- handler->HandleEvents(pollfds, pollfds_size, result);
+ handler->HandleEvents(events, result);
}
- free(pollfds);
}
}
void EventHandlerImplementation::StartEventHandler() {
- int result = dart::Thread::Start(&EventHandlerImplementation::Poll,
- reinterpret_cast<uword>(this));
+ int result =
+ dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry,
+ reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}
« no previous file with comments | « runtime/bin/eventhandler_macos.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698