| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 #include <errno.h> | 5 #include <errno.h> |
| 6 #include <poll.h> | 6 #include <poll.h> |
| 7 #include <pthread.h> | 7 #include <pthread.h> |
| 8 #include <stdio.h> | 8 #include <stdio.h> |
| 9 #include <string.h> | 9 #include <string.h> |
| 10 #include <sys/time.h> | 10 #include <sys/time.h> |
| 11 #include <unistd.h> | 11 #include <unistd.h> |
| 12 | 12 |
| 13 #include "bin/eventhandler.h" | 13 #include "bin/eventhandler.h" |
| 14 #include "bin/fdutils.h" | 14 #include "bin/fdutils.h" |
| 15 #include "bin/hashmap.h" |
| 16 #include "platform/utils.h" |
| 15 | 17 |
| 16 | 18 |
| 17 int64_t GetCurrentTimeMilliseconds() { | 19 int64_t GetCurrentTimeMilliseconds() { |
| 18 struct timeval tv; | 20 struct timeval tv; |
| 19 if (gettimeofday(&tv, NULL) < 0) { | 21 if (gettimeofday(&tv, NULL) < 0) { |
| 20 UNREACHABLE(); | 22 UNREACHABLE(); |
| 21 return 0; | 23 return 0; |
| 22 } | 24 } |
| 23 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; | 25 return ((static_cast<int64_t>(tv.tv_sec) * 1000000) + tv.tv_usec) / 1000; |
| 24 } | 26 } |
| 25 | 27 |
| 26 | 28 |
| 27 static const int kInitialPortMapSize = 16; | |
| 28 static const int kPortMapGrowingFactor = 2; | |
| 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); | 29 static const int kInterruptMessageSize = sizeof(InterruptMessage); |
| 30 static const int kInfinityTimeout = -1; | 30 static const int kInfinityTimeout = -1; |
| 31 static const int kTimerId = -1; | 31 static const int kTimerId = -1; |
| 32 | 32 |
| 33 | 33 |
| 34 intptr_t SocketData::GetPollEvents() { | 34 intptr_t SocketData::GetPollEvents() { |
| 35 // Do not ask for POLLERR and POLLHUP explicitly as they are | 35 // Do not ask for POLLERR and POLLHUP explicitly as they are |
| 36 // triggered anyway. | 36 // triggered anyway. |
| 37 intptr_t events = 0; | 37 intptr_t events = 0; |
| 38 if (!IsClosedRead()) { | 38 if (!IsClosedRead()) { |
| 39 if ((mask_ & (1 << kInEvent)) != 0) { | 39 if ((mask_ & (1 << kInEvent)) != 0) { |
| 40 events |= POLLIN; | 40 events |= POLLIN; |
| 41 } | 41 } |
| 42 } | 42 } |
| 43 if (!IsClosedWrite()) { | 43 if (!IsClosedWrite()) { |
| 44 if ((mask_ & (1 << kOutEvent)) != 0) { | 44 if ((mask_ & (1 << kOutEvent)) != 0) { |
| 45 events |= POLLOUT; | 45 events |= POLLOUT; |
| 46 } | 46 } |
| 47 } | 47 } |
| 48 return events; | 48 return events; |
| 49 } | 49 } |
| 50 | 50 |
| 51 | 51 |
| 52 EventHandlerImplementation::EventHandlerImplementation() { | 52 EventHandlerImplementation::EventHandlerImplementation() |
| 53 : socket_map_(&HashMap::SamePointerValue, 16) { |
| 53 intptr_t result; | 54 intptr_t result; |
| 54 socket_map_size_ = kInitialPortMapSize; | |
| 55 socket_map_ = reinterpret_cast<SocketData*>(calloc(socket_map_size_, | |
| 56 sizeof(SocketData))); | |
| 57 ASSERT(socket_map_ != NULL); | |
| 58 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); | 55 result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_)); |
| 59 if (result != 0) { | 56 if (result != 0) { |
| 60 FATAL("Pipe creation failed"); | 57 FATAL("Pipe creation failed"); |
| 61 } | 58 } |
| 62 FDUtils::SetNonBlocking(interrupt_fds_[0]); | 59 FDUtils::SetNonBlocking(interrupt_fds_[0]); |
| 63 timeout_ = kInfinityTimeout; | 60 timeout_ = kInfinityTimeout; |
| 64 timeout_port_ = 0; | 61 timeout_port_ = 0; |
| 65 } | 62 } |
| 66 | 63 |
| 67 | 64 |
| 68 EventHandlerImplementation::~EventHandlerImplementation() { | 65 EventHandlerImplementation::~EventHandlerImplementation() { |
| 69 free(socket_map_); | |
| 70 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); | 66 TEMP_FAILURE_RETRY(close(interrupt_fds_[0])); |
| 71 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); | 67 TEMP_FAILURE_RETRY(close(interrupt_fds_[1])); |
| 72 } | 68 } |
| 73 | 69 |
| 74 | 70 |
| 75 // TODO(hpayer): Use hash table instead of array. | |
| 76 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { | 71 SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) { |
| 77 ASSERT(fd >= 0); | 72 ASSERT(fd >= 0); |
| 78 if (fd >= socket_map_size_) { | 73 HashMap::Entry* entry = socket_map_.Lookup( |
| 79 intptr_t new_socket_map_size = socket_map_size_; | 74 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true); |
| 80 do { | 75 ASSERT(entry != NULL); |
| 81 new_socket_map_size = new_socket_map_size * kPortMapGrowingFactor; | 76 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
| 82 } while (fd >= new_socket_map_size); | 77 if (sd == NULL) { |
| 83 size_t new_socket_map_bytes = new_socket_map_size * sizeof(SocketData); | 78 // If there is no data in the hash map for this file descriptor |
| 84 socket_map_ = reinterpret_cast<SocketData*>(realloc(socket_map_, | 79 // then this is inserting a new SocketData for the file descriptor. |
| 85 new_socket_map_bytes)); | 80 sd = new SocketData(fd); |
| 86 ASSERT(socket_map_ != NULL); | 81 entry->value = sd; |
| 87 size_t socket_map_bytes = socket_map_size_ * sizeof(SocketData); | |
| 88 memset(socket_map_ + socket_map_size_, | |
| 89 0, | |
| 90 new_socket_map_bytes - socket_map_bytes); | |
| 91 socket_map_size_ = new_socket_map_size; | |
| 92 } | 82 } |
| 93 | 83 ASSERT(fd == sd->fd()); |
| 94 SocketData* sd = socket_map_ + fd; | |
| 95 sd->set_fd(fd); // For now just make sure the fd is set. | |
| 96 return sd; | 84 return sd; |
| 97 } | 85 } |
| 98 | 86 |
| 99 | 87 |
| 100 void EventHandlerImplementation::WakeupHandler(intptr_t id, | 88 void EventHandlerImplementation::WakeupHandler(intptr_t id, |
| 101 Dart_Port dart_port, | 89 Dart_Port dart_port, |
| 102 int64_t data) { | 90 int64_t data) { |
| 103 InterruptMessage msg; | 91 InterruptMessage msg; |
| 104 msg.id = id; | 92 msg.id = id; |
| 105 msg.dart_port = dart_port; | 93 msg.dart_port = dart_port; |
| 106 msg.data = data; | 94 msg.data = data; |
| 107 intptr_t result = | 95 intptr_t result = |
| 108 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); | 96 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize); |
| 109 if (result != kInterruptMessageSize) { | 97 if (result != kInterruptMessageSize) { |
| 110 FATAL("Interrupt message failure"); | 98 FATAL("Interrupt message failure"); |
| 111 } | 99 } |
| 112 } | 100 } |
| 113 | 101 |
| 114 | 102 |
| 115 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { | 103 struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) { |
| 116 struct pollfd* pollfds; | 104 struct pollfd* pollfds; |
| 117 | 105 |
| 118 // Calculate the number of file descriptors to poll on. | 106 // Calculate the number of file descriptors to poll on. |
| 119 intptr_t numPollfds = 1; | 107 intptr_t numPollfds = 1; |
| 120 for (int i = 0; i < socket_map_size_; i++) { | 108 for (HashMap::Entry* entry = socket_map_.Start(); |
| 121 SocketData* sd = &socket_map_[i]; | 109 entry != NULL; |
| 110 entry = socket_map_.Next(entry)) { |
| 111 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
| 122 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; | 112 if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++; |
| 123 } | 113 } |
| 124 | 114 |
| 125 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), | 115 pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd), |
| 126 numPollfds)); | 116 numPollfds)); |
| 127 pollfds[0].fd = interrupt_fds_[0]; | 117 pollfds[0].fd = interrupt_fds_[0]; |
| 128 pollfds[0].events |= POLLIN; | 118 pollfds[0].events |= POLLIN; |
| 129 | 119 |
| 130 // TODO(hpayer): optimize the following iteration over the hash map | 120 int i = 1; |
| 131 int j = 1; | 121 for (HashMap::Entry* entry = socket_map_.Start(); |
| 132 for (int i = 0; i < socket_map_size_; i++) { | 122 entry != NULL; |
| 133 SocketData* sd = &socket_map_[i]; | 123 entry = socket_map_.Next(entry)) { |
| 124 SocketData* sd = reinterpret_cast<SocketData*>(entry->value); |
| 134 intptr_t events = sd->GetPollEvents(); | 125 intptr_t events = sd->GetPollEvents(); |
| 135 if (sd->port() > 0 && events != 0) { | 126 if (sd->port() > 0 && events != 0) { |
| 136 // Fd is added to the poll set. | 127 // Fd is added to the poll set. |
| 137 pollfds[j].fd = sd->fd(); | 128 pollfds[i].fd = sd->fd(); |
| 138 pollfds[j].events = events; | 129 pollfds[i].events = events; |
| 139 j++; | 130 i++; |
| 140 } | 131 } |
| 141 } | 132 } |
| 142 ASSERT(numPollfds == j); | 133 ASSERT(numPollfds == i); |
| 143 *pollfds_size = j; | 134 *pollfds_size = i; |
| 135 |
| 144 return pollfds; | 136 return pollfds; |
| 145 } | 137 } |
| 146 | 138 |
| 147 | 139 |
| 148 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { | 140 bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) { |
| 149 int total_read = 0; | 141 int total_read = 0; |
| 150 int bytes_read = | 142 int bytes_read = |
| 151 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); | 143 TEMP_FAILURE_RETRY(read(interrupt_fds_[0], msg, kInterruptMessageSize)); |
| 152 if (bytes_read < 0) { | 144 if (bytes_read < 0) { |
| 153 return false; | 145 return false; |
| (...skipping 22 matching lines...) Expand all Loading... |
| 176 ASSERT(msg.data == (1 << kShutdownReadCommand)); | 168 ASSERT(msg.data == (1 << kShutdownReadCommand)); |
| 177 // Close the socket for reading. | 169 // Close the socket for reading. |
| 178 sd->ShutdownRead(); | 170 sd->ShutdownRead(); |
| 179 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { | 171 } else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) { |
| 180 ASSERT(msg.data == (1 << kShutdownWriteCommand)); | 172 ASSERT(msg.data == (1 << kShutdownWriteCommand)); |
| 181 // Close the socket for writing. | 173 // Close the socket for writing. |
| 182 sd->ShutdownWrite(); | 174 sd->ShutdownWrite(); |
| 183 } else if ((msg.data & (1 << kCloseCommand)) != 0) { | 175 } else if ((msg.data & (1 << kCloseCommand)) != 0) { |
| 184 ASSERT(msg.data == (1 << kCloseCommand)); | 176 ASSERT(msg.data == (1 << kCloseCommand)); |
| 185 // Close the socket and free system resources. | 177 // Close the socket and free system resources. |
| 178 intptr_t fd = sd->fd(); |
| 186 sd->Close(); | 179 sd->Close(); |
| 180 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| 181 delete sd; |
| 187 } else { | 182 } else { |
| 188 // Setup events to wait for. | 183 // Setup events to wait for. |
| 189 sd->SetPortAndMask(msg.dart_port, msg.data); | 184 sd->SetPortAndMask(msg.dart_port, msg.data); |
| 190 } | 185 } |
| 191 } | 186 } |
| 192 } | 187 } |
| 193 } | 188 } |
| 194 | 189 |
| 195 #ifdef DEBUG_POLL | 190 #ifdef DEBUG_POLL |
| 196 static void PrintEventMask(struct pollfd* pollfd) { | 191 static void PrintEventMask(struct pollfd* pollfd) { |
| (...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 375 FATAL("Create start event handler thread"); | 370 FATAL("Create start event handler thread"); |
| 376 } | 371 } |
| 377 } | 372 } |
| 378 | 373 |
| 379 | 374 |
| 380 void EventHandlerImplementation::SendData(intptr_t id, | 375 void EventHandlerImplementation::SendData(intptr_t id, |
| 381 Dart_Port dart_port, | 376 Dart_Port dart_port, |
| 382 intptr_t data) { | 377 intptr_t data) { |
| 383 WakeupHandler(id, dart_port, data); | 378 WakeupHandler(id, dart_port, data); |
| 384 } | 379 } |
| 380 |
| 381 |
| 382 void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
| 383 // The hashmap does not support keys with value 0. |
| 384 return reinterpret_cast<void*>(fd + 1); |
| 385 } |
| 386 |
| 387 |
| 388 uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
| 389 // The hashmap does not support keys with value 0. |
| 390 return dart::Utils::WordHash(fd + 1); |
| 391 } |
| OLD | NEW |