OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "media/audio/shared_mem_synchronizer.h" | |
6 | |
7 #include "base/logging.h" | |
8 #include "base/memory/scoped_ptr.h" | |
9 #include "base/threading/platform_thread.h" | |
10 #include "base/win/scoped_handle.h" | |
11 | |
12 SharedMemSynchronizer::~SharedMemSynchronizer() { | |
13 } | |
14 | |
15 SharedMemSynchronizer::SharedMemSynchronizer(IPCHandle handle_1, | |
16 IPCHandle handle_2) | |
17 : mine_(handle_1), other_(handle_2) { | |
18 DCHECK(IsValid()); | |
19 } | |
20 | |
21 void SharedMemSynchronizer::Signal() { | |
22 DCHECK(IsValid()); | |
23 DCHECK_EQ(::WaitForSingleObject(mine_, 0), static_cast<DWORD>(WAIT_TIMEOUT)) | |
24 << "Are you calling Signal() without calling Wait() first?"; | |
25 BOOL ok = ::SetEvent(mine_); | |
26 CHECK(ok); | |
27 } | |
28 | |
29 void SharedMemSynchronizer::Wait() { | |
30 DCHECK(IsValid()); | |
31 DWORD wait = ::WaitForSingleObject(other_, INFINITE); | |
32 DCHECK_EQ(wait, WAIT_OBJECT_0); | |
33 BOOL ok = ::ResetEvent(other_); | |
34 CHECK(ok); | |
35 } | |
36 | |
37 bool SharedMemSynchronizer::IsValid() const { | |
38 return mine_.IsValid() && other_.IsValid(); | |
39 } | |
40 | |
41 bool SharedMemSynchronizer::ShareToProcess(base::ProcessHandle process, | |
42 IPCHandle* handle_1, | |
43 IPCHandle* handle_2) { | |
44 DCHECK(IsValid()); | |
45 HANDLE our_process = ::GetCurrentProcess(); | |
46 if (!::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE, | |
47 DUPLICATE_SAME_ACCESS)) { | |
48 return false; | |
49 } | |
50 | |
51 if (!::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE, | |
52 DUPLICATE_SAME_ACCESS)) { | |
53 // In case we're sharing to ourselves, we can close the handle, but | |
54 // if the target process is a different process, we do nothing. | |
55 if (process == our_process) | |
56 ::CloseHandle(*handle_1); | |
57 *handle_1 = NULL; | |
58 return false; | |
59 } | |
60 | |
61 return true; | |
62 } | |
63 | |
64 // static | |
65 bool SharedMemSynchronizer::InitializePair(SharedMemSynchronizer* a, | |
66 SharedMemSynchronizer* b) { | |
67 DCHECK(!a->IsValid()); | |
68 DCHECK(!b->IsValid()); | |
69 | |
70 bool success = false; | |
71 | |
72 // Create two manually resettable events and give each party a handle | |
73 // to both events. | |
74 HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL); | |
75 HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL); | |
76 if (event_a && event_b) { | |
77 a->mine_.Set(event_a); | |
78 a->other_.Set(event_b); | |
79 success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b); | |
80 if (success) { | |
81 b->mine_.Set(event_b); | |
82 b->other_.Set(event_a); | |
83 } else { | |
84 a->mine_.Close(); | |
85 a->other_.Close(); | |
86 } | |
87 } else { | |
88 if (event_a) | |
89 ::CloseHandle(event_a); | |
90 if (event_b) | |
91 ::CloseHandle(event_b); | |
92 } | |
93 | |
94 DCHECK(!success || a->IsValid()); | |
95 DCHECK(!success || b->IsValid()); | |
96 | |
97 return success; | |
98 } | |
99 | |
100 namespace { | |
101 class ExtraWaitThread : public base::PlatformThread::Delegate { | |
102 public: | |
103 ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count, | |
104 int* signaled_event) | |
105 : stop_(stop), events_(events), count_(count), | |
106 signaled_event_(signaled_event) { | |
107 *signaled_event_ = -1; | |
108 } | |
109 virtual ~ExtraWaitThread() {} | |
110 | |
111 virtual void ThreadMain() OVERRIDE { | |
112 // Store the |stop_| event as the first event. | |
113 HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ }; | |
114 HANDLE next_thread = NULL; | |
115 DWORD event_count = MAXIMUM_WAIT_OBJECTS; | |
116 int thread_signaled_event = -1; | |
117 scoped_ptr<ExtraWaitThread> extra_wait_thread; | |
118 if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) { | |
119 std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]); | |
120 | |
121 extra_wait_thread.reset(new ExtraWaitThread(stop_, | |
122 &events_[MAXIMUM_WAIT_OBJECTS - 2], | |
123 count_ - (MAXIMUM_WAIT_OBJECTS - 2), | |
124 &thread_signaled_event)); | |
125 base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread); | |
126 | |
127 event_count = MAXIMUM_WAIT_OBJECTS; | |
128 events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread; | |
129 } else { | |
130 std::copy(&events_[0], &events_[count_], &events[1]); | |
131 event_count = count_ + 1; | |
132 } | |
133 | |
134 DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE, | |
135 INFINITE); | |
136 if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) { | |
137 wait -= WAIT_OBJECT_0; | |
138 if (wait == 0) { | |
139 // The stop event was signaled. Check if it was signaled by a | |
140 // sub thread. In case our sub thread had to spin another thread (and | |
141 // so on), we must wait for ours to exit before we can check the | |
142 // propagated event offset. | |
143 if (next_thread) { | |
144 base::PlatformThread::Join(next_thread); | |
145 next_thread = NULL; | |
146 } | |
147 if (thread_signaled_event != -1) | |
148 *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2); | |
149 } else if (events[wait] == next_thread) { | |
150 NOTREACHED(); | |
151 } else { | |
152 *signaled_event_ = static_cast<int>(wait); | |
153 SetEvent(stop_); | |
154 } | |
155 } else { | |
156 NOTREACHED(); | |
157 } | |
158 | |
159 if (next_thread) | |
160 base::PlatformThread::Join(next_thread); | |
161 } | |
162 | |
163 private: | |
164 HANDLE stop_; | |
165 HANDLE* events_; | |
166 size_t count_; | |
167 int* signaled_event_; | |
168 DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread); | |
169 }; | |
170 } // end namespace | |
171 | |
172 // static | |
173 int SharedMemSynchronizer::WaitMultiple(const SynchronizerVector& synchronizers, | |
Ami GONE FROM CHROMIUM
2012/03/13 20:08:02
FTR I haven't reviewed the win impl at all (IDK nu
tommi (sloooow) - chröme
2012/03/14 13:32:43
noted.
Andrew - any specific comments on the win
| |
174 size_t last_signaled) { | |
175 DCHECK_LT(last_signaled, synchronizers.size()); | |
176 | |
177 for (size_t i = 0; i < synchronizers.size(); ++i) { | |
178 DCHECK(synchronizers[i]->IsValid()); | |
179 } | |
180 | |
181 int ret = -1; | |
182 | |
183 // TODO(tommi): Should we wait in an alertable state so that we can be | |
184 // canceled via an APC? | |
185 scoped_array<HANDLE> handles(new HANDLE[synchronizers.size()]); | |
186 | |
187 // Because of the way WaitForMultipleObjects works, we do a little trick here. | |
188 // When multiple events are signaled, WaitForMultipleObjects will return the | |
189 // index of the first signaled item (lowest). This means that if we always | |
190 // pass the array the same way to WaitForMultipleObjects, the objects that | |
191 // come first, have higher priority. In times of heavy load, this will cause | |
192 // elements at the back to become DOS-ed. | |
193 // So, we store the location of the item that was last signaled. Then we split | |
194 // up the array and move everything higher than the last signaled index to the | |
195 // front and the rest to the back (meaning that the last signaled item will | |
196 // become the last element in the list). | |
197 // Assuming equally busy events, this approach distributes the priority | |
198 // evenly. | |
199 | |
200 size_t index = 0; | |
201 for (size_t i = last_signaled + 1; i < synchronizers.size(); ++i) | |
202 handles[index++] = synchronizers[i]->other_; | |
203 | |
204 for (size_t i = 0; i <= last_signaled; ++i) | |
205 handles[index++] = synchronizers[i]->other_; | |
206 DCHECK_EQ(index, synchronizers.size()); | |
207 | |
208 DWORD wait = WAIT_FAILED; | |
209 bool wait_failed = false; | |
210 if (synchronizers.size() <= MAXIMUM_WAIT_OBJECTS) { | |
211 wait = ::WaitForMultipleObjects(synchronizers.size(), &handles[0], FALSE, | |
212 INFINITE); | |
213 wait_failed = wait < WAIT_OBJECT_0 || | |
214 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS); | |
215 } else { | |
216 // Used to stop the other wait threads when an event has been signaled. | |
217 base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL)); | |
218 | |
219 // Create the first thread and pass a pointer to all handles >63 | |
220 // to the thread + 'stop'. Then implement the thread so that it checks | |
221 // if the number of handles is > 63. If so, spawns a new thread and | |
222 // passes >62 handles to that thread and waits for the 62 handles + stop + | |
223 // next thread. etc etc. | |
224 | |
225 // Create a list of threads so that each thread waits on at most 62 events | |
226 // including one event for when a child thread signals completion and one | |
227 // event for when all of the threads must be stopped (due to some event | |
228 // being signaled). | |
229 | |
230 int thread_signaled_event = -1; | |
231 ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1], | |
232 synchronizers.size() - (MAXIMUM_WAIT_OBJECTS - 1), | |
233 &thread_signaled_event); | |
234 base::PlatformThreadHandle thread; | |
235 base::PlatformThread::Create(0, &wait_thread, &thread); | |
236 HANDLE events[MAXIMUM_WAIT_OBJECTS]; | |
237 std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]); | |
238 events[MAXIMUM_WAIT_OBJECTS - 1] = thread; | |
239 wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE, | |
240 INFINITE); | |
241 wait_failed = wait < WAIT_OBJECT_0 || | |
242 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS); | |
243 if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) { | |
244 if (thread_signaled_event < 0) { | |
245 wait_failed = true; | |
246 NOTREACHED(); | |
247 } else { | |
248 wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) + | |
249 thread_signaled_event; | |
250 } | |
251 } else { | |
252 ::SetEvent(stop); | |
253 } | |
254 base::PlatformThread::Join(thread); | |
255 } | |
256 | |
257 if (!wait_failed) { | |
258 // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0). | |
259 wait -= WAIT_OBJECT_0; | |
260 BOOL ok = ::ResetEvent(handles[wait]); | |
261 CHECK(ok); | |
262 ret = (wait + last_signaled + 1) % synchronizers.size(); | |
263 DCHECK_EQ(handles[wait], synchronizers[ret]->other_.Get()); | |
264 } else { | |
265 NOTREACHED(); | |
266 } | |
267 | |
268 CHECK_NE(ret, -1); | |
269 return ret; | |
270 } | |
OLD | NEW |