OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include "bin/eventhandler.h" | 5 #include "bin/eventhandler.h" |
6 | 6 |
7 #include <errno.h> | 7 #include <errno.h> |
8 #include <poll.h> | |
9 #include <pthread.h> | 8 #include <pthread.h> |
10 #include <stdio.h> | 9 #include <stdio.h> |
11 #include <string.h> | 10 #include <string.h> |
11 #include <sys/event.h> | |
12 #include <sys/time.h> | 12 #include <sys/time.h> |
13 #include <unistd.h> | 13 #include <unistd.h> |
14 | 14 |
15 #include "bin/dartutils.h" | 15 #include "bin/dartutils.h" |
16 #include "bin/fdutils.h" | 16 #include "bin/fdutils.h" |
17 #include "bin/hashmap.h" | 17 #include "bin/hashmap.h" |
18 #include "platform/utils.h" | 18 #include "platform/utils.h" |
19 | 19 |
20 | 20 |
21 int64_t GetCurrentTimeMilliseconds() { | 21 int64_t GetCurrentTimeMilliseconds() { |
22 struct timeval tv; | 22 struct timeval tv; |
23 if (gettimeofday(&tv, NULL) < 0) { | 23 if (gettimeofday(&tv, NULL) < 0) { |
24 UNREACHABLE(); | 24 UNREACHABLE(); |
25 return 0; | 25 return 0; |
26 } | 26 } |
27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 27 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
28 } | 28 } |
29 | 29 |
30 | 30 |
31 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 31 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
32 static const int kInfinityTimeout = -1; | 32 static const int kInfinityTimeout = -1; |
33 static const int kTimerId = -1; | 33 static const int kTimerId = -1; |
34 | 34 |
35 | 35 |
36 intptr_t SocketData::GetPollEvents() { | 36 bool SocketData::HasReadEvent() { |
37 // Do not ask for POLLERR and POLLHUP explicitly as they are | 37 return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0); |
38 // triggered anyway. | |
39 intptr_t events = 0; | |
40 if (!IsClosedRead()) { | |
41 if ((mask_ & (1 << kInEvent)) != 0) { | |
42 events |= POLLIN; | |
43 } | |
44 } | |
45 if (!IsClosedWrite()) { | |
46 if ((mask_ & (1 << kOutEvent)) != 0) { | |
47 events |= POLLOUT; | |
48 } | |
49 } | |
50 return events; | |
51 } | 38 } |
52 | 39 |
53 | 40 |
41 bool SocketData::HasWriteEvent() { | |
42 return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0); | |
43 } | |
44 | |
45 | |
46 // Unregister the file descriptor for a SocketData structure with kqueue. | |
47 static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) { | |
Søren Gjesse
2012/02/08 10:40:36
Maybe add constant kMaxChanges = 2.
Mads Ager (google)
2012/02/08 12:15:15
Done.
| |
48 intptr_t changes = 0; | |
49 struct kevent events[2]; | |
50 if (sd->read_tracked_by_kqueue()) { | |
51 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); | |
52 ++changes; | |
53 sd->set_read_tracked_by_kqueue(false); | |
54 } | |
55 if (sd->write_tracked_by_kqueue()) { | |
56 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd); | |
57 ++changes; | |
58 sd->set_write_tracked_by_kqueue(false); | |
59 } | |
60 if (changes > 0) { | |
Søren Gjesse
2012/02/08 10:40:36
And ASSERT(changes <= kMaxChanges)
Mads Ager (google)
2012/02/08 12:15:15
Done.
| |
61 int status = | |
62 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); | |
63 if (status == -1) { | |
64 FATAL("Failed deleting events from kqueue"); | |
65 } | |
66 } | |
67 } | |
68 | |
69 | |
70 // Update the kqueue registration for SocketData structure to reflect | |
71 // the events currently of interest. | |
72 static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) { | |
73 intptr_t changes = 0; | |
Søren Gjesse
2012/02/08 10:40:36
Ditto constant.
Mads Ager (google)
2012/02/08 12:15:15
Done.
| |
74 struct kevent events[2]; | |
75 if (sd->port() != 0) { | |
76 // Register or unregister READ filter if needed. | |
77 if (sd->HasReadEvent()) { | |
Søren Gjesse
2012/02/08 10:40:36
Is sd->read_tracked_by_kqueue() always false here?
Mads Ager (google)
2012/02/08 12:15:15
It is not always false. What is there now is corre
| |
78 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd); | |
79 ++changes; | |
80 sd->set_read_tracked_by_kqueue(true); | |
81 } else if (sd->read_tracked_by_kqueue()) { | |
82 EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL); | |
83 ++changes; | |
84 sd->set_read_tracked_by_kqueue(false); | |
85 } | |
86 // Register or unregister WRITE filter if needed. | |
87 if (sd->HasWriteEvent()) { | |
Søren Gjesse
2012/02/08 10:40:36
Ditto, is sd->write_tracked_by_kqueue() is always
Mads Ager (google)
2012/02/08 12:15:15
Same as above. Added explicit checking.
| |
88 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd); | |
89 ++changes; | |
90 sd->set_write_tracked_by_kqueue(true); | |
91 } else if (sd->write_tracked_by_kqueue()) { | |
92 EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); | |
93 ++changes; | |
94 sd->set_write_tracked_by_kqueue(false); | |
95 } | |
96 } | |
97 if (changes > 0) { | |
98 int status = | |
99 TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL)); | |
100 if (status == -1) { | |
101 FATAL("Failed updating kqueue"); | |
102 } | |
103 } | |
104 } | |
105 | |
106 | |
54 EventHandlerImplementation::EventHandlerImplementation() | 107 EventHandlerImplementation::EventHandlerImplementation() |
55 : socket_map_(&HashMap::SamePointerValue, 16) { | 108 : socket_map_(&HashMap::SamePointerValue, 16) { |
56 intptr_t result; | 109 intptr_t result; |
57 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); | 110 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
58 if (result != 0) { | 111 if (result != 0) { |
59 FATAL("Pipe creation failed"); | 112 FATAL("Pipe creation failed"); |
60 } | 113 } |
61 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 114 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
62 timeout_ = kInfinityTimeout; | 115 timeout_ = kInfinityTimeout; |
63 timeout_port_ = 0; | 116 timeout_port_ = 0; |
117 | |
118 kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue()); | |
119 if (kqueue_fd_ == -1) { | |
120 FATAL("Failed creating kqueue"); | |
121 } | |
122 // Register the interrupt_fd with the kqueue. | |
123 struct kevent event; | |
124 EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL); | |
125 int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL)); | |
126 if (status == -1) { | |
127 FATAL("Failed adding interrupt fd to kqueue"); | |
128 } | |
64 } | 129 } |
65 | 130 |
66 | 131 |
67 EventHandlerImplementation::~EventHandlerImplementation() { | 132 EventHandlerImplementation::~EventHandlerImplementation() { |
68 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); | 133 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
69 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); | 134 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
70 } | 135 } |
71 | 136 |
72 | 137 |
73 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { | 138 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
74 ASSERT(fd >= 0); | 139 ASSERT(fd >= 0); |
75 HashMap::Entry* entry = socket_map_.Lookup( | 140 HashMap::Entry* entry = socket_map_.Lookup( |
76 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); | 141 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
77 ASSERT(entry != NULL); | 142 ASSERT(entry != NULL); |
78 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | 143 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
79 if (sd == NULL) { | 144 if (sd == NULL) { |
80 // If there is no data in the hash map for this file descriptor | 145 // If there is no data in the hash map for this file descriptor a |
81 // then this is inserting a new SocketData for the file descriptor. | 146 // new SocketData for the file descriptor is inserted. |
82 sd = new SocketData(fd); | 147 sd = new SocketData(fd); |
83 entry->value = sd; | 148 entry->value = sd; |
84 } | 149 } |
85 ASSERT(fd == sd->fd()); | 150 ASSERT(fd == sd->fd()); |
86 return sd; | 151 return sd; |
87 } | 152 } |
88 | 153 |
89 | 154 |
90 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 155 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
91 Dart_Port dart_port, | 156 Dart_Port dart_port, |
92 int64_t data) { | 157 int64_t data) { |
93 InterruptMessage msg; | 158 InterruptMessage msg; |
94 msg.id = id; | 159 msg.id = id; |
95 msg.dart_port = dart_port; | 160 msg.dart_port = dart_port; |
96 msg.data = data; | 161 msg.data = data; |
97 intptr_t result = | 162 intptr_t result = |
98 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 163 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
99 if (result != kInterruptMessageSize) { | 164 if (result != kInterruptMessageSize) { |
100 if (result == -1) { | 165 if (result == -1) { |
101 perror("Interrupt message failure:"); | 166 perror("Interrupt message failure:"); |
102 } | 167 } |
103 FATAL1("Interrupt message failure. Wrote %d bytes.", result); | 168 FATAL1("Interrupt message failure. Wrote %d bytes.", result); |
104 } | 169 } |
105 } | 170 } |
106 | 171 |
107 | 172 |
108 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | |
109 struct pollfd* pollfds; | |
110 | |
111 // Calculate the number of file descriptors to poll on. | |
112 intptr_t numPollfds = 1; | |
113 for (HashMap::Entry* entry = socket_map_.Start(); | |
114 entry != NULL; | |
115 entry = socket_map_.Next(entry)) { | |
116 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
117 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; | |
118 } | |
119 | |
120 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | |
121 numPollfds)); | |
122 pollfds[0].fd = interrupt_fds_[0]; | |
123 pollfds[0].events |= POLLIN; | |
124 | |
125 int i = 1; | |
126 for (HashMap::Entry* entry = socket_map_.Start(); | |
127 entry != NULL; | |
128 entry = socket_map_.Next(entry)) { | |
129 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
130 intptr_t events = sd->GetPollEvents(); | |
131 if (sd->port() > 0 && events != 0) { | |
132 // Fd is added to the poll set. | |
133 pollfds[i].fd = sd->fd(); | |
134 pollfds[i].events = events; | |
135 i++; | |
136 } | |
137 } | |
138 ASSERT(numPollfds == i); | |
139 *pollfds_size = i; | |
140 | |
141 return pollfds; | |
142 } | |
143 | |
144 | |
145 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 173 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
146 int total_read = 0; | 174 int total_read = 0; |
147 int bytes_read = | 175 int bytes_read = |
148 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); | 176 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); |
149 if (bytes_read < 0) { | 177 if (bytes_read < 0) { |
150 return false; | 178 return false; |
151 } | 179 } |
152 total_read = bytes_read; | 180 total_read = bytes_read; |
153 while (total_read < kInterruptMessageSize) { | 181 while (total_read < kInterruptMessageSize) { |
154 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], | 182 bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0], |
(...skipping 11 matching lines...) Expand all Loading... | |
166 while (GetInterruptMessage(&msg)) { | 194 while (GetInterruptMessage(&msg)) { |
167 if (msg.id == kTimerId) { | 195 if (msg.id == kTimerId) { |
168 timeout_ = msg.data; | 196 timeout_ = msg.data; |
169 timeout_port_ = msg.dart_port; | 197 timeout_port_ = msg.dart_port; |
170 } else { | 198 } else { |
171 SocketData* sd = GetSocketData(msg.id); | 199 SocketData* sd = GetSocketData(msg.id); |
172 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { | 200 if ((msg.data & (1 << kShutdownReadCommand)) != 0) { |
173 ASSERT(msg.data == (1 << kShutdownReadCommand)); | 201 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
174 // Close the socket for reading. | 202 // Close the socket for reading. |
175 sd->ShutdownRead(); | 203 sd->ShutdownRead(); |
204 UpdateKqueue(kqueue_fd_, sd); | |
176 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { | 205 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
177 ASSERT(msg.data == (1 << kShutdownWriteCommand)); | 206 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
178 // Close the socket for writing. | 207 // Close the socket for writing. |
179 sd->ShutdownWrite(); | 208 sd->ShutdownWrite(); |
209 UpdateKqueue(kqueue_fd_, sd); | |
180 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | 210 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
181 ASSERT(msg.data == (1 << kCloseCommand)); | 211 ASSERT(msg.data == (1 << kCloseCommand)); |
182 // Close the socket and free system resources. | 212 // Close the socket and free system resources. |
213 RemoveFromKqueue(kqueue_fd_, sd); | |
183 intptr_t fd = sd->fd(); | 214 intptr_t fd = sd->fd(); |
184 sd->Close(); | 215 sd->Close(); |
185 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | 216 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
186 delete sd; | 217 delete sd; |
187 } else { | 218 } else { |
188 // Setup events to wait for. | 219 // Setup events to wait for. |
189 sd->SetPortAndMask(msg.dart_port, msg.data); | 220 sd->SetPortAndMask(msg.dart_port, msg.data); |
221 UpdateKqueue(kqueue_fd_, sd); | |
190 } | 222 } |
191 } | 223 } |
192 } | 224 } |
193 } | 225 } |
194 | 226 |
195 #ifdef DEBUG_POLL | |
196 static void PrintEventMask(struct pollfd* pollfd) { | |
197 printf("%d ", pollfd->fd); | |
198 if ((pollfd->revents & POLLIN) != 0) printf("POLLIN "); | |
199 if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI "); | |
200 if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT "); | |
201 if ((pollfd->revents & POLLERR) != 0) printf("POLLERR "); | |
202 if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP "); | |
203 if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL "); | |
204 int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL; | |
205 if ((pollfd->revents & ~all_events) != 0) { | |
206 printf("(and %08x) ", pollfd->revents & ~all_events); | |
207 } | |
208 printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd)); | |
209 | 227 |
228 #ifdef DEBUG_KQUEUE | |
229 static void PrintEventMask(intptr_t fd, struct kevent* event) { | |
230 printf("%d ", fd); | |
231 if (event->filter == EVFILT_READ) printf("EVFILT_READ "); | |
232 if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE "); | |
233 printf("flags: %x: ", event->flags); | |
234 if ((event->flags & EV_EOF) != 0) printf("EV_EOF "); | |
235 if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR "); | |
236 printf("(available %d) ", FDUtils::AvailableBytes(fd)); | |
210 printf("\n"); | 237 printf("\n"); |
211 } | 238 } |
212 #endif | 239 #endif |
213 | 240 |
214 intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) { | 241 |
215 #ifdef DEBUG_POLL | 242 intptr_t EventHandlerImplementation::GetEvents(struct kevent* event, |
Søren Gjesse
2012/02/08 10:40:36
Nice that we don't need special handling of pipes
Mads Ager (google)
2012/02/08 12:15:15
Yes, this is so much cleaner. :)
| |
216 if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd); | 243 SocketData* sd) { |
244 #ifdef DEBUG_KQUEUE | |
245 PrintEventMask(sd->fd(), event); | |
217 #endif | 246 #endif |
218 intptr_t event_mask = 0; | 247 intptr_t event_mask = 0; |
219 SocketData* sd = GetSocketData(pollfd->fd); | |
220 if (sd->IsListeningSocket()) { | 248 if (sd->IsListeningSocket()) { |
221 // For listening sockets the POLLIN event indicate that there are | 249 // On a listening socket the READ event means that there are |
222 // connections ready for accept unless accompanied with one of the | 250 // connections ready to be accepted. |
223 // other flags. | 251 if (event->filter == EVFILT_READ) { |
224 if ((pollfd->revents & POLLIN) != 0) { | 252 if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent); |
225 if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent); | 253 if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent); |
226 if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent); | |
227 if (event_mask == 0) event_mask |= (1 << kInEvent); | 254 if (event_mask == 0) event_mask |= (1 << kInEvent); |
228 } | 255 } |
229 } else { | 256 } else { |
230 if ((pollfd->revents & POLLNVAL) != 0) { | |
231 return 0; | |
232 } | |
233 | |
234 // Prioritize data events over close and error events. | 257 // Prioritize data events over close and error events. |
235 if ((pollfd->revents & POLLIN) != 0) { | 258 if (event->filter == EVFILT_READ) { |
236 if (FDUtils::AvailableBytes(pollfd->fd) != 0) { | 259 if (FDUtils::AvailableBytes(sd->fd()) != 0) { |
237 event_mask = (1 << kInEvent); | 260 event_mask = (1 << kInEvent); |
238 } else if (((pollfd->revents & POLLHUP) != 0)) { | 261 } else if ((event->flags & EV_EOF) != 0) { |
239 event_mask = (1 << kCloseEvent); | 262 event_mask = (1 << kCloseEvent); |
240 sd->MarkClosedRead(); | 263 sd->MarkClosedRead(); |
241 } else if ((pollfd->revents & POLLERR) != 0) { | 264 } else if ((event->flags & EV_ERROR) != 0) { |
242 event_mask = (1 << kErrorEvent); | 265 event_mask = (1 << kErrorEvent); |
243 } else { | |
244 if (sd->IsPipe()) { | |
245 // When reading from stdin (either from a terminal or piped | |
246 // input) treat POLLIN with 0 available bytes as | |
247 // end-of-file. | |
248 if (sd->fd() == STDIN_FILENO) { | |
249 event_mask = (1 << kCloseEvent); | |
250 sd->MarkClosedRead(); | |
251 } | |
252 } else { | |
253 // If POLLIN is set with no available data and no POLLHUP use | |
254 // recv to peek for whether the other end of the socket | |
255 // actually closed. | |
256 char buffer; | |
257 ssize_t bytesPeeked = | |
258 TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK)); | |
259 ASSERT(EAGAIN == EWOULDBLOCK); | |
260 if (bytesPeeked == 0) { | |
261 event_mask = (1 << kCloseEvent); | |
262 sd->MarkClosedRead(); | |
263 } else if (errno != EWOULDBLOCK) { | |
264 fprintf(stderr, "Error recv: %s\n", strerror(errno)); | |
265 } | |
266 } | |
267 } | 266 } |
268 } | 267 } |
269 | 268 |
270 // On pipes POLLHUP is reported without POLLIN when there is no | 269 if (event->filter == EVFILT_WRITE) { |
271 // more data to read. | 270 if ((event->flags & EV_ERROR) != 0) { |
272 if (sd->IsPipe()) { | |
273 if (((pollfd->revents & POLLIN) == 0) && | |
274 ((pollfd->revents & POLLHUP) != 0)) { | |
275 event_mask = (1 << kCloseEvent); | |
276 sd->MarkClosedRead(); | |
277 } | |
278 } | |
279 | |
280 if ((pollfd->revents & POLLOUT) != 0) { | |
281 if ((pollfd->revents & POLLERR) != 0) { | |
282 event_mask = (1 << kErrorEvent); | 271 event_mask = (1 << kErrorEvent); |
283 sd->MarkClosedWrite(); | 272 sd->MarkClosedWrite(); |
273 } else if ((event->flags & EV_EOF) != 0) { | |
274 // If the read end closed, close the write end as well, | |
Søren Gjesse
2012/02/08 10:40:36
Maybe rephrase "If the read end closed, close the
Mads Ager (google)
2012/02/08 12:15:15
Done.
| |
275 // update the registration with kqueue and do not report | |
276 // a write event. | |
277 sd->MarkClosedWrite(); | |
278 UpdateKqueue(kqueue_fd_, sd); | |
284 } else { | 279 } else { |
285 event_mask |= (1 << kOutEvent); | 280 event_mask |= (1 << kOutEvent); |
286 } | 281 } |
287 } | 282 } |
288 } | 283 } |
289 | 284 |
290 return event_mask; | 285 return event_mask; |
291 } | 286 } |
292 | 287 |
293 | 288 |
294 void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds, | 289 void EventHandlerImplementation::HandleEvents(struct kevent* events, |
295 int pollfds_size, | 290 int size) { |
296 int result_size) { | 291 for (int i = 0; i < size; i++) { |
297 if ((pollfds[0].revents & POLLIN) != 0) { | 292 if (events[i].udata != NULL) { |
298 result_size -= 1; | 293 SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata); |
299 } | 294 intptr_t event_mask = GetEvents(events + i, sd); |
300 if (result_size > 0) { | |
301 for (int i = 1; i < pollfds_size; i++) { | |
302 /* | |
303 * The fd is unregistered. It gets re-registered when the request | |
304 * was handled by dart. | |
305 */ | |
306 intptr_t event_mask = GetPollEvents(&pollfds[i]); | |
307 if (event_mask != 0) { | 295 if (event_mask != 0) { |
308 intptr_t fd = pollfds[i].fd; | 296 // Unregister events for the file descriptor. Events will be |
309 SocketData* sd = GetSocketData(fd); | 297 // registered again when the current event has been handled in |
298 // Dart code. | |
299 RemoveFromKqueue(kqueue_fd_, sd); | |
310 Dart_Port port = sd->port(); | 300 Dart_Port port = sd->port(); |
311 ASSERT(port != 0); | 301 ASSERT(port != 0); |
312 sd->Unregister(); | |
313 DartUtils::PostInt32(port, event_mask); | 302 DartUtils::PostInt32(port, event_mask); |
314 } | 303 } |
315 } | 304 } |
316 } | 305 } |
317 HandleInterruptFd(); | 306 HandleInterruptFd(); |
318 } | 307 } |
319 | 308 |
320 | 309 |
321 intptr_t EventHandlerImplementation::GetTimeout() { | 310 intptr_t EventHandlerImplementation::GetTimeout() { |
322 if (timeout_ == kInfinityTimeout) { | 311 if (timeout_ == kInfinityTimeout) { |
323 return kInfinityTimeout; | 312 return kInfinityTimeout; |
324 } | 313 } |
325 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); | 314 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); |
326 return (millis < 0) ? 0 : millis; | 315 return (millis < 0) ? 0 : millis; |
327 } | 316 } |
328 | 317 |
329 | 318 |
330 void EventHandlerImplementation::HandleTimeout() { | 319 void EventHandlerImplementation::HandleTimeout() { |
331 if (timeout_ != kInfinityTimeout) { | 320 if (timeout_ != kInfinityTimeout) { |
332 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); | 321 intptr_t millis = timeout_ - GetCurrentTimeMilliseconds(); |
333 if (millis <= 0) { | 322 if (millis <= 0) { |
334 DartUtils::PostNull(timeout_port_); | 323 DartUtils::PostNull(timeout_port_); |
335 timeout_ = kInfinityTimeout; | 324 timeout_ = kInfinityTimeout; |
336 timeout_port_ = 0; | 325 timeout_port_ = 0; |
337 } | 326 } |
338 } | 327 } |
339 } | 328 } |
340 | 329 |
341 | 330 |
342 void EventHandlerImplementation::Poll(uword args) { | 331 void EventHandlerImplementation::EventHandlerEntry(uword args) { |
343 intptr_t pollfds_size; | 332 static const intptr_t kMaxEvents = 16; |
344 struct pollfd* pollfds; | 333 struct kevent events[kMaxEvents]; |
345 EventHandlerImplementation* handler = | 334 EventHandlerImplementation* handler = |
346 reinterpret_cast<EventHandlerImplementation*>(args); | 335 reinterpret_cast<EventHandlerImplementation*>(args); |
347 ASSERT(handler != NULL); | 336 ASSERT(handler != NULL); |
348 while (1) { | 337 while (1) { |
349 pollfds = handler->GetPollFds(&pollfds_size); | |
350 intptr_t millis = handler->GetTimeout(); | 338 intptr_t millis = handler->GetTimeout(); |
351 intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis)); | 339 struct timespec ts; |
352 ASSERT(EAGAIN == EWOULDBLOCK); | 340 int64_t secs = 0; |
341 int64_t nanos = 0; | |
342 if (millis > 0) { | |
343 secs = millis / 1000; | |
344 nanos = (millis - (secs * 1000)) * 1000000; | |
345 } | |
346 ts.tv_sec = secs; | |
347 ts.tv_nsec = nanos; | |
348 intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_, | |
349 NULL, | |
350 0, | |
351 events, | |
352 kMaxEvents, | |
353 &ts)); | |
353 if (result == -1) { | 354 if (result == -1) { |
354 if (errno != EWOULDBLOCK) { | 355 perror("kevent failed"); |
355 perror("Poll failed"); | 356 FATAL("kevent failed\n"); |
356 } | |
357 } else { | 357 } else { |
358 handler->HandleTimeout(); | 358 handler->HandleTimeout(); |
359 handler->HandleEvents(pollfds, pollfds_size, result); | 359 handler->HandleEvents(events, result); |
360 } | 360 } |
361 free(pollfds); | |
362 } | 361 } |
363 } | 362 } |
364 | 363 |
365 | 364 |
366 void EventHandlerImplementation::StartEventHandler() { | 365 void EventHandlerImplementation::StartEventHandler() { |
367 int result = dart::Thread::Start(&EventHandlerImplementation::Poll, | 366 int result = |
368 reinterpret_cast<uword>(this)); | 367 dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry, |
368 reinterpret_cast<uword>(this)); | |
369 if (result != 0) { | 369 if (result != 0) { |
370 FATAL1("Failed to start event handler thread %d", result); | 370 FATAL1("Failed to start event handler thread %d", result); |
371 } | 371 } |
372 } | 372 } |
373 | 373 |
374 | 374 |
375 void EventHandlerImplementation::SendData(intptr_t id, | 375 void EventHandlerImplementation::SendData(intptr_t id, |
376 Dart_Port dart_port, | 376 Dart_Port dart_port, |
377 intptr_t data) { | 377 intptr_t data) { |
378 WakeupHandler(id, dart_port, data); | 378 WakeupHandler(id, dart_port, data); |
379 } | 379 } |
380 | 380 |
381 | 381 |
382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { | 382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
383 // The hashmap does not support keys with value 0. | 383 // The hashmap does not support keys with value 0. |
384 return reinterpret_cast<void*>(fd + 1); | 384 return reinterpret_cast<void*>(fd + 1); |
385 } | 385 } |
386 | 386 |
387 | 387 |
388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { | 388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
389 // The hashmap does not support keys with value 0. | 389 // The hashmap does not support keys with value 0. |
390 return dart::Utils::WordHash(fd + 1); | 390 return dart::Utils::WordHash(fd + 1); |
391 } | 391 } |
OLD | NEW |