OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 #include "media/audio/shared_mem_synchronizer.h" | |
scherkus (not reviewing)
2012/03/08 02:37:08
blank line above
tommi (sloooow) - chröme
2012/03/08 16:10:57
Done.
| |
5 | |
6 #include "base/logging.h" | |
7 #include "base/memory/scoped_ptr.h" | |
8 #include "base/threading/platform_thread.h" | |
9 #include "base/win/scoped_handle.h" | |
10 | |
11 SharedMemSynchronizer::~SharedMemSynchronizer() { | |
12 } | |
13 | |
14 SharedMemSynchronizer::SharedMemSynchronizer(IPCHandle handle_1, | |
15 IPCHandle handle_2) | |
16 : mine_(handle_1), other_(handle_2) { | |
17 DCHECK(IsValid()); | |
18 } | |
19 | |
20 void SharedMemSynchronizer::Signal() { | |
21 DCHECK(IsValid()); | |
22 DCHECK(::WaitForSingleObject(mine_, 0) == WAIT_TIMEOUT); | |
scherkus (not reviewing)
2012/03/08 02:37:08
can we DCHECK_EQ it up here + below?
also is this
tommi (sloooow) - chröme
2012/03/08 16:10:57
Done.
| |
23 ::SetEvent(mine_); | |
scherkus (not reviewing)
2012/03/08 02:37:08
FYI this could fail -- but not sure what we'd do t
tommi (sloooow) - chröme
2012/03/08 16:10:57
Paranoid is good in this case, so Done.
| |
24 } | |
25 | |
26 void SharedMemSynchronizer::Wait() { | |
27 DCHECK(IsValid()); | |
28 DWORD wait = ::WaitForSingleObject(other_, INFINITE); | |
29 DCHECK(wait == WAIT_OBJECT_0); | |
scherkus (not reviewing)
2012/03/08 02:37:08
ditto
tommi (sloooow) - chröme
2012/03/08 16:10:57
Done.
| |
30 ::ResetEvent(other_); | |
scherkus (not reviewing)
2012/03/08 02:37:08
ditto on failing here + way below
tommi (sloooow) - chröme
2012/03/08 16:10:57
Done.
| |
31 } | |
32 | |
33 bool SharedMemSynchronizer::IsValid() const { | |
34 return mine_.IsValid() && other_.IsValid(); | |
35 } | |
36 | |
37 bool SharedMemSynchronizer::ShareToProcess(base::ProcessHandle process, | |
38 IPCHandle* handle_1, | |
39 IPCHandle* handle_2) { | |
40 DCHECK(IsValid()); | |
41 bool success = false; | |
42 HANDLE our_process = ::GetCurrentProcess(); | |
43 if (::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE, | |
scherkus (not reviewing)
2012/03/08 02:37:08
nit: return early on fail instead of nesting ifs a
tommi (sloooow) - chröme
2012/03/08 16:10:57
Done.
| |
44 DUPLICATE_SAME_ACCESS)) { | |
45 if (::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE, | |
46 DUPLICATE_SAME_ACCESS)) { | |
47 success = true; | |
48 } else { | |
49 // In case we're sharing to ourselves, we can close the handle, but | |
50 // if the target process is a different process, we do nothing. | |
51 if (process == our_process) | |
52 ::CloseHandle(*handle_1); | |
53 *handle_1 = NULL; | |
54 } | |
55 } | |
56 return success; | |
57 } | |
58 | |
59 // static | |
60 bool SharedMemSynchronizer::Create(SharedMemSynchronizer* a, | |
61 SharedMemSynchronizer* b) { | |
62 DCHECK(!a->IsValid()); | |
63 DCHECK(!b->IsValid()); | |
64 | |
65 bool success = false; | |
66 | |
67 // Create two manually resettable events and give each party a handle | |
68 // to both events. | |
69 HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL); | |
70 HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL); | |
71 if (event_a && event_b) { | |
72 a->mine_.Set(event_a); | |
73 a->other_.Set(event_b); | |
74 success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b); | |
75 if (success) { | |
76 b->mine_.Set(event_b); | |
77 b->other_.Set(event_a); | |
78 } else { | |
79 a->mine_.Close(); | |
80 a->other_.Close(); | |
81 } | |
82 } else { | |
83 if (event_a) | |
84 ::CloseHandle(event_a); | |
85 if (event_b) | |
86 ::CloseHandle(event_b); | |
87 } | |
88 | |
89 DCHECK(!success || a->IsValid()); | |
90 DCHECK(!success || b->IsValid()); | |
91 | |
92 return success; | |
93 } | |
94 | |
95 namespace { | |
96 class ExtraWaitThread : public base::PlatformThread::Delegate { | |
97 public: | |
98 ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count, | |
99 int* signaled_event) | |
100 : stop_(stop), events_(events), count_(count), | |
101 signaled_event_(signaled_event) { | |
102 *signaled_event_ = -1; | |
103 } | |
104 virtual ~ExtraWaitThread() {} | |
105 | |
106 virtual void ThreadMain() OVERRIDE { | |
107 // Store the |stop_| event as the first event. | |
108 HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ }; | |
109 HANDLE next_thread = NULL; | |
110 DWORD event_count = MAXIMUM_WAIT_OBJECTS; | |
111 int thread_signaled_event = -1; | |
112 scoped_ptr<ExtraWaitThread> extra_wait_thread; | |
113 if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) { | |
114 std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]); | |
115 | |
116 extra_wait_thread.reset(new ExtraWaitThread(stop_, | |
117 &events_[MAXIMUM_WAIT_OBJECTS - 2], | |
118 count_ - (MAXIMUM_WAIT_OBJECTS - 2), | |
119 &thread_signaled_event)); | |
120 base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread); | |
121 | |
122 event_count = MAXIMUM_WAIT_OBJECTS; | |
123 events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread; | |
124 } else { | |
125 std::copy(&events_[0], &events_[count_], &events[1]); | |
126 event_count = count_ + 1; | |
127 } | |
128 | |
129 DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE, | |
130 INFINITE); | |
131 if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) { | |
132 wait -= WAIT_OBJECT_0; | |
133 if (wait == 0) { | |
134 // The stop event was signaled. Check if it was signaled by a | |
135 // sub thread. In case our sub thread had to spin another thread (and | |
136 // so on), we must wait for ours to exit before we can check the | |
137 // propagated event offset. | |
138 if (next_thread) { | |
139 base::PlatformThread::Join(next_thread); | |
140 next_thread = NULL; | |
141 } | |
142 if (thread_signaled_event != -1) | |
143 *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2); | |
144 } else if (events[wait] == next_thread) { | |
145 NOTREACHED(); | |
146 } else { | |
147 *signaled_event_ = static_cast<int>(wait); | |
148 SetEvent(stop_); | |
149 } | |
150 } else { | |
151 NOTREACHED(); | |
152 } | |
153 | |
154 if (next_thread) | |
155 base::PlatformThread::Join(next_thread); | |
156 } | |
157 | |
158 private: | |
159 HANDLE stop_; | |
160 HANDLE* events_; | |
161 size_t count_; | |
162 int* signaled_event_; | |
163 DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread); | |
164 }; | |
165 } // end namespace | |
166 | |
167 // static | |
168 int SharedMemSynchronizer::WaitMultiple(SharedMemSynchronizer* synchronizers, | |
169 size_t count, | |
170 size_t last_signaled) { | |
171 DCHECK(last_signaled < count); | |
scherkus (not reviewing)
2012/03/08 02:37:08
ditto here+below
tommi (sloooow) - chröme
2012/03/08 16:10:57
Done.
| |
172 #ifndef NDEBUG | |
173 for (size_t i = 0; i < count; ++i) | |
174 DCHECK(synchronizers[i].IsValid()); | |
175 #endif | |
176 | |
177 int ret = -1; | |
178 | |
179 // TODO(tommi): Should we wait in an alertable state so that we can be | |
180 // canceled via an APC? | |
181 scoped_array<HANDLE> handles(new HANDLE[count]); | |
182 | |
183 // Because of the way WaitForMultipleObjects works, we do a little trick here. | |
184 // When multiple events are signaled, WaitForMultipleObjects will return the | |
185 // index of the first signaled item (lowest). This means that if we always | |
186 // pass the array the same way to WaitForMultipleObjects, the objects that | |
187 // come first, have higher priority. In times of heavy load, this will cause | |
188 // elements at the back to become DOS-ed. | |
189 // So, we store the location of the item that was last signaled. Then we split | |
190 // up the array and move everything higher than the last signaled index to the | |
191 // front and the rest to the back (meaning that the last signaled item will | |
192 // become the last element in the list). | |
193 // Assuming equally busy events, this approach distributes the priority | |
scherkus (not reviewing)
2012/03/08 02:37:08
sanity check: aren't we using the same trick in po
tommi (sloooow) - chröme
2012/03/08 16:10:57
Yes. There's one very subtle difference which is
| |
194 // evenly. | |
195 | |
196 size_t index = 0; | |
197 for (size_t i = last_signaled + 1; i < count; ++i) | |
198 handles[index++] = synchronizers[i].other_; | |
199 | |
200 for (size_t i = 0; i <= last_signaled; ++i) | |
201 handles[index++] = synchronizers[i].other_; | |
202 DCHECK_EQ(index, count); | |
203 | |
204 DWORD wait = WAIT_FAILED; | |
205 bool wait_failed = false; | |
206 if (count <= MAXIMUM_WAIT_OBJECTS) { | |
scherkus (not reviewing)
2012/03/08 02:37:08
stepping back a bit is this something we want/need
tommi (sloooow) - chröme
2012/03/08 16:10:57
Right! So, I thought quite a bit about all of thi
| |
207 wait = ::WaitForMultipleObjects(count, handles.get(), FALSE, INFINITE); | |
208 wait_failed = wait < WAIT_OBJECT_0 || | |
209 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS); | |
210 } else { | |
211 // Used to stop the other wait threads when an event has been signaled. | |
212 base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL)); | |
213 | |
214 // Create the first thread and pass a pointer to all handles >63 | |
215 // to the thread + 'stop'. Then implement the thread so that it checks | |
216 // if the number of handles is > 63. If so, spawns a new thread and | |
217 // passes >62 handles to that thread and waits for the 62 handles + stop + | |
218 // next thread. etc etc. | |
scherkus (not reviewing)
2012/03/08 02:37:08
bit confusing to read
perhaps reword to include s
tommi (sloooow) - chröme
2012/03/08 16:10:57
I realize now that this comment is a note I left t
| |
219 | |
220 int thread_signaled_event = -1; | |
221 ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1], | |
222 count - (MAXIMUM_WAIT_OBJECTS - 1), &thread_signaled_event); | |
223 base::PlatformThreadHandle thread; | |
224 base::PlatformThread::Create(0, &wait_thread, &thread); | |
225 HANDLE events[MAXIMUM_WAIT_OBJECTS]; | |
226 std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]); | |
227 events[MAXIMUM_WAIT_OBJECTS - 1] = thread; | |
228 wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE, | |
229 INFINITE); | |
230 wait_failed = wait < WAIT_OBJECT_0 || | |
231 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS); | |
232 if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) { | |
233 if (thread_signaled_event < 0) { | |
234 wait_failed = true; | |
235 NOTREACHED(); | |
236 } else { | |
237 wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) + | |
238 thread_signaled_event; | |
239 } | |
240 } else { | |
241 ::SetEvent(stop); | |
242 } | |
243 base::PlatformThread::Join(thread); | |
244 } | |
245 | |
246 if (!wait_failed) { | |
247 // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0). | |
248 wait -= WAIT_OBJECT_0; | |
249 ::ResetEvent(handles[wait]); | |
250 ret = (wait + last_signaled + 1) % count; | |
251 DCHECK_EQ(handles[wait], synchronizers[ret].other_.Get()); | |
252 } else { | |
253 NOTREACHED(); | |
scherkus (not reviewing)
2012/03/08 02:37:08
LOG?
how severe is this?
note that in release mo
tommi (sloooow) - chröme
2012/03/08 16:10:57
These are just sanity checks. If they fail it's b
| |
254 } | |
255 | |
256 DCHECK_NE(ret, -1); | |
257 return ret; | |
258 } | |
OLD | NEW |