Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Side by Side Diff: runtime/bin/eventhandler_macos.cc

Issue 9365010: Move the event handler on macos from poll to kqueue. Simpler and faster. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Avoid name clash Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 #include "bin/eventhandler.h" 5 #include "bin/eventhandler.h"
6 6
7 #include <errno.h> 7 #include <errno.h>
8 #include <poll.h>
9 #include <pthread.h> 8 #include <pthread.h>
10 #include <stdio.h> 9 #include <stdio.h>
11 #include <string.h> 10 #include <string.h>
11 #include <sys/event.h>
12 #include <sys/time.h> 12 #include <sys/time.h>
13 #include <unistd.h> 13 #include <unistd.h>
14 14
15 #include "bin/dartutils.h" 15 #include "bin/dartutils.h"
16 #include "bin/fdutils.h" 16 #include "bin/fdutils.h"
17 #include "bin/hashmap.h" 17 #include "bin/hashmap.h"
18 #include "platform/utils.h" 18 #include "platform/utils.h"
19 19
20 20
21 int64_t GetCurrentTimeMilliseconds() { 21 int64_t GetCurrentTimeMilliseconds() {
22 struct timeval tv; 22 struct timeval tv;
23 if (gettimeofday(&tv, NULL) < 0) { 23 if (gettimeofday(&tv, NULL) < 0) {
24 UNREACHABLE(); 24 UNREACHABLE();
25 return 0; 25 return 0;
26 } 26 }
27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; 27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000;
28 } 28 }
29 29
30 30
31 static const int kInterruptMessageSize = sizeof(InterruptMessage); 31 static const int kInterruptMessageSize = sizeof(InterruptMessage);
32 static const int kInfinityTimeout = -1; 32 static const int kInfinityTimeout = -1;
33 static const int kTimerId = -1; 33 static const int kTimerId = -1;
34 34
35 35
36 intptr_t SocketData::GetPollEvents() { 36 bool SocketData::HasReadEvent() {
37 // Do not ask for POLLERR and POLLHUP explicitly as they are 37 return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0);
38 // triggered anyway.
39 intptr_t events = 0;
40 if (!IsClosedRead()) {
41 if ((mask_ & (1 << kInEvent)) != 0) {
42 events |= POLLIN;
43 }
44 }
45 if (!IsClosedWrite()) {
46 if ((mask_ & (1 << kOutEvent)) != 0) {
47 events |= POLLOUT;
48 }
49 }
50 return events;
51 } 38 }
52 39
53 40
41 bool SocketData::HasWriteEvent() {
42 return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0);
43 }
44
45
46 // Unregister the file descriptor for a SocketData structure with kqueue.
47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
Søren Gjesse 2012/02/08 10:40:36 Maybe add constant kMaxChanges = 2.
Mads Ager (google) 2012/02/08 12:15:15 Done.
48 intptr_t changes = 0;
49 struct kevent events[2];
50 if (sd->read_tracked_by_kqueue()) {
51 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
52 ++changes;
53 sd->set_read_tracked_by_kqueue(false);
54 }
55 if (sd->write_tracked_by_kqueue()) {
56 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd);
57 ++changes;
58 sd->set_write_tracked_by_kqueue(false);
59 }
60 if (changes > 0) {
Søren Gjesse 2012/02/08 10:40:36 And ASSERT(changes <= kMaxChanges)
Mads Ager (google) 2012/02/08 12:15:15 Done.
61 int status =
62 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
63 if (status == -1) {
64 FATAL("Failed deleting events from kqueue");
65 }
66 }
67 }
68
69
70 // Update the kqueue registration for SocketData structure to reflect
71 // the events currently of interest.
72 static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) {
73 intptr_t changes = 0;
Søren Gjesse 2012/02/08 10:40:36 Ditto constant.
Mads Ager (google) 2012/02/08 12:15:15 Done.
74 struct kevent events[2];
75 if (sd->port() != 0) {
76 // Register or unregister READ filter if needed.
77 if (sd->HasReadEvent()) {
Søren Gjesse 2012/02/08 10:40:36 Is sd->read_tracked_by_kqueue() always false here?
Mads Ager (google) 2012/02/08 12:15:15 It is not always false. What is there now is corre
78 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd);
79 ++changes;
80 sd->set_read_tracked_by_kqueue(true);
81 } else if (sd->read_tracked_by_kqueue()) {
82 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
83 ++changes;
84 sd->set_read_tracked_by_kqueue(false);
85 }
86 // Register or unregister WRITE filter if needed.
87 if (sd->HasWriteEvent()) {
Søren Gjesse 2012/02/08 10:40:36 Ditto, is sd->write_tracked_by_kqueue() is always
Mads Ager (google) 2012/02/08 12:15:15 Same as above. Added explicit checking.
88 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd);
89 ++changes;
90 sd->set_write_tracked_by_kqueue(true);
91 } else if (sd->write_tracked_by_kqueue()) {
92 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
93 ++changes;
94 sd->set_write_tracked_by_kqueue(false);
95 }
96 }
97 if (changes > 0) {
98 int status =
99 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
100 if (status == -1) {
101 FATAL("Failed updating kqueue");
102 }
103 }
104 }
105
106
54 EventHandlerImplementation::EventHandlerImplementation() 107 EventHandlerImplementation::EventHandlerImplementation()
55 : socket_map_(&HashMap::SamePointerValue, 16) { 108 : socket_map_(&HashMap::SamePointerValue, 16) {
56 intptr_t result; 109 intptr_t result;
57 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); 110 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
58 if (result != 0) { 111 if (result != 0) {
59 FATAL("Pipe creation failed"); 112 FATAL("Pipe creation failed");
60 } 113 }
61 FDUtils::SetNonBlocking(interrupt_fds_[0]); 114 FDUtils::SetNonBlocking(interrupt_fds_[0]);
62 timeout_ = kInfinityTimeout; 115 timeout_ = kInfinityTimeout;
63 timeout_port_ = 0; 116 timeout_port_ = 0;
117
118 kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue());
119 if (kqueue_fd_ == -1) {
120 FATAL("Failed creating kqueue");
121 }
122 // Register the interrupt_fd with the kqueue.
123 struct kevent event;
124 EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
125 int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL));
126 if (status == -1) {
127 FATAL("Failed adding interrupt fd to kqueue");
128 }
64 } 129 }
65 130
66 131
67 EventHandlerImplementation::~EventHandlerImplementation() { 132 EventHandlerImplementation::~EventHandlerImplementation() {
68 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); 133 TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
69 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); 134 TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
70 } 135 }
71 136
72 137
73 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { 138 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
74 ASSERT(fd >= 0); 139 ASSERT(fd >= 0);
75 HashMap::Entry* entry = socket_map_.Lookup( 140 HashMap::Entry* entry = socket_map_.Lookup(
76 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); 141 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
77 ASSERT(entry != NULL); 142 ASSERT(entry != NULL);
78 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); 143 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
79 if (sd == NULL) { 144 if (sd == NULL) {
80 // If there is no data in the hash map for this file descriptor 145 // If there is no data in the hash map for this file descriptor a
81 // then this is inserting a new SocketData for the file descriptor. 146 // new SocketData for the file descriptor is inserted.
82 sd = new SocketData(fd); 147 sd = new SocketData(fd);
83 entry->value = sd; 148 entry->value = sd;
84 } 149 }
85 ASSERT(fd == sd->fd()); 150 ASSERT(fd == sd->fd());
86 return sd; 151 return sd;
87 } 152 }
88 153
89 154
90 void EventHandlerImplementation::WakeupHandler(intptr_t id, 155 void EventHandlerImplementation::WakeupHandler(intptr_t id,
91 Dart_Port dart_port, 156 Dart_Port dart_port,
92 int64_t data) { 157 int64_t data) {
93 InterruptMessage msg; 158 InterruptMessage msg;
94 msg.id = id; 159 msg.id = id;
95 msg.dart_port = dart_port; 160 msg.dart_port = dart_port;
96 msg.data = data; 161 msg.data = data;
97 intptr_t result = 162 intptr_t result =
98 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); 163 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
99 if (result != kInterruptMessageSize) { 164 if (result != kInterruptMessageSize) {
100 if (result == -1) { 165 if (result == -1) {
101 perror("Interrupt message failure:"); 166 perror("Interrupt message failure:");
102 } 167 }
103 FATAL1("Interrupt message failure. Wrote %d bytes.", result); 168 FATAL1("Interrupt message failure. Wrote %d bytes.", result);
104 } 169 }
105 } 170 }
106 171
107 172
108 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) {
109 struct pollfd* pollfds;
110
111 // Calculate the number of file descriptors to poll on.
112 intptr_t numPollfds = 1;
113 for (HashMap::Entry* entry = socket_map_.Start();
114 entry != NULL;
115 entry = socket_map_.Next(entry)) {
116 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
117 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++;
118 }
119
120 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd),
121 numPollfds));
122 pollfds[0].fd = interrupt_fds_[0];
123 pollfds[0].events |= POLLIN;
124
125 int i = 1;
126 for (HashMap::Entry* entry = socket_map_.Start();
127 entry != NULL;
128 entry = socket_map_.Next(entry)) {
129 SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
130 intptr_t events = sd->GetPollEvents();
131 if (sd->port() > 0 && events != 0) {
132 // Fd is added to the poll set.
133 pollfds[i].fd = sd->fd();
134 pollfds[i].events = events;
135 i++;
136 }
137 }
138 ASSERT(numPollfds == i);
139 *pollfds_size = i;
140
141 return pollfds;
142 }
143
144
145 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { 173 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
146 int total_read = 0; 174 int total_read = 0;
147 int bytes_read = 175 int bytes_read =
148 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); 176 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize));
149 if (bytes_read < 0) { 177 if (bytes_read < 0) {
150 return false; 178 return false;
151 } 179 }
152 total_read = bytes_read; 180 total_read = bytes_read;
153 while (total_read < kInterruptMessageSize) { 181 while (total_read < kInterruptMessageSize) {
154 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], 182 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0],
(...skipping 11 matching lines...) Expand all
166 while (GetInterruptMessage(&msg)) { 194 while (GetInterruptMessage(&msg)) {
167 if (msg.id == kTimerId) { 195 if (msg.id == kTimerId) {
168 timeout_ = msg.data; 196 timeout_ = msg.data;
169 timeout_port_ = msg.dart_port; 197 timeout_port_ = msg.dart_port;
170 } else { 198 } else {
171 SocketData* sd = GetSocketData(msg.id); 199 SocketData* sd = GetSocketData(msg.id);
172 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { 200 if ((msg.data & (1 << kShutdownReadCommand)) != 0) {
173 ASSERT(msg.data == (1 << kShutdownReadCommand)); 201 ASSERT(msg.data == (1 << kShutdownReadCommand));
174 // Close the socket for reading. 202 // Close the socket for reading.
175 sd->ShutdownRead(); 203 sd->ShutdownRead();
204 UpdateKqueue(kqueue_fd_, sd);
176 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { 205 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
177 ASSERT(msg.data == (1 << kShutdownWriteCommand)); 206 ASSERT(msg.data == (1 << kShutdownWriteCommand));
178 // Close the socket for writing. 207 // Close the socket for writing.
179 sd->ShutdownWrite(); 208 sd->ShutdownWrite();
209 UpdateKqueue(kqueue_fd_, sd);
180 } else if ((msg.data & (1 << kCloseCommand)) != 0) { 210 } else if ((msg.data & (1 << kCloseCommand)) != 0) {
181 ASSERT(msg.data == (1 << kCloseCommand)); 211 ASSERT(msg.data == (1 << kCloseCommand));
182 // Close the socket and free system resources. 212 // Close the socket and free system resources.
213 RemoveFromKqueue(kqueue_fd_, sd);
183 intptr_t fd = sd->fd(); 214 intptr_t fd = sd->fd();
184 sd->Close(); 215 sd->Close();
185 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); 216 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
186 delete sd; 217 delete sd;
187 } else { 218 } else {
188 // Setup events to wait for. 219 // Setup events to wait for.
189 sd->SetPortAndMask(msg.dart_port, msg.data); 220 sd->SetPortAndMask(msg.dart_port, msg.data);
221 UpdateKqueue(kqueue_fd_, sd);
190 } 222 }
191 } 223 }
192 } 224 }
193 } 225 }
194 226
195 #ifdef DEBUG_POLL
196 static void PrintEventMask(struct pollfd* pollfd) {
197 printf("%d ", pollfd->fd);
198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN ");
199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI ");
200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT ");
201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR ");
202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP ");
203 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL ");
204 int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL;
205 if ((pollfd->revents & ~all_events) != 0) {
206 printf("(and %08x) ", pollfd->revents & ~all_events);
207 }
208 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd));
209 227
228 #ifdef DEBUG_KQUEUE
229 static void PrintEventMask(intptr_t fd, struct kevent* event) {
230 printf("%d ", fd);
231 if (event->filter == EVFILT_READ) printf("EVFILT_READ ");
232 if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE ");
233 printf("flags: %x: ", event->flags);
234 if ((event->flags & EV_EOF) != 0) printf("EV_EOF ");
235 if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR ");
236 printf("(available %d) ", FDUtils::AvailableBytes(fd));
210 printf("\n"); 237 printf("\n");
211 } 238 }
212 #endif 239 #endif
213 240
214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { 241
215 #ifdef DEBUG_POLL 242 intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
Søren Gjesse 2012/02/08 10:40:36 Nice that we don't need special handling of pipes
Mads Ager (google) 2012/02/08 12:15:15 Yes, this is so much cleaner. :)
216 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); 243 SocketData* sd) {
244 #ifdef DEBUG_KQUEUE
245 PrintEventMask(sd->fd(), event);
217 #endif 246 #endif
218 intptr_t event_mask = 0; 247 intptr_t event_mask = 0;
219 SocketData* sd = GetSocketData(pollfd->fd);
220 if (sd->IsListeningSocket()) { 248 if (sd->IsListeningSocket()) {
221 // For listening sockets the POLLIN event indicate that there are 249 // On a listening socket the READ event means that there are
222 // connections ready for accept unless accompanied with one of the 250 // connections ready to be accepted.
223 // other flags. 251 if (event->filter == EVFILT_READ) {
224 if ((pollfd->revents & POLLIN) != 0) { 252 if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent);
225 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); 253 if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent);
226 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent);
227 if (event_mask == 0) event_mask |= (1 << kInEvent); 254 if (event_mask == 0) event_mask |= (1 << kInEvent);
228 } 255 }
229 } else { 256 } else {
230 if ((pollfd->revents & POLLNVAL) != 0) {
231 return 0;
232 }
233
234 // Prioritize data events over close and error events. 257 // Prioritize data events over close and error events.
235 if ((pollfd->revents & POLLIN) != 0) { 258 if (event->filter == EVFILT_READ) {
236 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { 259 if (FDUtils::AvailableBytes(sd->fd()) != 0) {
237 event_mask = (1 << kInEvent); 260 event_mask = (1 << kInEvent);
238 } else if (((pollfd->revents & POLLHUP) != 0)) { 261 } else if ((event->flags & EV_EOF) != 0) {
239 event_mask = (1 << kCloseEvent); 262 event_mask = (1 << kCloseEvent);
240 sd->MarkClosedRead(); 263 sd->MarkClosedRead();
241 } else if ((pollfd->revents & POLLERR) != 0) { 264 } else if ((event->flags & EV_ERROR) != 0) {
242 event_mask = (1 << kErrorEvent); 265 event_mask = (1 << kErrorEvent);
243 } else {
244 if (sd->IsPipe()) {
245 // When reading from stdin (either from a terminal or piped
246 // input) treat POLLIN with 0 available bytes as
247 // end-of-file.
248 if (sd->fd() == STDIN_FILENO) {
249 event_mask = (1 << kCloseEvent);
250 sd->MarkClosedRead();
251 }
252 } else {
253 // If POLLIN is set with no available data and no POLLHUP use
254 // recv to peek for whether the other end of the socket
255 // actually closed.
256 char buffer;
257 ssize_t bytesPeeked =
258 TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK));
259 ASSERT(EAGAIN == EWOULDBLOCK);
260 if (bytesPeeked == 0) {
261 event_mask = (1 << kCloseEvent);
262 sd->MarkClosedRead();
263 } else if (errno != EWOULDBLOCK) {
264 fprintf(stderr, "Error recv: %s\n", strerror(errno));
265 }
266 }
267 } 266 }
268 } 267 }
269 268
270 // On pipes POLLHUP is reported without POLLIN when there is no 269 if (event->filter == EVFILT_WRITE) {
271 // more data to read. 270 if ((event->flags & EV_ERROR) != 0) {
272 if (sd->IsPipe()) {
273 if (((pollfd->revents & POLLIN) == 0) &&
274 ((pollfd->revents & POLLHUP) != 0)) {
275 event_mask = (1 << kCloseEvent);
276 sd->MarkClosedRead();
277 }
278 }
279
280 if ((pollfd->revents & POLLOUT) != 0) {
281 if ((pollfd->revents & POLLERR) != 0) {
282 event_mask = (1 << kErrorEvent); 271 event_mask = (1 << kErrorEvent);
283 sd->MarkClosedWrite(); 272 sd->MarkClosedWrite();
273 } else if ((event->flags & EV_EOF) != 0) {
274 // If the read end closed, close the write end as well,
Søren Gjesse 2012/02/08 10:40:36 Maybe rephrase "If the read end closed, close the
Mads Ager (google) 2012/02/08 12:15:15 Done.
275 // update the registration with kqueue and do not report
276 // a write event.
277 sd->MarkClosedWrite();
278 UpdateKqueue(kqueue_fd_, sd);
284 } else { 279 } else {
285 event_mask |= (1 << kOutEvent); 280 event_mask |= (1 << kOutEvent);
286 } 281 }
287 } 282 }
288 } 283 }
289 284
290 return event_mask; 285 return event_mask;
291 } 286 }
292 287
293 288
294 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, 289 void EventHandlerImplementation::HandleEvents(struct kevent* events,
295 int pollfds_size, 290 int size) {
296 int result_size) { 291 for (int i = 0; i < size; i++) {
297 if ((pollfds[0].revents & POLLIN) != 0) { 292 if (events[i].udata != NULL) {
298 result_size -= 1; 293 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
299 } 294 intptr_t event_mask = GetEvents(events + i, sd);
300 if (result_size > 0) {
301 for (int i = 1; i < pollfds_size; i++) {
302 /*
303 * The fd is unregistered. It gets re-registered when the request
304 * was handled by dart.
305 */
306 intptr_t event_mask = GetPollEvents(&pollfds[i]);
307 if (event_mask != 0) { 295 if (event_mask != 0) {
308 intptr_t fd = pollfds[i].fd; 296 // Unregister events for the file descriptor. Events will be
309 SocketData* sd = GetSocketData(fd); 297 // registered again when the current event has been handled in
298 // Dart code.
299 RemoveFromKqueue(kqueue_fd_, sd);
310 Dart_Port port = sd->port(); 300 Dart_Port port = sd->port();
311 ASSERT(port != 0); 301 ASSERT(port != 0);
312 sd->Unregister();
313 DartUtils::PostInt32(port, event_mask); 302 DartUtils::PostInt32(port, event_mask);
314 } 303 }
315 } 304 }
316 } 305 }
317 HandleInterruptFd(); 306 HandleInterruptFd();
318 } 307 }
319 308
320 309
321 intptr_t EventHandlerImplementation::GetTimeout() { 310 intptr_t EventHandlerImplementation::GetTimeout() {
322 if (timeout_ == kInfinityTimeout) { 311 if (timeout_ == kInfinityTimeout) {
323 return kInfinityTimeout; 312 return kInfinityTimeout;
324 } 313 }
325 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); 314 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds();
326 return (millis < 0) ? 0 : millis; 315 return (millis < 0) ? 0 : millis;
327 } 316 }
328 317
329 318
330 void EventHandlerImplementation::HandleTimeout() { 319 void EventHandlerImplementation::HandleTimeout() {
331 if (timeout_ != kInfinityTimeout) { 320 if (timeout_ != kInfinityTimeout) {
332 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); 321 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds();
333 if (millis <= 0) { 322 if (millis <= 0) {
334 DartUtils::PostNull(timeout_port_); 323 DartUtils::PostNull(timeout_port_);
335 timeout_ = kInfinityTimeout; 324 timeout_ = kInfinityTimeout;
336 timeout_port_ = 0; 325 timeout_port_ = 0;
337 } 326 }
338 } 327 }
339 } 328 }
340 329
341 330
342 void EventHandlerImplementation::Poll(uword args) { 331 void EventHandlerImplementation::EventHandlerEntry(uword args) {
343 intptr_t pollfds_size; 332 static const intptr_t kMaxEvents = 16;
344 struct pollfd* pollfds; 333 struct kevent events[kMaxEvents];
345 EventHandlerImplementation* handler = 334 EventHandlerImplementation* handler =
346 reinterpret_cast<EventHandlerImplementation*>(args); 335 reinterpret_cast<EventHandlerImplementation*>(args);
347 ASSERT(handler != NULL); 336 ASSERT(handler != NULL);
348 while (1) { 337 while (1) {
349 pollfds = handler->GetPollFds(&pollfds_size);
350 intptr_t millis = handler->GetTimeout(); 338 intptr_t millis = handler->GetTimeout();
351 intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis)); 339 struct timespec ts;
352 ASSERT(EAGAIN == EWOULDBLOCK); 340 int64_t secs = 0;
341 int64_t nanos = 0;
342 if (millis > 0) {
343 secs = millis / 1000;
344 nanos = (millis - (secs * 1000)) * 1000000;
345 }
346 ts.tv_sec = secs;
347 ts.tv_nsec = nanos;
348 intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_,
349 NULL,
350 0,
351 events,
352 kMaxEvents,
353 &ts));
353 if (result == -1) { 354 if (result == -1) {
354 if (errno != EWOULDBLOCK) { 355 perror("kevent failed");
355 perror("Poll failed"); 356 FATAL("kevent failed\n");
356 }
357 } else { 357 } else {
358 handler->HandleTimeout(); 358 handler->HandleTimeout();
359 handler->HandleEvents(pollfds, pollfds_size, result); 359 handler->HandleEvents(events, result);
360 } 360 }
361 free(pollfds);
362 } 361 }
363 } 362 }
364 363
365 364
366 void EventHandlerImplementation::StartEventHandler() { 365 void EventHandlerImplementation::StartEventHandler() {
367 int result = dart::Thread::Start(&EventHandlerImplementation::Poll, 366 int result =
368 reinterpret_cast<uword>(this)); 367 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry,
368 reinterpret_cast<uword>(this));
369 if (result != 0) { 369 if (result != 0) {
370 FATAL1("Failed to start event handler thread %d", result); 370 FATAL1("Failed to start event handler thread %d", result);
371 } 371 }
372 } 372 }
373 373
374 374
375 void EventHandlerImplementation::SendData(intptr_t id, 375 void EventHandlerImplementation::SendData(intptr_t id,
376 Dart_Port dart_port, 376 Dart_Port dart_port,
377 intptr_t data) { 377 intptr_t data) {
378 WakeupHandler(id, dart_port, data); 378 WakeupHandler(id, dart_port, data);
379 } 379 }
380 380
381 381
382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { 382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
383 // The hashmap does not support keys with value 0. 383 // The hashmap does not support keys with value 0.
384 return reinterpret_cast<void*>(fd + 1); 384 return reinterpret_cast<void*>(fd + 1);
385 } 385 }
386 386
387 387
388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { 388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
389 // The hashmap does not support keys with value 0. 389 // The hashmap does not support keys with value 0.
390 return dart::Utils::WordHash(fd + 1); 390 return dart::Utils::WordHash(fd + 1);
391 } 391 }
OLDNEW
« runtime/bin/eventhandler_macos.h ('K') | « runtime/bin/eventhandler_macos.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698