| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include "bin/eventhandler.h" | 5 #include "bin/eventhandler.h" |
| 6 | 6 |
| 7 #include <errno.h> | 7 #include <errno.h> |
| 8 #include <poll.h> | |
| 9 #include <pthread.h> | 8 #include <pthread.h> |
| 10 #include <stdio.h> | 9 #include <stdio.h> |
| 11 #include <string.h> | 10 #include <string.h> |
| 11 #include <sys/event.h> |
| 12 #include <sys/time.h> | 12 #include <sys/time.h> |
| 13 #include <unistd.h> | 13 #include <unistd.h> |
| 14 | 14 |
| 15 #include "bin/dartutils.h" | 15 #include "bin/dartutils.h" |
| 16 #include "bin/fdutils.h" | 16 #include "bin/fdutils.h" |
| 17 #include "bin/hashmap.h" | 17 #include "bin/hashmap.h" |
| 18 #include "platform/utils.h" | 18 #include "platform/utils.h" |
| 19 | 19 |
| 20 | 20 |
| 21 int64_t GetCurrentTimeMilliseconds() { | 21 int64_t GetCurrentTimeMilliseconds() { |
| 22 struct timeval tv; | 22 struct timeval tv; |
| 23 if (gettimeofday(&tv, NULL) < 0) { | 23 if (gettimeofday(&tv, NULL) < 0) { |
| 24 UNREACHABLE(); | 24 UNREACHABLE(); |
| 25 return 0; | 25 return 0; |
| 26 } | 26 } |
| 27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
| 28 } | 28 } |
| 29 | 29 |
| 30 | 30 |
| 31 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 31 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
| 32 static const int kInfinityTimeout = -1; | 32 static const int kInfinityTimeout = -1; |
| 33 static const int kTimerId = -1; | 33 static const int kTimerId = -1; |
| 34 | 34 |
| 35 | 35 |
| 36 intptr_t SocketData::GetPollEvents() { | 36 bool SocketData::HasReadEvent() { |
| 37 // Do not ask for POLLERR and POLLHUP explicitly as they are | 37 return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0); |
| 38 // triggered anyway. | |
| 39 intptr_t events = 0; | |
| 40 if (!IsClosedRead()) { | |
| 41 if ((mask_ & (1 << kInEvent)) != 0) { | |
| 42 events |= POLLIN; | |
| 43 } | |
| 44 } | |
| 45 if (!IsClosedWrite()) { | |
| 46 if ((mask_ & (1 << kOutEvent)) != 0) { | |
| 47 events |= POLLOUT; | |
| 48 } | |
| 49 } | |
| 50 return events; | |
| 51 } | 38 } |
| 52 | 39 |
| 53 | 40 |
| 41 bool SocketData::HasWriteEvent() { |
| 42 return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0); |
| 43 } |
| 44 |
| 45 |
| 46 // Unregister the file descriptor for a SocketData structure with kqueue. |
| 47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
| 48 static const intptr_t kMaxChanges = 2; |
| 49 intptr_t changes = 0; |
| 50 struct kevent events[kMaxChanges]; |
| 51 if (sd->read_tracked_by_kqueue()) { |
| 52 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
| 53 ++changes; |
| 54 sd->set_read_tracked_by_kqueue(false); |
| 55 } |
| 56 if (sd->write_tracked_by_kqueue()) { |
| 57 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd); |
| 58 ++changes; |
| 59 sd->set_write_tracked_by_kqueue(false); |
| 60 } |
| 61 if (changes > 0) { |
| 62 ASSERT(changes < kMaxChanges); |
| 63 int status = |
| 64 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
| 65 if (status == -1) { |
| 66 FATAL("Failed deleting events from kqueue"); |
| 67 } |
| 68 } |
| 69 } |
| 70 |
| 71 |
| 72 // Update the kqueue registration for SocketData structure to reflect |
| 73 // the events currently of interest. |
| 74 static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
| 75 static const intptr_t kMaxChanges = 2; |
| 76 intptr_t changes = 0; |
| 77 struct kevent events[kMaxChanges]; |
| 78 if (sd->port() != 0) { |
| 79 // Register or unregister READ filter if needed. |
| 80 if (sd->HasReadEvent()) { |
| 81 if (!sd->read_tracked_by_kqueue()) { |
| 82 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd); |
| 83 ++changes; |
| 84 sd->set_read_tracked_by_kqueue(true); |
| 85 } |
| 86 } else if (sd->read_tracked_by_kqueue()) { |
| 87 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
| 88 ++changes; |
| 89 sd->set_read_tracked_by_kqueue(false); |
| 90 } |
| 91 // Register or unregister WRITE filter if needed. |
| 92 if (sd->HasWriteEvent()) { |
| 93 if (!sd->write_tracked_by_kqueue()) { |
| 94 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd); |
| 95 ++changes; |
| 96 sd->set_write_tracked_by_kqueue(true); |
| 97 } |
| 98 } else if (sd->write_tracked_by_kqueue()) { |
| 99 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); |
| 100 ++changes; |
| 101 sd->set_write_tracked_by_kqueue(false); |
| 102 } |
| 103 } |
| 104 if (changes > 0) { |
| 105 ASSERT(changes < kMaxChanges); |
| 106 int status = |
| 107 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
| 108 if (status == -1) { |
| 109 FATAL("Failed updating kqueue"); |
| 110 } |
| 111 } |
| 112 } |
| 113 |
| 114 |
| 54 EventHandlerImplementation::EventHandlerImplementation() | 115 EventHandlerImplementation::EventHandlerImplementation() |
| 55 : socket_map_(&HashMap::SamePointerValue, 16) { | 116 : socket_map_(&HashMap::SamePointerValue, 16) { |
| 56 intptr_t result; | 117 intptr_t result; |
| 57 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); | 118 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
| 58 if (result != 0) { | 119 if (result != 0) { |
| 59 FATAL("Pipe creation failed"); | 120 FATAL("Pipe creation failed"); |
| 60 } | 121 } |
| 61 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 122 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
| 62 timeout_ = kInfinityTimeout; | 123 timeout_ = kInfinityTimeout; |
| 63 timeout_port_ = 0; | 124 timeout_port_ = 0; |
| 125 |
| 126 kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue()); |
| 127 if (kqueue_fd_ == -1) { |
| 128 FATAL("Failed creating kqueue"); |
| 129 } |
| 130 // Register the interrupt_fd with the kqueue. |
| 131 struct kevent event; |
| 132 EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL); |
| 133 int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL)); |
| 134 if (status == -1) { |
| 135 FATAL("Failed adding interrupt fd to kqueue"); |
| 136 } |
| 64 } | 137 } |
| 65 | 138 |
| 66 | 139 |
| 67 EventHandlerImplementation::~EventHandlerImplementation() { | 140 EventHandlerImplementation::~EventHandlerImplementation() { |
| 68 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); | 141 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
| 69 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); | 142 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
| 70 } | 143 } |
| 71 | 144 |
| 72 | 145 |
| 73 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { | 146 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
| 74 ASSERT(fd >= 0); | 147 ASSERT(fd >= 0); |
| 75 HashMap::Entry* entry = socket_map_.Lookup( | 148 HashMap::Entry* entry = socket_map_.Lookup( |
| 76 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); | 149 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
| 77 ASSERT(entry != NULL); | 150 ASSERT(entry != NULL); |
| 78 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | 151 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
| 79 if (sd == NULL) { | 152 if (sd == NULL) { |
| 80 // If there is no data in the hash map for this file descriptor | 153 // If there is no data in the hash map for this file descriptor a |
| 81 // then this is inserting a new SocketData for the file descriptor. | 154 // new SocketData for the file descriptor is inserted. |
| 82 sd = new SocketData(fd); | 155 sd = new SocketData(fd); |
| 83 entry->value = sd; | 156 entry->value = sd; |
| 84 } | 157 } |
| 85 ASSERT(fd == sd->fd()); | 158 ASSERT(fd == sd->fd()); |
| 86 return sd; | 159 return sd; |
| 87 } | 160 } |
| 88 | 161 |
| 89 | 162 |
| 90 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 163 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
| 91 Dart_Port dart_port, | 164 Dart_Port dart_port, |
| 92 int64_t data) { | 165 int64_t data) { |
| 93 InterruptMessage msg; | 166 InterruptMessage msg; |
| 94 msg.id = id; | 167 msg.id = id; |
| 95 msg.dart_port = dart_port; | 168 msg.dart_port = dart_port; |
| 96 msg.data = data; | 169 msg.data = data; |
| 97 intptr_t result = | 170 intptr_t result = |
| 98 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 171 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
| 99 if (result != kInterruptMessageSize) { | 172 if (result != kInterruptMessageSize) { |
| 100 if (result == -1) { | 173 if (result == -1) { |
| 101 perror("Interrupt message failure:"); | 174 perror("Interrupt message failure:"); |
| 102 } | 175 } |
| 103 FATAL1("Interrupt message failure. Wrote %d bytes.", result); | 176 FATAL1("Interrupt message failure. Wrote %d bytes.", result); |
| 104 } | 177 } |
| 105 } | 178 } |
| 106 | 179 |
| 107 | 180 |
| 108 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | |
| 109 struct pollfd* pollfds; | |
| 110 | |
| 111 // Calculate the number of file descriptors to poll on. | |
| 112 intptr_t numPollfds = 1; | |
| 113 for (HashMap::Entry* entry = socket_map_.Start(); | |
| 114 entry != NULL; | |
| 115 entry = socket_map_.Next(entry)) { | |
| 116 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
| 117 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; | |
| 118 } | |
| 119 | |
| 120 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | |
| 121 numPollfds)); | |
| 122 pollfds[0].fd = interrupt_fds_[0]; | |
| 123 pollfds[0].events |= POLLIN; | |
| 124 | |
| 125 int i = 1; | |
| 126 for (HashMap::Entry* entry = socket_map_.Start(); | |
| 127 entry != NULL; | |
| 128 entry = socket_map_.Next(entry)) { | |
| 129 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
| 130 intptr_t events = sd->GetPollEvents(); | |
| 131 if (sd->port() > 0 && events != 0) { | |
| 132 // Fd is added to the poll set. | |
| 133 pollfds[i].fd = sd->fd(); | |
| 134 pollfds[i].events = events; | |
| 135 i++; | |
| 136 } | |
| 137 } | |
| 138 ASSERT(numPollfds == i); | |
| 139 *pollfds_size = i; | |
| 140 | |
| 141 return pollfds; | |
| 142 } | |
| 143 | |
| 144 | |
| 145 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 181 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
| 146 int total_read = 0; | 182 int total_read = 0; |
| 147 int bytes_read = | 183 int bytes_read = |
| 148 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); | 184 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); |
| 149 if (bytes_read < 0) { | 185 if (bytes_read < 0) { |
| 150 return false; | 186 return false; |
| 151 } | 187 } |
| 152 total_read = bytes_read; | 188 total_read = bytes_read; |
| 153 while (total_read < kInterruptMessageSize) { | 189 while (total_read < kInterruptMessageSize) { |
| 154 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], | 190 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], |
| (...skipping 11 matching lines...) Expand all Loading... |
| 166 while (GetInterruptMessage(&msg)) { | 202 while (GetInterruptMessage(&msg)) { |
| 167 if (msg.id == kTimerId) { | 203 if (msg.id == kTimerId) { |
| 168 timeout_ = msg.data; | 204 timeout_ = msg.data; |
| 169 timeout_port_ = msg.dart_port; | 205 timeout_port_ = msg.dart_port; |
| 170 } else { | 206 } else { |
| 171 SocketData* sd = GetSocketData(msg.id); | 207 SocketData* sd = GetSocketData(msg.id); |
| 172 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { | 208 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { |
| 173 ASSERT(msg.data == (1 << kShutdownReadCommand)); | 209 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
| 174 // Close the socket for reading. | 210 // Close the socket for reading. |
| 175 sd->ShutdownRead(); | 211 sd->ShutdownRead(); |
| 212 UpdateKqueue(kqueue_fd_, sd); |
| 176 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { | 213 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
| 177 ASSERT(msg.data == (1 << kShutdownWriteCommand)); | 214 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
| 178 // Close the socket for writing. | 215 // Close the socket for writing. |
| 179 sd->ShutdownWrite(); | 216 sd->ShutdownWrite(); |
| 217 UpdateKqueue(kqueue_fd_, sd); |
| 180 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | 218 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
| 181 ASSERT(msg.data == (1 << kCloseCommand)); | 219 ASSERT(msg.data == (1 << kCloseCommand)); |
| 182 // Close the socket and free system resources. | 220 // Close the socket and free system resources. |
| 221 RemoveFromKqueue(kqueue_fd_, sd); |
| 183 intptr_t fd = sd->fd(); | 222 intptr_t fd = sd->fd(); |
| 184 sd->Close(); | 223 sd->Close(); |
| 185 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | 224 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| 186 delete sd; | 225 delete sd; |
| 187 } else { | 226 } else { |
| 188 // Setup events to wait for. | 227 // Setup events to wait for. |
| 189 sd->SetPortAndMask(msg.dart_port, msg.data); | 228 sd->SetPortAndMask(msg.dart_port, msg.data); |
| 229 UpdateKqueue(kqueue_fd_, sd); |
| 190 } | 230 } |
| 191 } | 231 } |
| 192 } | 232 } |
| 193 } | 233 } |
| 194 | 234 |
| 195 #ifdef DEBUG_POLL | |
| 196 static void PrintEventMask(struct pollfd* pollfd) { | |
| 197 printf("%d ", pollfd->fd); | |
| 198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); | |
| 199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); | |
| 200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); | |
| 201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); | |
| 202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); | |
| 203 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); | |
| 204 int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL; | |
| 205 if ((pollfd->revents & ~all_events) != 0) { | |
| 206 printf("(and %08x) ", pollfd->revents & ~all_events); | |
| 207 } | |
| 208 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); | |
| 209 | 235 |
| 236 #ifdef DEBUG_KQUEUE |
| 237 static void PrintEventMask(intptr_t fd, struct kevent* event) { |
| 238 printf("%d ", fd); |
| 239 if (event->filter == EVFILT_READ) printf("EVFILT_READ "); |
| 240 if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE "); |
| 241 printf("flags: %x: ", event->flags); |
| 242 if ((event->flags & EV_EOF) != 0) printf("EV_EOF "); |
| 243 if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR "); |
| 244 printf("(available %d) ", FDUtils::AvailableBytes(fd)); |
| 210 printf("\n"); | 245 printf("\n"); |
| 211 } | 246 } |
| 212 #endif | 247 #endif |
| 213 | 248 |
| 214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 249 |
| 215 #ifdef DEBUG_POLL | 250 intptr_t EventHandlerImplementation::GetEvents(struct kevent* event, |
| 216 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); | 251 SocketData* sd) { |
| 252 #ifdef DEBUG_KQUEUE |
| 253 PrintEventMask(sd->fd(), event); |
| 217 #endif | 254 #endif |
| 218 intptr_t event_mask = 0; | 255 intptr_t event_mask = 0; |
| 219 SocketData* sd = GetSocketData(pollfd->fd); | |
| 220 if (sd->IsListeningSocket()) { | 256 if (sd->IsListeningSocket()) { |
| 221 // For listening sockets the POLLIN event indicate that there are | 257 // On a listening socket the READ event means that there are |
| 222 // connections ready for accept unless accompanied with one of the | 258 // connections ready to be accepted. |
| 223 // other flags. | 259 if (event->filter == EVFILT_READ) { |
| 224 if ((pollfd->revents & POLLIN) != 0) { | 260 if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent); |
| 225 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 261 if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent); |
| 226 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | |
| 227 if (event_mask == 0) event_mask |= (1 << kInEvent); | 262 if (event_mask == 0) event_mask |= (1 << kInEvent); |
| 228 } | 263 } |
| 229 } else { | 264 } else { |
| 230 if ((pollfd->revents & POLLNVAL) != 0) { | |
| 231 return 0; | |
| 232 } | |
| 233 | |
| 234 // Prioritize data events over close and error events. | 265 // Prioritize data events over close and error events. |
| 235 if ((pollfd->revents & POLLIN) != 0) { | 266 if (event->filter == EVFILT_READ) { |
| 236 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { | 267 if (FDUtils::AvailableBytes(sd->fd()) != 0) { |
| 237 event_mask = (1 << kInEvent); | 268 event_mask = (1 << kInEvent); |
| 238 } else if (((pollfd->revents & POLLHUP) != 0)) { | 269 } else if ((event->flags & EV_EOF) != 0) { |
| 239 event_mask = (1 << kCloseEvent); | 270 event_mask = (1 << kCloseEvent); |
| 240 sd->MarkClosedRead(); | 271 sd->MarkClosedRead(); |
| 241 } else if ((pollfd->revents & POLLERR) != 0) { | 272 } else if ((event->flags & EV_ERROR) != 0) { |
| 242 event_mask = (1 << kErrorEvent); | 273 event_mask = (1 << kErrorEvent); |
| 243 } else { | |
| 244 if (sd->IsPipe()) { | |
| 245 // When reading from stdin (either from a terminal or piped | |
| 246 // input) treat POLLIN with 0 available bytes as | |
| 247 // end-of-file. | |
| 248 if (sd->fd() == STDIN_FILENO) { | |
| 249 event_mask = (1 << kCloseEvent); | |
| 250 sd->MarkClosedRead(); | |
| 251 } | |
| 252 } else { | |
| 253 // If POLLIN is set with no available data and no POLLHUP use | |
| 254 // recv to peek for whether the other end of the socket | |
| 255 // actually closed. | |
| 256 char buffer; | |
| 257 ssize_t bytesPeeked = | |
| 258 TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK)); | |
| 259 ASSERT(EAGAIN == EWOULDBLOCK); | |
| 260 if (bytesPeeked == 0) { | |
| 261 event_mask = (1 << kCloseEvent); | |
| 262 sd->MarkClosedRead(); | |
| 263 } else if (errno != EWOULDBLOCK) { | |
| 264 fprintf(stderr, "Error recv: %s\n", strerror(errno)); | |
| 265 } | |
| 266 } | |
| 267 } | 274 } |
| 268 } | 275 } |
| 269 | 276 |
| 270 // On pipes POLLHUP is reported without POLLIN when there is no | 277 if (event->filter == EVFILT_WRITE) { |
| 271 // more data to read. | 278 if ((event->flags & EV_ERROR) != 0) { |
| 272 if (sd->IsPipe()) { | |
| 273 if (((pollfd->revents & POLLIN) == 0) && | |
| 274 ((pollfd->revents & POLLHUP) != 0)) { | |
| 275 event_mask = (1 << kCloseEvent); | |
| 276 sd->MarkClosedRead(); | |
| 277 } | |
| 278 } | |
| 279 | |
| 280 if ((pollfd->revents & POLLOUT) != 0) { | |
| 281 if ((pollfd->revents & POLLERR) != 0) { | |
| 282 event_mask = (1 << kErrorEvent); | 279 event_mask = (1 << kErrorEvent); |
| 283 sd->MarkClosedWrite(); | 280 sd->MarkClosedWrite(); |
| 281 } else if ((event->flags & EV_EOF) != 0) { |
| 282 // If the receiver closed for reading, close for writing, |
| 283 // update the registration with kqueue, and do not report a |
| 284 // write event. |
| 285 sd->MarkClosedWrite(); |
| 286 UpdateKqueue(kqueue_fd_, sd); |
| 284 } else { | 287 } else { |
| 285 event_mask |= (1 << kOutEvent); | 288 event_mask |= (1 << kOutEvent); |
| 286 } | 289 } |
| 287 } | 290 } |
| 288 } | 291 } |
| 289 | 292 |
| 290 return event_mask; | 293 return event_mask; |
| 291 } | 294 } |
| 292 | 295 |
| 293 | 296 |
| 294 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, | 297 void EventHandlerImplementation::HandleEvents(struct kevent* events, |
| 295 int pollfds_size, | 298 int size) { |
| 296 int result_size) { | 299 for (int i = 0; i < size; i++) { |
| 297 if ((pollfds[0].revents & POLLIN) != 0) { | 300 if (events[i].udata != NULL) { |
| 298 result_size -= 1; | 301 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata); |
| 299 } | 302 intptr_t event_mask = GetEvents(events + i, sd); |
| 300 if (result_size > 0) { | |
| 301 for (int i = 1; i < pollfds_size; i++) { | |
| 302 /* | |
| 303 * The fd is unregistered. It gets re-registered when the request | |
| 304 * was handled by dart. | |
| 305 */ | |
| 306 intptr_t event_mask = GetPollEvents(&pollfds[i]); | |
| 307 if (event_mask != 0) { | 303 if (event_mask != 0) { |
| 308 intptr_t fd = pollfds[i].fd; | 304 // Unregister events for the file descriptor. Events will be |
| 309 SocketData* sd = GetSocketData(fd); | 305 // registered again when the current event has been handled in |
| 306 // Dart code. |
| 307 RemoveFromKqueue(kqueue_fd_, sd); |
| 310 Dart_Port port = sd->port(); | 308 Dart_Port port = sd->port(); |
| 311 ASSERT(port != 0); | 309 ASSERT(port != 0); |
| 312 sd->Unregister(); | |
| 313 DartUtils::PostInt32(port, event_mask); | 310 DartUtils::PostInt32(port, event_mask); |
| 314 } | 311 } |
| 315 } | 312 } |
| 316 } | 313 } |
| 317 HandleInterruptFd(); | 314 HandleInterruptFd(); |
| 318 } | 315 } |
| 319 | 316 |
| 320 | 317 |
| 321 intptr_t EventHandlerImplementation::GetTimeout() { | 318 intptr_t EventHandlerImplementation::GetTimeout() { |
| 322 if (timeout_ == kInfinityTimeout) { | 319 if (timeout_ == kInfinityTimeout) { |
| 323 return kInfinityTimeout; | 320 return kInfinityTimeout; |
| 324 } | 321 } |
| 325 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); | 322 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); |
| 326 return (millis < 0) ? 0 : millis; | 323 return (millis < 0) ? 0 : millis; |
| 327 } | 324 } |
| 328 | 325 |
| 329 | 326 |
| 330 void EventHandlerImplementation::HandleTimeout() { | 327 void EventHandlerImplementation::HandleTimeout() { |
| 331 if (timeout_ != kInfinityTimeout) { | 328 if (timeout_ != kInfinityTimeout) { |
| 332 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); | 329 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); |
| 333 if (millis <= 0) { | 330 if (millis <= 0) { |
| 334 DartUtils::PostNull(timeout_port_); | 331 DartUtils::PostNull(timeout_port_); |
| 335 timeout_ = kInfinityTimeout; | 332 timeout_ = kInfinityTimeout; |
| 336 timeout_port_ = 0; | 333 timeout_port_ = 0; |
| 337 } | 334 } |
| 338 } | 335 } |
| 339 } | 336 } |
| 340 | 337 |
| 341 | 338 |
| 342 void EventHandlerImplementation::Poll(uword args) { | 339 void EventHandlerImplementation::EventHandlerEntry(uword args) { |
| 343 intptr_t pollfds_size; | 340 static const intptr_t kMaxEvents = 16; |
| 344 struct pollfd* pollfds; | 341 struct kevent events[kMaxEvents]; |
| 345 EventHandlerImplementation* handler = | 342 EventHandlerImplementation* handler = |
| 346 reinterpret_cast<EventHandlerImplementation*>(args); | 343 reinterpret_cast<EventHandlerImplementation*>(args); |
| 347 ASSERT(handler != NULL); | 344 ASSERT(handler != NULL); |
| 348 while (1) { | 345 while (1) { |
| 349 pollfds = handler->GetPollFds(&pollfds_size); | |
| 350 intptr_t millis = handler->GetTimeout(); | 346 intptr_t millis = handler->GetTimeout(); |
| 351 intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis)); | 347 struct timespec ts; |
| 352 ASSERT(EAGAIN == EWOULDBLOCK); | 348 int64_t secs = 0; |
| 349 int64_t nanos = 0; |
| 350 if (millis > 0) { |
| 351 secs = millis / 1000; |
| 352 nanos = (millis - (secs * 1000)) * 1000000; |
| 353 } |
| 354 ts.tv_sec = secs; |
| 355 ts.tv_nsec = nanos; |
| 356 intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_, |
| 357 NULL, |
| 358 0, |
| 359 events, |
| 360 kMaxEvents, |
| 361 &ts)); |
| 353 if (result == -1) { | 362 if (result == -1) { |
| 354 if (errno != EWOULDBLOCK) { | 363 perror("kevent failed"); |
| 355 perror("Poll failed"); | 364 FATAL("kevent failed\n"); |
| 356 } | |
| 357 } else { | 365 } else { |
| 358 handler->HandleTimeout(); | 366 handler->HandleTimeout(); |
| 359 handler->HandleEvents(pollfds, pollfds_size, result); | 367 handler->HandleEvents(events, result); |
| 360 } | 368 } |
| 361 free(pollfds); | |
| 362 } | 369 } |
| 363 } | 370 } |
| 364 | 371 |
| 365 | 372 |
| 366 void EventHandlerImplementation::StartEventHandler() { | 373 void EventHandlerImplementation::StartEventHandler() { |
| 367 int result = dart::Thread::Start(&EventHandlerImplementation::Poll, | 374 int result = |
| 368 reinterpret_cast<uword>(this)); | 375 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry, |
| 376 reinterpret_cast<uword>(this)); |
| 369 if (result != 0) { | 377 if (result != 0) { |
| 370 FATAL1("Failed to start event handler thread %d", result); | 378 FATAL1("Failed to start event handler thread %d", result); |
| 371 } | 379 } |
| 372 } | 380 } |
| 373 | 381 |
| 374 | 382 |
| 375 void EventHandlerImplementation::SendData(intptr_t id, | 383 void EventHandlerImplementation::SendData(intptr_t id, |
| 376 Dart_Port dart_port, | 384 Dart_Port dart_port, |
| 377 intptr_t data) { | 385 intptr_t data) { |
| 378 WakeupHandler(id, dart_port, data); | 386 WakeupHandler(id, dart_port, data); |
| 379 } | 387 } |
| 380 | 388 |
| 381 | 389 |
| 382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { | 390 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
| 383 // The hashmap does not support keys with value 0. | 391 // The hashmap does not support keys with value 0. |
| 384 return reinterpret_cast<void*>(fd + 1); | 392 return reinterpret_cast<void*>(fd + 1); |
| 385 } | 393 } |
| 386 | 394 |
| 387 | 395 |
| 388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { | 396 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
| 389 // The hashmap does not support keys with value 0. | 397 // The hashmap does not support keys with value 0. |
| 390 return dart::Utils::WordHash(fd + 1); | 398 return dart::Utils::WordHash(fd + 1); |
| 391 } | 399 } |
| OLD | NEW |