OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "bin/eventhandler.h" | 5 #include "bin/eventhandler.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <poll.h> | |
9 #include <pthread.h> | 8 #include <pthread.h> |
10 #include <stdio.h> | 9 #include <stdio.h> |
11 #include <string.h> | 10 #include <string.h> |
| 11 #include <sys/event.h> |
12 #include <sys/time.h> | 12 #include <sys/time.h> |
13 #include <unistd.h> | 13 #include <unistd.h> |
14 | 14 |
15 #include "bin/dartutils.h" | 15 #include "bin/dartutils.h" |
16 #include "bin/fdutils.h" | 16 #include "bin/fdutils.h" |
17 #include "bin/hashmap.h" | 17 #include "bin/hashmap.h" |
18 #include "platform/utils.h" | 18 #include "platform/utils.h" |
19 | 19 |
20 | 20 |
21 int64_t GetCurrentTimeMilliseconds() { | 21 int64_t GetCurrentTimeMilliseconds() { |
22 struct timeval tv; | 22 struct timeval tv; |
23 if (gettimeofday(&tv, NULL) < 0) { | 23 if (gettimeofday(&tv, NULL) < 0) { |
24 UNREACHABLE(); | 24 UNREACHABLE(); |
25 return 0; | 25 return 0; |
26 } | 26 } |
27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
28 } | 28 } |
29 | 29 |
30 | 30 |
31 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 31 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
32 static const int kInfinityTimeout = -1; | 32 static const int kInfinityTimeout = -1; |
33 static const int kTimerId = -1; | 33 static const int kTimerId = -1; |
34 | 34 |
35 | 35 |
36 intptr_t SocketData::GetPollEvents() { | 36 bool SocketData::HasReadEvent() { |
37 // Do not ask for POLLERR and POLLHUP explicitly as they are | 37 return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0); |
38 // triggered anyway. | |
39 intptr_t events = 0; | |
40 if (!IsClosedRead()) { | |
41 if ((mask_ & (1 << kInEvent)) != 0) { | |
42 events |= POLLIN; | |
43 } | |
44 } | |
45 if (!IsClosedWrite()) { | |
46 if ((mask_ & (1 << kOutEvent)) != 0) { | |
47 events |= POLLOUT; | |
48 } | |
49 } | |
50 return events; | |
51 } | 38 } |
52 | 39 |
53 | 40 |
| 41 bool SocketData::HasWriteEvent() { |
| 42 return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0); |
| 43 } |
| 44 |
| 45 |
| 46 // Unregister the file descriptor for a SocketData structure with kqueue. |
| 47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
| 48 static const intptr_t kMaxChanges = 2; |
| 49 intptr_t changes = 0; |
| 50 struct kevent events[kMaxChanges]; |
| 51 if (sd->read_tracked_by_kqueue()) { |
| 52 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
| 53 ++changes; |
| 54 sd->set_read_tracked_by_kqueue(false); |
| 55 } |
| 56 if (sd->write_tracked_by_kqueue()) { |
| 57 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd); |
| 58 ++changes; |
| 59 sd->set_write_tracked_by_kqueue(false); |
| 60 } |
| 61 if (changes > 0) { |
| 62 ASSERT(changes < kMaxChanges); |
| 63 int status = |
| 64 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
| 65 if (status == -1) { |
| 66 FATAL("Failed deleting events from kqueue"); |
| 67 } |
| 68 } |
| 69 } |
| 70 |
| 71 |
| 72 // Update the kqueue registration for SocketData structure to reflect |
| 73 // the events currently of interest. |
| 74 static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) { |
| 75 static const intptr_t kMaxChanges = 2; |
| 76 intptr_t changes = 0; |
| 77 struct kevent events[kMaxChanges]; |
| 78 if (sd->port() != 0) { |
| 79 // Register or unregister READ filter if needed. |
| 80 if (sd->HasReadEvent()) { |
| 81 if (!sd->read_tracked_by_kqueue()) { |
| 82 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd); |
| 83 ++changes; |
| 84 sd->set_read_tracked_by_kqueue(true); |
| 85 } |
| 86 } else if (sd->read_tracked_by_kqueue()) { |
| 87 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); |
| 88 ++changes; |
| 89 sd->set_read_tracked_by_kqueue(false); |
| 90 } |
| 91 // Register or unregister WRITE filter if needed. |
| 92 if (sd->HasWriteEvent()) { |
| 93 if (!sd->write_tracked_by_kqueue()) { |
| 94 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd); |
| 95 ++changes; |
| 96 sd->set_write_tracked_by_kqueue(true); |
| 97 } |
| 98 } else if (sd->write_tracked_by_kqueue()) { |
| 99 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); |
| 100 ++changes; |
| 101 sd->set_write_tracked_by_kqueue(false); |
| 102 } |
| 103 } |
| 104 if (changes > 0) { |
| 105 ASSERT(changes < kMaxChanges); |
| 106 int status = |
| 107 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); |
| 108 if (status == -1) { |
| 109 FATAL("Failed updating kqueue"); |
| 110 } |
| 111 } |
| 112 } |
| 113 |
| 114 |
54 EventHandlerImplementation::EventHandlerImplementation() | 115 EventHandlerImplementation::EventHandlerImplementation() |
55 : socket_map_(&HashMap::SamePointerValue, 16) { | 116 : socket_map_(&HashMap::SamePointerValue, 16) { |
56 intptr_t result; | 117 intptr_t result; |
57 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); | 118 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
58 if (result != 0) { | 119 if (result != 0) { |
59 FATAL("Pipe creation failed"); | 120 FATAL("Pipe creation failed"); |
60 } | 121 } |
61 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 122 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
62 timeout_ = kInfinityTimeout; | 123 timeout_ = kInfinityTimeout; |
63 timeout_port_ = 0; | 124 timeout_port_ = 0; |
| 125 |
| 126 kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue()); |
| 127 if (kqueue_fd_ == -1) { |
| 128 FATAL("Failed creating kqueue"); |
| 129 } |
| 130 // Register the interrupt_fd with the kqueue. |
| 131 struct kevent event; |
| 132 EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL); |
| 133 int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL)); |
| 134 if (status == -1) { |
| 135 FATAL("Failed adding interrupt fd to kqueue"); |
| 136 } |
64 } | 137 } |
65 | 138 |
66 | 139 |
67 EventHandlerImplementation::~EventHandlerImplementation() { | 140 EventHandlerImplementation::~EventHandlerImplementation() { |
68 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); | 141 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
69 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); | 142 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
70 } | 143 } |
71 | 144 |
72 | 145 |
73 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { | 146 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
74 ASSERT(fd >= 0); | 147 ASSERT(fd >= 0); |
75 HashMap::Entry* entry = socket_map_.Lookup( | 148 HashMap::Entry* entry = socket_map_.Lookup( |
76 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); | 149 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
77 ASSERT(entry != NULL); | 150 ASSERT(entry != NULL); |
78 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | 151 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
79 if (sd == NULL) { | 152 if (sd == NULL) { |
80 // If there is no data in the hash map for this file descriptor | 153 // If there is no data in the hash map for this file descriptor a |
81 // then this is inserting a new SocketData for the file descriptor. | 154 // new SocketData for the file descriptor is inserted. |
82 sd = new SocketData(fd); | 155 sd = new SocketData(fd); |
83 entry->value = sd; | 156 entry->value = sd; |
84 } | 157 } |
85 ASSERT(fd == sd->fd()); | 158 ASSERT(fd == sd->fd()); |
86 return sd; | 159 return sd; |
87 } | 160 } |
88 | 161 |
89 | 162 |
90 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 163 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
91 Dart_Port dart_port, | 164 Dart_Port dart_port, |
92 int64_t data) { | 165 int64_t data) { |
93 InterruptMessage msg; | 166 InterruptMessage msg; |
94 msg.id = id; | 167 msg.id = id; |
95 msg.dart_port = dart_port; | 168 msg.dart_port = dart_port; |
96 msg.data = data; | 169 msg.data = data; |
97 intptr_t result = | 170 intptr_t result = |
98 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 171 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
99 if (result != kInterruptMessageSize) { | 172 if (result != kInterruptMessageSize) { |
100 if (result == -1) { | 173 if (result == -1) { |
101 perror("Interrupt message failure:"); | 174 perror("Interrupt message failure:"); |
102 } | 175 } |
103 FATAL1("Interrupt message failure. Wrote %d bytes.", result); | 176 FATAL1("Interrupt message failure. Wrote %d bytes.", result); |
104 } | 177 } |
105 } | 178 } |
106 | 179 |
107 | 180 |
108 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | |
109 struct pollfd* pollfds; | |
110 | |
111 // Calculate the number of file descriptors to poll on. | |
112 intptr_t numPollfds = 1; | |
113 for (HashMap::Entry* entry = socket_map_.Start(); | |
114 entry != NULL; | |
115 entry = socket_map_.Next(entry)) { | |
116 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
117 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; | |
118 } | |
119 | |
120 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | |
121 numPollfds)); | |
122 pollfds[0].fd = interrupt_fds_[0]; | |
123 pollfds[0].events |= POLLIN; | |
124 | |
125 int i = 1; | |
126 for (HashMap::Entry* entry = socket_map_.Start(); | |
127 entry != NULL; | |
128 entry = socket_map_.Next(entry)) { | |
129 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
130 intptr_t events = sd->GetPollEvents(); | |
131 if (sd->port() > 0 && events != 0) { | |
132 // Fd is added to the poll set. | |
133 pollfds[i].fd = sd->fd(); | |
134 pollfds[i].events = events; | |
135 i++; | |
136 } | |
137 } | |
138 ASSERT(numPollfds == i); | |
139 *pollfds_size = i; | |
140 | |
141 return pollfds; | |
142 } | |
143 | |
144 | |
145 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 181 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
146 int total_read = 0; | 182 int total_read = 0; |
147 int bytes_read = | 183 int bytes_read = |
148 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); | 184 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); |
149 if (bytes_read < 0) { | 185 if (bytes_read < 0) { |
150 return false; | 186 return false; |
151 } | 187 } |
152 total_read = bytes_read; | 188 total_read = bytes_read; |
153 while (total_read < kInterruptMessageSize) { | 189 while (total_read < kInterruptMessageSize) { |
154 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], | 190 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], |
(...skipping 11 matching lines...) Expand all Loading... |
166 while (GetInterruptMessage(&msg)) { | 202 while (GetInterruptMessage(&msg)) { |
167 if (msg.id == kTimerId) { | 203 if (msg.id == kTimerId) { |
168 timeout_ = msg.data; | 204 timeout_ = msg.data; |
169 timeout_port_ = msg.dart_port; | 205 timeout_port_ = msg.dart_port; |
170 } else { | 206 } else { |
171 SocketData* sd = GetSocketData(msg.id); | 207 SocketData* sd = GetSocketData(msg.id); |
172 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { | 208 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { |
173 ASSERT(msg.data == (1 << kShutdownReadCommand)); | 209 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
174 // Close the socket for reading. | 210 // Close the socket for reading. |
175 sd->ShutdownRead(); | 211 sd->ShutdownRead(); |
| 212 UpdateKqueue(kqueue_fd_, sd); |
176 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { | 213 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
177 ASSERT(msg.data == (1 << kShutdownWriteCommand)); | 214 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
178 // Close the socket for writing. | 215 // Close the socket for writing. |
179 sd->ShutdownWrite(); | 216 sd->ShutdownWrite(); |
| 217 UpdateKqueue(kqueue_fd_, sd); |
180 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | 218 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
181 ASSERT(msg.data == (1 << kCloseCommand)); | 219 ASSERT(msg.data == (1 << kCloseCommand)); |
182 // Close the socket and free system resources. | 220 // Close the socket and free system resources. |
| 221 RemoveFromKqueue(kqueue_fd_, sd); |
183 intptr_t fd = sd->fd(); | 222 intptr_t fd = sd->fd(); |
184 sd->Close(); | 223 sd->Close(); |
185 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | 224 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
186 delete sd; | 225 delete sd; |
187 } else { | 226 } else { |
188 // Setup events to wait for. | 227 // Setup events to wait for. |
189 sd->SetPortAndMask(msg.dart_port, msg.data); | 228 sd->SetPortAndMask(msg.dart_port, msg.data); |
| 229 UpdateKqueue(kqueue_fd_, sd); |
190 } | 230 } |
191 } | 231 } |
192 } | 232 } |
193 } | 233 } |
194 | 234 |
195 #ifdef DEBUG_POLL | |
196 static void PrintEventMask(struct pollfd* pollfd) { | |
197 printf("%d ", pollfd->fd); | |
198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); | |
199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); | |
200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); | |
201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); | |
202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); | |
203 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); | |
204 int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL; | |
205 if ((pollfd->revents & ~all_events) != 0) { | |
206 printf("(and %08x) ", pollfd->revents & ~all_events); | |
207 } | |
208 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); | |
209 | 235 |
| 236 #ifdef DEBUG_KQUEUE |
| 237 static void PrintEventMask(intptr_t fd, struct kevent* event) { |
| 238 printf("%d ", fd); |
| 239 if (event->filter == EVFILT_READ) printf("EVFILT_READ "); |
| 240 if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE "); |
| 241 printf("flags: %x: ", event->flags); |
| 242 if ((event->flags & EV_EOF) != 0) printf("EV_EOF "); |
| 243 if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR "); |
| 244 printf("(available %d) ", FDUtils::AvailableBytes(fd)); |
210 printf("\n"); | 245 printf("\n"); |
211 } | 246 } |
212 #endif | 247 #endif |
213 | 248 |
214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 249 |
215 #ifdef DEBUG_POLL | 250 intptr_t EventHandlerImplementation::GetEvents(struct kevent* event, |
216 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); | 251 SocketData* sd) { |
| 252 #ifdef DEBUG_KQUEUE |
| 253 PrintEventMask(sd->fd(), event); |
217 #endif | 254 #endif |
218 intptr_t event_mask = 0; | 255 intptr_t event_mask = 0; |
219 SocketData* sd = GetSocketData(pollfd->fd); | |
220 if (sd->IsListeningSocket()) { | 256 if (sd->IsListeningSocket()) { |
221 // For listening sockets the POLLIN event indicate that there are | 257 // On a listening socket the READ event means that there are |
222 // connections ready for accept unless accompanied with one of the | 258 // connections ready to be accepted. |
223 // other flags. | 259 if (event->filter == EVFILT_READ) { |
224 if ((pollfd->revents & POLLIN) != 0) { | 260 if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent); |
225 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 261 if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent); |
226 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | |
227 if (event_mask == 0) event_mask |= (1 << kInEvent); | 262 if (event_mask == 0) event_mask |= (1 << kInEvent); |
228 } | 263 } |
229 } else { | 264 } else { |
230 if ((pollfd->revents & POLLNVAL) != 0) { | |
231 return 0; | |
232 } | |
233 | |
234 // Prioritize data events over close and error events. | 265 // Prioritize data events over close and error events. |
235 if ((pollfd->revents & POLLIN) != 0) { | 266 if (event->filter == EVFILT_READ) { |
236 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { | 267 if (FDUtils::AvailableBytes(sd->fd()) != 0) { |
237 event_mask = (1 << kInEvent); | 268 event_mask = (1 << kInEvent); |
238 } else if (((pollfd->revents & POLLHUP) != 0)) { | 269 } else if ((event->flags & EV_EOF) != 0) { |
239 event_mask = (1 << kCloseEvent); | 270 event_mask = (1 << kCloseEvent); |
240 sd->MarkClosedRead(); | 271 sd->MarkClosedRead(); |
241 } else if ((pollfd->revents & POLLERR) != 0) { | 272 } else if ((event->flags & EV_ERROR) != 0) { |
242 event_mask = (1 << kErrorEvent); | 273 event_mask = (1 << kErrorEvent); |
243 } else { | |
244 if (sd->IsPipe()) { | |
245 // When reading from stdin (either from a terminal or piped | |
246 // input) treat POLLIN with 0 available bytes as | |
247 // end-of-file. | |
248 if (sd->fd() == STDIN_FILENO) { | |
249 event_mask = (1 << kCloseEvent); | |
250 sd->MarkClosedRead(); | |
251 } | |
252 } else { | |
253 // If POLLIN is set with no available data and no POLLHUP use | |
254 // recv to peek for whether the other end of the socket | |
255 // actually closed. | |
256 char buffer; | |
257 ssize_t bytesPeeked = | |
258 TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK)); | |
259 ASSERT(EAGAIN == EWOULDBLOCK); | |
260 if (bytesPeeked == 0) { | |
261 event_mask = (1 << kCloseEvent); | |
262 sd->MarkClosedRead(); | |
263 } else if (errno != EWOULDBLOCK) { | |
264 fprintf(stderr, "Error recv: %s\n", strerror(errno)); | |
265 } | |
266 } | |
267 } | 274 } |
268 } | 275 } |
269 | 276 |
270 // On pipes POLLHUP is reported without POLLIN when there is no | 277 if (event->filter == EVFILT_WRITE) { |
271 // more data to read. | 278 if ((event->flags & EV_ERROR) != 0) { |
272 if (sd->IsPipe()) { | |
273 if (((pollfd->revents & POLLIN) == 0) && | |
274 ((pollfd->revents & POLLHUP) != 0)) { | |
275 event_mask = (1 << kCloseEvent); | |
276 sd->MarkClosedRead(); | |
277 } | |
278 } | |
279 | |
280 if ((pollfd->revents & POLLOUT) != 0) { | |
281 if ((pollfd->revents & POLLERR) != 0) { | |
282 event_mask = (1 << kErrorEvent); | 279 event_mask = (1 << kErrorEvent); |
283 sd->MarkClosedWrite(); | 280 sd->MarkClosedWrite(); |
| 281 } else if ((event->flags & EV_EOF) != 0) { |
| 282 // If the receiver closed for reading, close for writing, |
| 283 // update the registration with kqueue, and do not report a |
| 284 // write event. |
| 285 sd->MarkClosedWrite(); |
| 286 UpdateKqueue(kqueue_fd_, sd); |
284 } else { | 287 } else { |
285 event_mask |= (1 << kOutEvent); | 288 event_mask |= (1 << kOutEvent); |
286 } | 289 } |
287 } | 290 } |
288 } | 291 } |
289 | 292 |
290 return event_mask; | 293 return event_mask; |
291 } | 294 } |
292 | 295 |
293 | 296 |
294 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, | 297 void EventHandlerImplementation::HandleEvents(struct kevent* events, |
295 int pollfds_size, | 298 int size) { |
296 int result_size) { | 299 for (int i = 0; i < size; i++) { |
297 if ((pollfds[0].revents & POLLIN) != 0) { | 300 if (events[i].udata != NULL) { |
298 result_size -= 1; | 301 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata); |
299 } | 302 intptr_t event_mask = GetEvents(events + i, sd); |
300 if (result_size > 0) { | |
301 for (int i = 1; i < pollfds_size; i++) { | |
302 /* | |
303 * The fd is unregistered. It gets re-registered when the request | |
304 * was handled by dart. | |
305 */ | |
306 intptr_t event_mask = GetPollEvents(&pollfds[i]); | |
307 if (event_mask != 0) { | 303 if (event_mask != 0) { |
308 intptr_t fd = pollfds[i].fd; | 304 // Unregister events for the file descriptor. Events will be |
309 SocketData* sd = GetSocketData(fd); | 305 // registered again when the current event has been handled in |
| 306 // Dart code. |
| 307 RemoveFromKqueue(kqueue_fd_, sd); |
310 Dart_Port port = sd->port(); | 308 Dart_Port port = sd->port(); |
311 ASSERT(port != 0); | 309 ASSERT(port != 0); |
312 sd->Unregister(); | |
313 DartUtils::PostInt32(port, event_mask); | 310 DartUtils::PostInt32(port, event_mask); |
314 } | 311 } |
315 } | 312 } |
316 } | 313 } |
317 HandleInterruptFd(); | 314 HandleInterruptFd(); |
318 } | 315 } |
319 | 316 |
320 | 317 |
321 intptr_t EventHandlerImplementation::GetTimeout() { | 318 intptr_t EventHandlerImplementation::GetTimeout() { |
322 if (timeout_ == kInfinityTimeout) { | 319 if (timeout_ == kInfinityTimeout) { |
323 return kInfinityTimeout; | 320 return kInfinityTimeout; |
324 } | 321 } |
325 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); | 322 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); |
326 return (millis < 0) ? 0 : millis; | 323 return (millis < 0) ? 0 : millis; |
327 } | 324 } |
328 | 325 |
329 | 326 |
330 void EventHandlerImplementation::HandleTimeout() { | 327 void EventHandlerImplementation::HandleTimeout() { |
331 if (timeout_ != kInfinityTimeout) { | 328 if (timeout_ != kInfinityTimeout) { |
332 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); | 329 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); |
333 if (millis <= 0) { | 330 if (millis <= 0) { |
334 DartUtils::PostNull(timeout_port_); | 331 DartUtils::PostNull(timeout_port_); |
335 timeout_ = kInfinityTimeout; | 332 timeout_ = kInfinityTimeout; |
336 timeout_port_ = 0; | 333 timeout_port_ = 0; |
337 } | 334 } |
338 } | 335 } |
339 } | 336 } |
340 | 337 |
341 | 338 |
342 void EventHandlerImplementation::Poll(uword args) { | 339 void EventHandlerImplementation::EventHandlerEntry(uword args) { |
343 intptr_t pollfds_size; | 340 static const intptr_t kMaxEvents = 16; |
344 struct pollfd* pollfds; | 341 struct kevent events[kMaxEvents]; |
345 EventHandlerImplementation* handler = | 342 EventHandlerImplementation* handler = |
346 reinterpret_cast<EventHandlerImplementation*>(args); | 343 reinterpret_cast<EventHandlerImplementation*>(args); |
347 ASSERT(handler != NULL); | 344 ASSERT(handler != NULL); |
348 while (1) { | 345 while (1) { |
349 pollfds = handler->GetPollFds(&pollfds_size); | |
350 intptr_t millis = handler->GetTimeout(); | 346 intptr_t millis = handler->GetTimeout(); |
351 intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis)); | 347 struct timespec ts; |
352 ASSERT(EAGAIN == EWOULDBLOCK); | 348 int64_t secs = 0; |
| 349 int64_t nanos = 0; |
| 350 if (millis > 0) { |
| 351 secs = millis / 1000; |
| 352 nanos = (millis - (secs * 1000)) * 1000000; |
| 353 } |
| 354 ts.tv_sec = secs; |
| 355 ts.tv_nsec = nanos; |
| 356 intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_, |
| 357 NULL, |
| 358 0, |
| 359 events, |
| 360 kMaxEvents, |
| 361 &ts)); |
353 if (result == -1) { | 362 if (result == -1) { |
354 if (errno != EWOULDBLOCK) { | 363 perror("kevent failed"); |
355 perror("Poll failed"); | 364 FATAL("kevent failed\n"); |
356 } | |
357 } else { | 365 } else { |
358 handler->HandleTimeout(); | 366 handler->HandleTimeout(); |
359 handler->HandleEvents(pollfds, pollfds_size, result); | 367 handler->HandleEvents(events, result); |
360 } | 368 } |
361 free(pollfds); | |
362 } | 369 } |
363 } | 370 } |
364 | 371 |
365 | 372 |
366 void EventHandlerImplementation::StartEventHandler() { | 373 void EventHandlerImplementation::StartEventHandler() { |
367 int result = dart::Thread::Start(&EventHandlerImplementation::Poll, | 374 int result = |
368 reinterpret_cast<uword>(this)); | 375 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry, |
| 376 reinterpret_cast<uword>(this)); |
369 if (result != 0) { | 377 if (result != 0) { |
370 FATAL1("Failed to start event handler thread %d", result); | 378 FATAL1("Failed to start event handler thread %d", result); |
371 } | 379 } |
372 } | 380 } |
373 | 381 |
374 | 382 |
375 void EventHandlerImplementation::SendData(intptr_t id, | 383 void EventHandlerImplementation::SendData(intptr_t id, |
376 Dart_Port dart_port, | 384 Dart_Port dart_port, |
377 intptr_t data) { | 385 intptr_t data) { |
378 WakeupHandler(id, dart_port, data); | 386 WakeupHandler(id, dart_port, data); |
379 } | 387 } |
380 | 388 |
381 | 389 |
382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { | 390 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
383 // The hashmap does not support keys with value 0. | 391 // The hashmap does not support keys with value 0. |
384 return reinterpret_cast<void*>(fd + 1); | 392 return reinterpret_cast<void*>(fd + 1); |
385 } | 393 } |
386 | 394 |
387 | 395 |
388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { | 396 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
389 // The hashmap does not support keys with value 0. | 397 // The hashmap does not support keys with value 0. |
390 return dart::Utils::WordHash(fd + 1); | 398 return dart::Utils::WordHash(fd + 1); |
391 } | 399 } |
OLD | NEW |