OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 #include <errno.h> | 5 #include <errno.h> |
6 #include <poll.h> | 6 #include <poll.h> |
7 #include <pthread.h> | 7 #include <pthread.h> |
8 #include <stdio.h> | 8 #include <stdio.h> |
9 #include <string.h> | 9 #include <string.h> |
10 #include <sys/time.h> | 10 #include <sys/time.h> |
11 #include <unistd.h> | 11 #include <unistd.h> |
12 | 12 |
13 #include "bin/eventhandler.h" | 13 #include "bin/eventhandler.h" |
14 #include "bin/fdutils.h" | 14 #include "bin/fdutils.h" |
| 15 #include "bin/hashmap.h" |
| 16 #include "platform/utils.h" |
15 | 17 |
16 | 18 |
17 int64_t GetCurrentTimeMilliseconds() { | 19 int64_t GetCurrentTimeMilliseconds() { |
18 struct timeval tv; | 20 struct timeval tv; |
19 if (gettimeofday(&tv, NULL) < 0) { | 21 if (gettimeofday(&tv, NULL) < 0) { |
20 UNREACHABLE(); | 22 UNREACHABLE(); |
21 return 0; | 23 return 0; |
22 } | 24 } |
23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 25 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
24 } | 26 } |
25 | 27 |
26 | 28 |
27 static const int kInitialPortMapSize = 16; | |
28 static const int kPortMapGrowingFactor = 2; | |
29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
32 | 32 |
33 | 33 |
34 intptr_t SocketData::GetPollEvents() { | 34 intptr_t SocketData::GetPollEvents() { |
35 // Do not ask for POLLERR and POLLHUP explicitly as they are | 35 // Do not ask for POLLERR and POLLHUP explicitly as they are |
36 // triggered anyway. | 36 // triggered anyway. |
37 intptr_t events = 0; | 37 intptr_t events = 0; |
38 if (!IsClosedRead()) { | 38 if (!IsClosedRead()) { |
39 if ((mask_ & (1 << kInEvent)) != 0) { | 39 if ((mask_ & (1 << kInEvent)) != 0) { |
40 events |= POLLIN; | 40 events |= POLLIN; |
41 } | 41 } |
42 } | 42 } |
43 if (!IsClosedWrite()) { | 43 if (!IsClosedWrite()) { |
44 if ((mask_ & (1 << kOutEvent)) != 0) { | 44 if ((mask_ & (1 << kOutEvent)) != 0) { |
45 events |= POLLOUT; | 45 events |= POLLOUT; |
46 } | 46 } |
47 } | 47 } |
48 return events; | 48 return events; |
49 } | 49 } |
50 | 50 |
51 | 51 |
52 EventHandlerImplementation::EventHandlerImplementation() { | 52 EventHandlerImplementation::EventHandlerImplementation() |
| 53 : socket_map_(&HashMap::SamePointerValue, 16) { |
53 intptr_t result; | 54 intptr_t result; |
54 socket_map_size_ = kInitialPortMapSize; | |
55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, | |
56 sizeof(SocketData))); | |
57 ASSERT(socket_map_ != NULL); | |
58 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); | 55 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
59 if (result != 0) { | 56 if (result != 0) { |
60 FATAL("Pipe creation failed"); | 57 FATAL("Pipe creation failed"); |
61 } | 58 } |
62 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 59 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
63 timeout_ = kInfinityTimeout; | 60 timeout_ = kInfinityTimeout; |
64 timeout_port_ = 0; | 61 timeout_port_ = 0; |
65 } | 62 } |
66 | 63 |
67 | 64 |
68 EventHandlerImplementation::~EventHandlerImplementation() { | 65 EventHandlerImplementation::~EventHandlerImplementation() { |
69 free(socket_map_); | |
70 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); | 66 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
71 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); | 67 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
72 } | 68 } |
73 | 69 |
74 | 70 |
75 // TODO(hpayer): Use hash table instead of array. | |
76 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { | 71 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
77 ASSERT(fd >= 0); | 72 ASSERT(fd >= 0); |
78 if (fd >= socket_map_size_) { | 73 HashMap::Entry* entry = socket_map_.Lookup( |
79 intptr_t new_socket_map_size = socket_map_size_; | 74 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
80 do { | 75 ASSERT(entry != NULL); |
81 new_socket_map_size = new_socket_map_size * kPortMapGrowingFactor; | 76 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
82 } while (fd >= new_socket_map_size); | 77 if (sd == NULL) { |
83 size_t new_socket_map_bytes = new_socket_map_size * sizeof(SocketData); | 78 // If there is no data in the hash map for this file descriptor |
84 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, | 79 // then this is inserting a new SocketData for the file descriptor. |
85 new_socket_map_bytes)); | 80 sd = new SocketData(fd); |
86 ASSERT(socket_map_ != NULL); | 81 entry->value = sd; |
87 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); | |
88 memset(socket_map_ + socket_map_size_, | |
89 0, | |
90 new_socket_map_bytes - socket_map_bytes); | |
91 socket_map_size_ = new_socket_map_size; | |
92 } | 82 } |
93 | 83 ASSERT(fd == sd->fd()); |
94 SocketData* sd = socket_map_ + fd; | |
95 sd->set_fd(fd); // For now just make sure the fd is set. | |
96 return sd; | 84 return sd; |
97 } | 85 } |
98 | 86 |
99 | 87 |
100 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 88 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
101 Dart_Port dart_port, | 89 Dart_Port dart_port, |
102 int64_t data) { | 90 int64_t data) { |
103 InterruptMessage msg; | 91 InterruptMessage msg; |
104 msg.id = id; | 92 msg.id = id; |
105 msg.dart_port = dart_port; | 93 msg.dart_port = dart_port; |
106 msg.data = data; | 94 msg.data = data; |
107 intptr_t result = | 95 intptr_t result = |
108 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 96 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
109 if (result != kInterruptMessageSize) { | 97 if (result != kInterruptMessageSize) { |
110 FATAL("Interrupt message failure"); | 98 FATAL("Interrupt message failure"); |
111 } | 99 } |
112 } | 100 } |
113 | 101 |
114 | 102 |
115 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 103 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
116 struct pollfd* pollfds; | 104 struct pollfd* pollfds; |
117 | 105 |
118 // Calculate the number of file descriptors to poll on. | 106 // Calculate the number of file descriptors to poll on. |
119 intptr_t numPollfds = 1; | 107 intptr_t numPollfds = 1; |
120 for (int i = 0; i < socket_map_size_; i++) { | 108 for (HashMap::Entry* entry = socket_map_.Start(); |
121 SocketData* sd = &socket_map_[i]; | 109 entry != NULL; |
| 110 entry = socket_map_.Next(entry)) { |
| 111 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
122 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; | 112 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
123 } | 113 } |
124 | 114 |
125 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 115 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
126 numPollfds)); | 116 numPollfds)); |
127 pollfds[0].fd = interrupt_fds_[0]; | 117 pollfds[0].fd = interrupt_fds_[0]; |
128 pollfds[0].events |= POLLIN; | 118 pollfds[0].events |= POLLIN; |
129 | 119 |
130 // TODO(hpayer): optimize the following iteration over the hash map | 120 int i = 1; |
131 int j = 1; | 121 for (HashMap::Entry* entry = socket_map_.Start(); |
132 for (int i = 0; i < socket_map_size_; i++) { | 122 entry != NULL; |
133 SocketData* sd = &socket_map_[i]; | 123 entry = socket_map_.Next(entry)) { |
| 124 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
134 intptr_t events = sd->GetPollEvents(); | 125 intptr_t events = sd->GetPollEvents(); |
135 if (sd->port() > 0 && events != 0) { | 126 if (sd->port() > 0 && events != 0) { |
136 // Fd is added to the poll set. | 127 // Fd is added to the poll set. |
137 pollfds[j].fd = sd->fd(); | 128 pollfds[i].fd = sd->fd(); |
138 pollfds[j].events = events; | 129 pollfds[i].events = events; |
139 j++; | 130 i++; |
140 } | 131 } |
141 } | 132 } |
142 ASSERT(numPollfds == j); | 133 ASSERT(numPollfds == i); |
143 *pollfds_size = j; | 134 *pollfds_size = i; |
| 135 |
144 return pollfds; | 136 return pollfds; |
145 } | 137 } |
146 | 138 |
147 | 139 |
148 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 140 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
149 int total_read = 0; | 141 int total_read = 0; |
150 int bytes_read = | 142 int bytes_read = |
151 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); | 143 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); |
152 if (bytes_read < 0) { | 144 if (bytes_read < 0) { |
153 return false; | 145 return false; |
(...skipping 22 matching lines...) Expand all Loading... |
176 ASSERT(msg.data == (1 << kShutdownReadCommand)); | 168 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
177 // Close the socket for reading. | 169 // Close the socket for reading. |
178 sd->ShutdownRead(); | 170 sd->ShutdownRead(); |
179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { | 171 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
180 ASSERT(msg.data == (1 << kShutdownWriteCommand)); | 172 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
181 // Close the socket for writing. | 173 // Close the socket for writing. |
182 sd->ShutdownWrite(); | 174 sd->ShutdownWrite(); |
183 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | 175 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
184 ASSERT(msg.data == (1 << kCloseCommand)); | 176 ASSERT(msg.data == (1 << kCloseCommand)); |
185 // Close the socket and free system resources. | 177 // Close the socket and free system resources. |
| 178 intptr_t fd = sd->fd(); |
186 sd->Close(); | 179 sd->Close(); |
| 180 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| 181 delete sd; |
187 } else { | 182 } else { |
188 // Setup events to wait for. | 183 // Setup events to wait for. |
189 sd->SetPortAndMask(msg.dart_port, msg.data); | 184 sd->SetPortAndMask(msg.dart_port, msg.data); |
190 } | 185 } |
191 } | 186 } |
192 } | 187 } |
193 } | 188 } |
194 | 189 |
195 #ifdef DEBUG_POLL | 190 #ifdef DEBUG_POLL |
196 static void PrintEventMask(struct pollfd* pollfd) { | 191 static void PrintEventMask(struct pollfd* pollfd) { |
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
375 FATAL("Create start event handler thread"); | 370 FATAL("Create start event handler thread"); |
376 } | 371 } |
377 } | 372 } |
378 | 373 |
379 | 374 |
380 void EventHandlerImplementation::SendData(intptr_t id, | 375 void EventHandlerImplementation::SendData(intptr_t id, |
381 Dart_Port dart_port, | 376 Dart_Port dart_port, |
382 intptr_t data) { | 377 intptr_t data) { |
383 WakeupHandler(id, dart_port, data); | 378 WakeupHandler(id, dart_port, data); |
384 } | 379 } |
| 380 |
| 381 |
| 382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
| 383 // The hashmap does not support keys with value 0. |
| 384 return reinterpret_cast<void*>(fd + 1); |
| 385 } |
| 386 |
| 387 |
| 388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
| 389 // The hashmap does not support keys with value 0. |
| 390 return dart::Utils::WordHash(fd + 1); |
| 391 } |
OLD | NEW |