Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: runtime/bin/eventhandler_macos.cc

Issue 9365010: Move the event handler on macos from poll to kqueue. Simpler and faster. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « runtime/bin/eventhandler_macos.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "bin/eventhandler.h" 5 #include "bin/eventhandler.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <poll.h>
9 #include <pthread.h> 8 #include <pthread.h>
10 #include <stdio.h> 9 #include <stdio.h>
11 #include <string.h> 10 #include <string.h>
11 #include <sys/event.h>
12 #include <sys/time.h> 12 #include <sys/time.h>
13 #include <unistd.h> 13 #include <unistd.h>
14 14
15 #include "bin/dartutils.h" 15 #include "bin/dartutils.h"
16 #include "bin/fdutils.h" 16 #include "bin/fdutils.h"
17 #include "bin/hashmap.h" 17 #include "bin/hashmap.h"
18 #include "platform/utils.h" 18 #include "platform/utils.h"
19 19
20 20
21 int64_t GetCurrentTimeMilliseconds() { 21 int64_t GetCurrentTimeMilliseconds() {
22 struct timeval tv; 22 struct timeval tv;
23 if (gettimeofday(&tv, NULL) < 0) { 23 if (gettimeofday(&tv, NULL) < 0) {
24 UNREACHABLE(); 24 UNREACHABLE();
25 return 0; 25 return 0;
26 } 26 }
27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; 27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000;
28 } 28 }
29 29
30 30
31 static const int kInterruptMessageSize = sizeof(InterruptMessage); 31 static const int kInterruptMessageSize = sizeof(InterruptMessage);
32 static const int kInfinityTimeout = -1; 32 static const int kInfinityTimeout = -1;
33 static const int kTimerId = -1; 33 static const int kTimerId = -1;
34 34
35 35
36 intptr_t SocketData::GetPollEvents() { 36 bool SocketData::HasReadEvent() {
37 // Do not ask for POLLERR and POLLHUP explicitly as they are 37 return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0);
38 // triggered anyway.
39 intptr_t events = 0;
40 if (!IsClosedRead()) {
41 if ((mask_ & (1 << kInEvent)) != 0) {
42 events |= POLLIN;
43 }
44 }
45 if (!IsClosedWrite()) {
46 if ((mask_ & (1 << kOutEvent)) != 0) {
47 events |= POLLOUT;
48 }
49 }
50 return events;
51 } 38 }
52 39
53 40
41 bool SocketData::HasWriteEvent() {
42 return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0);
43 }
44
45
46 // Unregister the file descriptor for a SocketData structure with kqueue.
47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
48 static const intptr_t kMaxChanges = 2;
49 intptr_t changes = 0;
50 struct kevent events[kMaxChanges];
51 if (sd->read_tracked_by_kqueue()) {
52 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
53 ++changes;
54 sd->set_read_tracked_by_kqueue(false);
55 }
56 if (sd->write_tracked_by_kqueue()) {
57 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd);
58 ++changes;
59 sd->set_write_tracked_by_kqueue(false);
60 }
61 if (changes > 0) {
62 ASSERT(changes < kMaxChanges);
63 int status =
64 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
65 if (status == -1) {
66 FATAL("Failed deleting events from kqueue");
67 }
68 }
69 }
70
71
72 // Update the kqueue registration for SocketData structure to reflect
73 // the events currently of interest.
74 static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) {
75 static const intptr_t kMaxChanges = 2;
76 intptr_t changes = 0;
77 struct kevent events[kMaxChanges];
78 if (sd->port() != 0) {
79 // Register or unregister READ filter if needed.
80 if (sd->HasReadEvent()) {
81 if (!sd->read_tracked_by_kqueue()) {
82 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd);
83 ++changes;
84 sd->set_read_tracked_by_kqueue(true);
85 }
86 } else if (sd->read_tracked_by_kqueue()) {
87 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
88 ++changes;
89 sd->set_read_tracked_by_kqueue(false);
90 }
91 // Register or unregister WRITE filter if needed.
92 if (sd->HasWriteEvent()) {
93 if (!sd->write_tracked_by_kqueue()) {
94 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd);
95 ++changes;
96 sd->set_write_tracked_by_kqueue(true);
97 }
98 } else if (sd->write_tracked_by_kqueue()) {
99 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
100 ++changes;
101 sd->set_write_tracked_by_kqueue(false);
102 }
103 }
104 if (changes > 0) {
105 ASSERT(changes < kMaxChanges);
106 int status =
107 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
108 if (status == -1) {
109 FATAL("Failed updating kqueue");
110 }
111 }
112 }
113
114
54 EventHandlerImplementation::EventHandlerImplementation() 115 EventHandlerImplementation::EventHandlerImplementation()
55 : socket_map_(&HashMap::SamePointerValue, 16) { 116 : socket_map_(&HashMap::SamePointerValue, 16) {
56 intptr_t result; 117 intptr_t result;
57 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); 118 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
58 if (result != 0) { 119 if (result != 0) {
59 FATAL("Pipe creation failed"); 120 FATAL("Pipe creation failed");
60 } 121 }
61 FDUtils::SetNonBlocking(interrupt_fds_[0]); 122 FDUtils::SetNonBlocking(interrupt_fds_[0]);
62 timeout_ = kInfinityTimeout; 123 timeout_ = kInfinityTimeout;
63 timeout_port_ = 0; 124 timeout_port_ = 0;
125
126 kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue());
127 if (kqueue_fd_ == -1) {
128 FATAL("Failed creating kqueue");
129 }
130 // Register the interrupt_fd with the kqueue.
131 struct kevent event;
132 EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
133 int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL));
134 if (status == -1) {
135 FATAL("Failed adding interrupt fd to kqueue");
136 }
64 } 137 }
65 138
66 139
67 EventHandlerImplementation::~EventHandlerImplementation() { 140 EventHandlerImplementation::~EventHandlerImplementation() {
68 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); 141 TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
69 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); 142 TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
70 } 143 }
71 144
72 145
73 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { 146 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
74 ASSERT(fd >= 0); 147 ASSERT(fd >= 0);
75 HashMap::Entry* entry = socket_map_.Lookup( 148 HashMap::Entry* entry = socket_map_.Lookup(
76 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); 149 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
77 ASSERT(entry != NULL); 150 ASSERT(entry != NULL);
78 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); 151 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
79 if (sd == NULL) { 152 if (sd == NULL) {
80 // If there is no data in the hash map for this file descriptor 153 // If there is no data in the hash map for this file descriptor a
81 // then this is inserting a new SocketData for the file descriptor. 154 // new SocketData for the file descriptor is inserted.
82 sd = new SocketData(fd); 155 sd = new SocketData(fd);
83 entry->value = sd; 156 entry->value = sd;
84 } 157 }
85 ASSERT(fd == sd->fd()); 158 ASSERT(fd == sd->fd());
86 return sd; 159 return sd;
87 } 160 }
88 161
89 162
90 void EventHandlerImplementation::WakeupHandler(intptr_t id, 163 void EventHandlerImplementation::WakeupHandler(intptr_t id,
91 Dart_Port dart_port, 164 Dart_Port dart_port,
92 int64_t data) { 165 int64_t data) {
93 InterruptMessage msg; 166 InterruptMessage msg;
94 msg.id = id; 167 msg.id = id;
95 msg.dart_port = dart_port; 168 msg.dart_port = dart_port;
96 msg.data = data; 169 msg.data = data;
97 intptr_t result = 170 intptr_t result =
98 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); 171 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
99 if (result != kInterruptMessageSize) { 172 if (result != kInterruptMessageSize) {
100 if (result == -1) { 173 if (result == -1) {
101 perror("Interrupt message failure:"); 174 perror("Interrupt message failure:");
102 } 175 }
103 FATAL1("Interrupt message failure. Wrote %d bytes.", result); 176 FATAL1("Interrupt message failure. Wrote %d bytes.", result);
104 } 177 }
105 } 178 }
106 179
107 180
108 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) {
109 struct pollfd* pollfds;
110
111 // Calculate the number of file descriptors to poll on.
112 intptr_t numPollfds = 1;
113 for (HashMap::Entry* entry = socket_map_.Start();
114 entry != NULL;
115 entry = socket_map_.Next(entry)) {
116 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
117 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++;
118 }
119
120 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd),
121 numPollfds));
122 pollfds[0].fd = interrupt_fds_[0];
123 pollfds[0].events |= POLLIN;
124
125 int i = 1;
126 for (HashMap::Entry* entry = socket_map_.Start();
127 entry != NULL;
128 entry = socket_map_.Next(entry)) {
129 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
130 intptr_t events = sd->GetPollEvents();
131 if (sd->port() > 0 && events != 0) {
132 // Fd is added to the poll set.
133 pollfds[i].fd = sd->fd();
134 pollfds[i].events = events;
135 i++;
136 }
137 }
138 ASSERT(numPollfds == i);
139 *pollfds_size = i;
140
141 return pollfds;
142 }
143
144
145 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { 181 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
146 int total_read = 0; 182 int total_read = 0;
147 int bytes_read = 183 int bytes_read =
148 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); 184 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize));
149 if (bytes_read < 0) { 185 if (bytes_read < 0) {
150 return false; 186 return false;
151 } 187 }
152 total_read = bytes_read; 188 total_read = bytes_read;
153 while (total_read < kInterruptMessageSize) { 189 while (total_read < kInterruptMessageSize) {
154 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], 190 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0],
(...skipping 11 matching lines...) Expand all
166 while (GetInterruptMessage(&msg)) { 202 while (GetInterruptMessage(&msg)) {
167 if (msg.id == kTimerId) { 203 if (msg.id == kTimerId) {
168 timeout_ = msg.data; 204 timeout_ = msg.data;
169 timeout_port_ = msg.dart_port; 205 timeout_port_ = msg.dart_port;
170 } else { 206 } else {
171 SocketData* sd = GetSocketData(msg.id); 207 SocketData* sd = GetSocketData(msg.id);
172 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { 208 if ((msg.data & (1 << kShutdownReadCommand)) != 0) {
173 ASSERT(msg.data == (1 << kShutdownReadCommand)); 209 ASSERT(msg.data == (1 << kShutdownReadCommand));
174 // Close the socket for reading. 210 // Close the socket for reading.
175 sd->ShutdownRead(); 211 sd->ShutdownRead();
212 UpdateKqueue(kqueue_fd_, sd);
176 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { 213 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
177 ASSERT(msg.data == (1 << kShutdownWriteCommand)); 214 ASSERT(msg.data == (1 << kShutdownWriteCommand));
178 // Close the socket for writing. 215 // Close the socket for writing.
179 sd->ShutdownWrite(); 216 sd->ShutdownWrite();
217 UpdateKqueue(kqueue_fd_, sd);
180 } else if ((msg.data & (1 << kCloseCommand)) != 0) { 218 } else if ((msg.data & (1 << kCloseCommand)) != 0) {
181 ASSERT(msg.data == (1 << kCloseCommand)); 219 ASSERT(msg.data == (1 << kCloseCommand));
182 // Close the socket and free system resources. 220 // Close the socket and free system resources.
221 RemoveFromKqueue(kqueue_fd_, sd);
183 intptr_t fd = sd->fd(); 222 intptr_t fd = sd->fd();
184 sd->Close(); 223 sd->Close();
185 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); 224 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
186 delete sd; 225 delete sd;
187 } else { 226 } else {
188 // Setup events to wait for. 227 // Setup events to wait for.
189 sd->SetPortAndMask(msg.dart_port, msg.data); 228 sd->SetPortAndMask(msg.dart_port, msg.data);
229 UpdateKqueue(kqueue_fd_, sd);
190 } 230 }
191 } 231 }
192 } 232 }
193 } 233 }
194 234
195 #ifdef DEBUG_POLL
196 static void PrintEventMask(struct pollfd* pollfd) {
197 printf("%d ", pollfd->fd);
198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN ");
199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI ");
200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT ");
201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR ");
202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP ");
203 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL ");
204 int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL;
205 if ((pollfd->revents & ~all_events) != 0) {
206 printf("(and %08x) ", pollfd->revents & ~all_events);
207 }
208 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd));
209 235
236 #ifdef DEBUG_KQUEUE
237 static void PrintEventMask(intptr_t fd, struct kevent* event) {
238 printf("%d ", fd);
239 if (event->filter == EVFILT_READ) printf("EVFILT_READ ");
240 if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE ");
241 printf("flags: %x: ", event->flags);
242 if ((event->flags & EV_EOF) != 0) printf("EV_EOF ");
243 if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR ");
244 printf("(available %d) ", FDUtils::AvailableBytes(fd));
210 printf("\n"); 245 printf("\n");
211 } 246 }
212 #endif 247 #endif
213 248
214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { 249
215 #ifdef DEBUG_POLL 250 intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
216 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); 251 SocketData* sd) {
252 #ifdef DEBUG_KQUEUE
253 PrintEventMask(sd->fd(), event);
217 #endif 254 #endif
218 intptr_t event_mask = 0; 255 intptr_t event_mask = 0;
219 SocketData* sd = GetSocketData(pollfd->fd);
220 if (sd->IsListeningSocket()) { 256 if (sd->IsListeningSocket()) {
221 // For listening sockets the POLLIN event indicate that there are 257 // On a listening socket the READ event means that there are
222 // connections ready for accept unless accompanied with one of the 258 // connections ready to be accepted.
223 // other flags. 259 if (event->filter == EVFILT_READ) {
224 if ((pollfd->revents & POLLIN) != 0) { 260 if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent);
225 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); 261 if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent);
226 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent);
227 if (event_mask == 0) event_mask |= (1 << kInEvent); 262 if (event_mask == 0) event_mask |= (1 << kInEvent);
228 } 263 }
229 } else { 264 } else {
230 if ((pollfd->revents & POLLNVAL) != 0) {
231 return 0;
232 }
233
234 // Prioritize data events over close and error events. 265 // Prioritize data events over close and error events.
235 if ((pollfd->revents & POLLIN) != 0) { 266 if (event->filter == EVFILT_READ) {
236 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { 267 if (FDUtils::AvailableBytes(sd->fd()) != 0) {
237 event_mask = (1 << kInEvent); 268 event_mask = (1 << kInEvent);
238 } else if (((pollfd->revents & POLLHUP) != 0)) { 269 } else if ((event->flags & EV_EOF) != 0) {
239 event_mask = (1 << kCloseEvent); 270 event_mask = (1 << kCloseEvent);
240 sd->MarkClosedRead(); 271 sd->MarkClosedRead();
241 } else if ((pollfd->revents & POLLERR) != 0) { 272 } else if ((event->flags & EV_ERROR) != 0) {
242 event_mask = (1 << kErrorEvent); 273 event_mask = (1 << kErrorEvent);
243 } else {
244 if (sd->IsPipe()) {
245 // When reading from stdin (either from a terminal or piped
246 // input) treat POLLIN with 0 available bytes as
247 // end-of-file.
248 if (sd->fd() == STDIN_FILENO) {
249 event_mask = (1 << kCloseEvent);
250 sd->MarkClosedRead();
251 }
252 } else {
253 // If POLLIN is set with no available data and no POLLHUP use
254 // recv to peek for whether the other end of the socket
255 // actually closed.
256 char buffer;
257 ssize_t bytesPeeked =
258 TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK));
259 ASSERT(EAGAIN == EWOULDBLOCK);
260 if (bytesPeeked == 0) {
261 event_mask = (1 << kCloseEvent);
262 sd->MarkClosedRead();
263 } else if (errno != EWOULDBLOCK) {
264 fprintf(stderr, "Error recv: %s\n", strerror(errno));
265 }
266 }
267 } 274 }
268 } 275 }
269 276
270 // On pipes POLLHUP is reported without POLLIN when there is no 277 if (event->filter == EVFILT_WRITE) {
271 // more data to read. 278 if ((event->flags & EV_ERROR) != 0) {
272 if (sd->IsPipe()) {
273 if (((pollfd->revents & POLLIN) == 0) &&
274 ((pollfd->revents & POLLHUP) != 0)) {
275 event_mask = (1 << kCloseEvent);
276 sd->MarkClosedRead();
277 }
278 }
279
280 if ((pollfd->revents & POLLOUT) != 0) {
281 if ((pollfd->revents & POLLERR) != 0) {
282 event_mask = (1 << kErrorEvent); 279 event_mask = (1 << kErrorEvent);
283 sd->MarkClosedWrite(); 280 sd->MarkClosedWrite();
281 } else if ((event->flags & EV_EOF) != 0) {
282 // If the receiver closed for reading, close for writing,
283 // update the registration with kqueue, and do not report a
284 // write event.
285 sd->MarkClosedWrite();
286 UpdateKqueue(kqueue_fd_, sd);
284 } else { 287 } else {
285 event_mask |= (1 << kOutEvent); 288 event_mask |= (1 << kOutEvent);
286 } 289 }
287 } 290 }
288 } 291 }
289 292
290 return event_mask; 293 return event_mask;
291 } 294 }
292 295
293 296
294 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, 297 void EventHandlerImplementation::HandleEvents(struct kevent* events,
295 int pollfds_size, 298 int size) {
296 int result_size) { 299 for (int i = 0; i < size; i++) {
297 if ((pollfds[0].revents & POLLIN) != 0) { 300 if (events[i].udata != NULL) {
298 result_size -= 1; 301 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
299 } 302 intptr_t event_mask = GetEvents(events + i, sd);
300 if (result_size > 0) {
301 for (int i = 1; i < pollfds_size; i++) {
302 /*
303 * The fd is unregistered. It gets re-registered when the request
304 * was handled by dart.
305 */
306 intptr_t event_mask = GetPollEvents(&pollfds[i]);
307 if (event_mask != 0) { 303 if (event_mask != 0) {
308 intptr_t fd = pollfds[i].fd; 304 // Unregister events for the file descriptor. Events will be
309 SocketData* sd = GetSocketData(fd); 305 // registered again when the current event has been handled in
306 // Dart code.
307 RemoveFromKqueue(kqueue_fd_, sd);
310 Dart_Port port = sd->port(); 308 Dart_Port port = sd->port();
311 ASSERT(port != 0); 309 ASSERT(port != 0);
312 sd->Unregister();
313 DartUtils::PostInt32(port, event_mask); 310 DartUtils::PostInt32(port, event_mask);
314 } 311 }
315 } 312 }
316 } 313 }
317 HandleInterruptFd(); 314 HandleInterruptFd();
318 } 315 }
319 316
320 317
321 intptr_t EventHandlerImplementation::GetTimeout() { 318 intptr_t EventHandlerImplementation::GetTimeout() {
322 if (timeout_ == kInfinityTimeout) { 319 if (timeout_ == kInfinityTimeout) {
323 return kInfinityTimeout; 320 return kInfinityTimeout;
324 } 321 }
325 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); 322 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds();
326 return (millis < 0) ? 0 : millis; 323 return (millis < 0) ? 0 : millis;
327 } 324 }
328 325
329 326
330 void EventHandlerImplementation::HandleTimeout() { 327 void EventHandlerImplementation::HandleTimeout() {
331 if (timeout_ != kInfinityTimeout) { 328 if (timeout_ != kInfinityTimeout) {
332 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); 329 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds();
333 if (millis <= 0) { 330 if (millis <= 0) {
334 DartUtils::PostNull(timeout_port_); 331 DartUtils::PostNull(timeout_port_);
335 timeout_ = kInfinityTimeout; 332 timeout_ = kInfinityTimeout;
336 timeout_port_ = 0; 333 timeout_port_ = 0;
337 } 334 }
338 } 335 }
339 } 336 }
340 337
341 338
342 void EventHandlerImplementation::Poll(uword args) { 339 void EventHandlerImplementation::EventHandlerEntry(uword args) {
343 intptr_t pollfds_size; 340 static const intptr_t kMaxEvents = 16;
344 struct pollfd* pollfds; 341 struct kevent events[kMaxEvents];
345 EventHandlerImplementation* handler = 342 EventHandlerImplementation* handler =
346 reinterpret_cast<EventHandlerImplementation*>(args); 343 reinterpret_cast<EventHandlerImplementation*>(args);
347 ASSERT(handler != NULL); 344 ASSERT(handler != NULL);
348 while (1) { 345 while (1) {
349 pollfds = handler->GetPollFds(&pollfds_size);
350 intptr_t millis = handler->GetTimeout(); 346 intptr_t millis = handler->GetTimeout();
351 intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis)); 347 struct timespec ts;
352 ASSERT(EAGAIN == EWOULDBLOCK); 348 int64_t secs = 0;
349 int64_t nanos = 0;
350 if (millis > 0) {
351 secs = millis / 1000;
352 nanos = (millis - (secs * 1000)) * 1000000;
353 }
354 ts.tv_sec = secs;
355 ts.tv_nsec = nanos;
356 intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_,
357 NULL,
358 0,
359 events,
360 kMaxEvents,
361 &ts));
353 if (result == -1) { 362 if (result == -1) {
354 if (errno != EWOULDBLOCK) { 363 perror("kevent failed");
355 perror("Poll failed"); 364 FATAL("kevent failed\n");
356 }
357 } else { 365 } else {
358 handler->HandleTimeout(); 366 handler->HandleTimeout();
359 handler->HandleEvents(pollfds, pollfds_size, result); 367 handler->HandleEvents(events, result);
360 } 368 }
361 free(pollfds);
362 } 369 }
363 } 370 }
364 371
365 372
366 void EventHandlerImplementation::StartEventHandler() { 373 void EventHandlerImplementation::StartEventHandler() {
367 int result = dart::Thread::Start(&EventHandlerImplementation::Poll, 374 int result =
368 reinterpret_cast<uword>(this)); 375 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry,
376 reinterpret_cast<uword>(this));
369 if (result != 0) { 377 if (result != 0) {
370 FATAL1("Failed to start event handler thread %d", result); 378 FATAL1("Failed to start event handler thread %d", result);
371 } 379 }
372 } 380 }
373 381
374 382
375 void EventHandlerImplementation::SendData(intptr_t id, 383 void EventHandlerImplementation::SendData(intptr_t id,
376 Dart_Port dart_port, 384 Dart_Port dart_port,
377 intptr_t data) { 385 intptr_t data) {
378 WakeupHandler(id, dart_port, data); 386 WakeupHandler(id, dart_port, data);
379 } 387 }
380 388
381 389
382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { 390 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
383 // The hashmap does not support keys with value 0. 391 // The hashmap does not support keys with value 0.
384 return reinterpret_cast<void*>(fd + 1); 392 return reinterpret_cast<void*>(fd + 1);
385 } 393 }
386 394
387 395
388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { 396 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
389 // The hashmap does not support keys with value 0. 397 // The hashmap does not support keys with value 0.
390 return dart::Utils::WordHash(fd + 1); 398 return dart::Utils::WordHash(fd + 1);
391 } 399 }
OLDNEW
« no previous file with comments | « runtime/bin/eventhandler_macos.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698