OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include <errno.h> | 5 #include <errno.h> |
6 #include <poll.h> | 6 #include <poll.h> |
7 #include <pthread.h> | 7 #include <pthread.h> |
8 #include <stdio.h> | 8 #include <stdio.h> |
9 #include <string.h> | 9 #include <string.h> |
10 #include <sys/time.h> | 10 #include <sys/time.h> |
11 #include <unistd.h> | 11 #include <unistd.h> |
12 | 12 |
13 #include "bin/eventhandler.h" | 13 #include "bin/eventhandler.h" |
14 #include "bin/fdutils.h" | 14 #include "bin/fdutils.h" |
15 #include "bin/hashmap.h" | |
16 #include "platform/utils.h" | |
15 | 17 |
16 | 18 |
17 int64_t GetCurrentTimeMilliseconds() { | 19 int64_t GetCurrentTimeMilliseconds() { |
18 struct timeval tv; | 20 struct timeval tv; |
19 if (gettimeofday(&tv, NULL) < 0) { | 21 if (gettimeofday(&tv, NULL) < 0) { |
20 UNREACHABLE(); | 22 UNREACHABLE(); |
21 return 0; | 23 return 0; |
22 } | 24 } |
23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 25 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
24 } | 26 } |
25 | 27 |
26 | 28 |
27 static const int kInitialPortMapSize = 16; | |
28 static const int kPortMapGrowingFactor = 2; | |
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
32 | 32 |
33 | 33 |
34 intptr_t SocketData::GetPollEvents() { | 34 intptr_t SocketData::GetPollEvents() { |
35 // Do not ask for POLLERR and POLLHUP explicitly as they are | 35 // Do not ask for POLLERR and POLLHUP explicitly as they are |
36 // triggered anyway. | 36 // triggered anyway. |
37 intptr_t events = 0; | 37 intptr_t events = 0; |
38 if (!IsClosedRead()) { | 38 if (!IsClosedRead()) { |
39 if ((mask_ & (1 << kInEvent)) != 0) { | 39 if ((mask_ & (1 << kInEvent)) != 0) { |
40 events |= POLLIN; | 40 events |= POLLIN; |
41 } | 41 } |
42 } | 42 } |
43 if (!IsClosedWrite()) { | 43 if (!IsClosedWrite()) { |
44 if ((mask_ & (1 << kOutEvent)) != 0) { | 44 if ((mask_ & (1 << kOutEvent)) != 0) { |
45 events |= POLLOUT; | 45 events |= POLLOUT; |
46 } | 46 } |
47 } | 47 } |
48 return events; | 48 return events; |
49 } | 49 } |
50 | 50 |
51 | 51 |
52 EventHandlerImplementation::EventHandlerImplementation() { | 52 EventHandlerImplementation::EventHandlerImplementation() { |
53 intptr_t result; | 53 intptr_t result; |
54 socket_map_size_ = kInitialPortMapSize; | |
55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, | |
56 sizeof(SocketData))); | |
57 ASSERT(socket_map_ != NULL); | |
58 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); | 54 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
59 if (result != 0) { | 55 if (result != 0) { |
60 FATAL("Pipe creation failed"); | 56 FATAL("Pipe creation failed"); |
61 } | 57 } |
62 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 58 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
63 timeout_ = kInfinityTimeout; | 59 timeout_ = kInfinityTimeout; |
64 timeout_port_ = 0; | 60 timeout_port_ = 0; |
65 } | 61 } |
66 | 62 |
67 | 63 |
68 EventHandlerImplementation::~EventHandlerImplementation() { | 64 EventHandlerImplementation::~EventHandlerImplementation() { |
69 free(socket_map_); | |
70 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); | 65 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
71 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); | 66 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
72 } | 67 } |
73 | 68 |
74 | 69 |
75 // TODO(hpayer): Use hash table instead of array. | |
76 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { | 70 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
77 ASSERT(fd >= 0); | 71 ASSERT(fd >= 0); |
78 if (fd >= socket_map_size_) { | 72 HashMap::Entry* entry = socket_map_.Lookup( |
79 intptr_t new_socket_map_size = socket_map_size_; | 73 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
80 do { | 74 ASSERT(entry != NULL); |
81 new_socket_map_size = new_socket_map_size * kPortMapGrowingFactor; | 75 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
82 } while (fd >= new_socket_map_size); | 76 if (sd == NULL) { |
83 size_t new_socket_map_bytes = new_socket_map_size * sizeof(SocketData); | 77 sd = new SocketData(fd); |
Ivan Posva
2012/01/20 21:42:57
Please add a comment making it clear that you are
Søren Gjesse
2012/01/23 09:03:49
Done.
| |
84 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, | 78 entry->value = sd; |
85 new_socket_map_bytes)); | |
86 ASSERT(socket_map_ != NULL); | |
87 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); | |
88 memset(socket_map_ + socket_map_size_, | |
89 0, | |
90 new_socket_map_bytes - socket_map_bytes); | |
91 socket_map_size_ = new_socket_map_size; | |
92 } | 79 } |
93 | 80 ASSERT(fd == sd->fd()); |
94 SocketData* sd = socket_map_ + fd; | |
95 sd->set_fd(fd); // For now just make sure the fd is set. | |
96 return sd; | 81 return sd; |
97 } | 82 } |
98 | 83 |
99 | 84 |
100 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 85 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
101 Dart_Port dart_port, | 86 Dart_Port dart_port, |
102 int64_t data) { | 87 int64_t data) { |
103 InterruptMessage msg; | 88 InterruptMessage msg; |
104 msg.id = id; | 89 msg.id = id; |
105 msg.dart_port = dart_port; | 90 msg.dart_port = dart_port; |
106 msg.data = data; | 91 msg.data = data; |
107 intptr_t result = | 92 intptr_t result = |
108 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 93 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
109 if (result != kInterruptMessageSize) { | 94 if (result != kInterruptMessageSize) { |
110 FATAL("Interrupt message failure"); | 95 FATAL("Interrupt message failure"); |
111 } | 96 } |
112 } | 97 } |
113 | 98 |
114 | 99 |
115 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 100 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
116 struct pollfd* pollfds; | 101 struct pollfd* pollfds; |
117 | 102 |
118 // Calculate the number of file descriptors to poll on. | 103 // Calculate the number of file descriptors to poll on. |
119 intptr_t numPollfds = 1; | 104 intptr_t numPollfds = 1; |
120 for (int i = 0; i < socket_map_size_; i++) { | 105 for (HashMap::Entry* entry = socket_map_.Start(); |
121 SocketData* sd = &socket_map_[i]; | 106 entry != NULL; |
107 entry = socket_map_.Next(entry)) { | |
108 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
122 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; | 109 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
123 } | 110 } |
124 | 111 |
125 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 112 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
126 numPollfds)); | 113 numPollfds)); |
127 pollfds[0].fd = interrupt_fds_[0]; | 114 pollfds[0].fd = interrupt_fds_[0]; |
128 pollfds[0].events |= POLLIN; | 115 pollfds[0].events |= POLLIN; |
129 | 116 |
130 // TODO(hpayer): optimize the following iteration over the hash map | 117 int i = 1; |
131 int j = 1; | 118 for (HashMap::Entry* entry = socket_map_.Start(); |
132 for (int i = 0; i < socket_map_size_; i++) { | 119 entry != NULL; |
133 SocketData* sd = &socket_map_[i]; | 120 entry = socket_map_.Next(entry)) { |
121 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); | |
134 intptr_t events = sd->GetPollEvents(); | 122 intptr_t events = sd->GetPollEvents(); |
135 if (sd->port() > 0 && events != 0) { | 123 if (sd->port() > 0 && events != 0) { |
136 // Fd is added to the poll set. | 124 // Fd is added to the poll set. |
137 pollfds[j].fd = sd->fd(); | 125 pollfds[i].fd = sd->fd(); |
138 pollfds[j].events = events; | 126 pollfds[i].events = events; |
139 j++; | 127 i++; |
140 } | 128 } |
141 } | 129 } |
142 ASSERT(numPollfds == j); | 130 ASSERT(numPollfds == i); |
143 *pollfds_size = j; | 131 *pollfds_size = i; |
132 | |
144 return pollfds; | 133 return pollfds; |
145 } | 134 } |
146 | 135 |
147 | 136 |
148 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 137 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
149 int total_read = 0; | 138 int total_read = 0; |
150 int bytes_read = | 139 int bytes_read = |
151 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); | 140 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); |
152 if (bytes_read < 0) { | 141 if (bytes_read < 0) { |
153 return false; | 142 return false; |
(...skipping 22 matching lines...) Expand all Loading... | |
176 ASSERT(msg.data == (1 << kShutdownReadCommand)); | 165 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
177 // Close the socket for reading. | 166 // Close the socket for reading. |
178 sd->ShutdownRead(); | 167 sd->ShutdownRead(); |
179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { | 168 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
180 ASSERT(msg.data == (1 << kShutdownWriteCommand)); | 169 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
181 // Close the socket for writing. | 170 // Close the socket for writing. |
182 sd->ShutdownWrite(); | 171 sd->ShutdownWrite(); |
183 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | 172 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
184 ASSERT(msg.data == (1 << kCloseCommand)); | 173 ASSERT(msg.data == (1 << kCloseCommand)); |
185 // Close the socket and free system resources. | 174 // Close the socket and free system resources. |
175 intptr_t fd = sd->fd(); | |
186 sd->Close(); | 176 sd->Close(); |
177 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); | |
178 delete sd; | |
187 } else { | 179 } else { |
188 // Setup events to wait for. | 180 // Setup events to wait for. |
189 sd->SetPortAndMask(msg.dart_port, msg.data); | 181 sd->SetPortAndMask(msg.dart_port, msg.data); |
190 } | 182 } |
191 } | 183 } |
192 } | 184 } |
193 } | 185 } |
194 | 186 |
195 #ifdef DEBUG_POLL | 187 #ifdef DEBUG_POLL |
196 static void PrintEventMask(struct pollfd* pollfd) { | 188 static void PrintEventMask(struct pollfd* pollfd) { |
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
375 FATAL("Create start event handler thread"); | 367 FATAL("Create start event handler thread"); |
376 } | 368 } |
377 } | 369 } |
378 | 370 |
379 | 371 |
380 void EventHandlerImplementation::SendData(intptr_t id, | 372 void EventHandlerImplementation::SendData(intptr_t id, |
381 Dart_Port dart_port, | 373 Dart_Port dart_port, |
382 intptr_t data) { | 374 intptr_t data) { |
383 WakeupHandler(id, dart_port, data); | 375 WakeupHandler(id, dart_port, data); |
384 } | 376 } |
377 | |
378 | |
379 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { | |
380 // The hashmap does not support keys with value 0. | |
381 return reinterpret_cast<void*>(fd + 1); | |
382 } | |
383 | |
384 | |
385 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { | |
386 // The hashmap does not support keys with value 0. | |
387 return dart::Utils::WordHash(fd + 1); | |
388 } | |
OLD | NEW |