Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Side by Side Diff: media/audio/shared_mem_synchronizer_win.cc

Issue 9605015: Add a SharedMemSynchronizer class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: address review comments from Ami Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/audio/shared_mem_synchronizer.h"
6
7 #include "base/logging.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/threading/platform_thread.h"
10 #include "base/win/scoped_handle.h"
11
12 SharedMemSynchronizer::~SharedMemSynchronizer() {
13 }
14
15 SharedMemSynchronizer::SharedMemSynchronizer(IPCHandle handle_1,
16 IPCHandle handle_2)
17 : mine_(handle_1), other_(handle_2) {
18 DCHECK(IsValid());
19 }
20
21 void SharedMemSynchronizer::Signal() {
22 DCHECK(IsValid());
23 DCHECK_EQ(::WaitForSingleObject(mine_, 0), static_cast<DWORD>(WAIT_TIMEOUT))
24 << "Are you calling Signal() without calling Wait() first?";
25 BOOL ok = ::SetEvent(mine_);
26 CHECK(ok);
27 }
28
29 void SharedMemSynchronizer::Wait() {
30 DCHECK(IsValid());
31 DWORD wait = ::WaitForSingleObject(other_, INFINITE);
32 DCHECK_EQ(wait, WAIT_OBJECT_0);
33 BOOL ok = ::ResetEvent(other_);
34 CHECK(ok);
35 }
36
37 bool SharedMemSynchronizer::IsValid() const {
38 return mine_.IsValid() && other_.IsValid();
39 }
40
41 bool SharedMemSynchronizer::ShareToProcess(base::ProcessHandle process,
42 IPCHandle* handle_1,
43 IPCHandle* handle_2) {
44 DCHECK(IsValid());
45 HANDLE our_process = ::GetCurrentProcess();
46 if (!::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE,
47 DUPLICATE_SAME_ACCESS)) {
48 return false;
49 }
50
51 if (!::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE,
52 DUPLICATE_SAME_ACCESS)) {
53 // In case we're sharing to ourselves, we can close the handle, but
54 // if the target process is a different process, we do nothing.
55 if (process == our_process)
56 ::CloseHandle(*handle_1);
57 *handle_1 = NULL;
58 return false;
59 }
60
61 return true;
62 }
63
64 // static
65 bool SharedMemSynchronizer::InitializePair(SharedMemSynchronizer* a,
66 SharedMemSynchronizer* b) {
67 DCHECK(!a->IsValid());
68 DCHECK(!b->IsValid());
69
70 bool success = false;
71
72 // Create two manually resettable events and give each party a handle
73 // to both events.
74 HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL);
75 HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL);
76 if (event_a && event_b) {
77 a->mine_.Set(event_a);
78 a->other_.Set(event_b);
79 success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b);
80 if (success) {
81 b->mine_.Set(event_b);
82 b->other_.Set(event_a);
83 } else {
84 a->mine_.Close();
85 a->other_.Close();
86 }
87 } else {
88 if (event_a)
89 ::CloseHandle(event_a);
90 if (event_b)
91 ::CloseHandle(event_b);
92 }
93
94 DCHECK(!success || a->IsValid());
95 DCHECK(!success || b->IsValid());
96
97 return success;
98 }
99
100 namespace {
101 class ExtraWaitThread : public base::PlatformThread::Delegate {
102 public:
103 ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count,
104 int* signaled_event)
105 : stop_(stop), events_(events), count_(count),
106 signaled_event_(signaled_event) {
107 *signaled_event_ = -1;
108 }
109 virtual ~ExtraWaitThread() {}
110
111 virtual void ThreadMain() OVERRIDE {
112 // Store the |stop_| event as the first event.
113 HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ };
114 HANDLE next_thread = NULL;
115 DWORD event_count = MAXIMUM_WAIT_OBJECTS;
116 int thread_signaled_event = -1;
117 scoped_ptr<ExtraWaitThread> extra_wait_thread;
118 if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) {
119 std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]);
120
121 extra_wait_thread.reset(new ExtraWaitThread(stop_,
122 &events_[MAXIMUM_WAIT_OBJECTS - 2],
123 count_ - (MAXIMUM_WAIT_OBJECTS - 2),
124 &thread_signaled_event));
125 base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread);
126
127 event_count = MAXIMUM_WAIT_OBJECTS;
128 events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread;
129 } else {
130 std::copy(&events_[0], &events_[count_], &events[1]);
131 event_count = count_ + 1;
132 }
133
134 DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE,
135 INFINITE);
136 if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) {
137 wait -= WAIT_OBJECT_0;
138 if (wait == 0) {
139 // The stop event was signaled. Check if it was signaled by a
140 // sub thread. In case our sub thread had to spin another thread (and
141 // so on), we must wait for ours to exit before we can check the
142 // propagated event offset.
143 if (next_thread) {
144 base::PlatformThread::Join(next_thread);
145 next_thread = NULL;
146 }
147 if (thread_signaled_event != -1)
148 *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2);
149 } else if (events[wait] == next_thread) {
150 NOTREACHED();
151 } else {
152 *signaled_event_ = static_cast<int>(wait);
153 SetEvent(stop_);
154 }
155 } else {
156 NOTREACHED();
157 }
158
159 if (next_thread)
160 base::PlatformThread::Join(next_thread);
161 }
162
163 private:
164 HANDLE stop_;
165 HANDLE* events_;
166 size_t count_;
167 int* signaled_event_;
168 DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread);
169 };
170 } // end namespace
171
172 // static
173 int SharedMemSynchronizer::WaitMultiple(const SynchronizerVector& synchronizers,
Ami GONE FROM CHROMIUM 2012/03/13 20:08:02 FTR I haven't reviewed the win impl at all (IDK nu
tommi (sloooow) - chröme 2012/03/14 13:32:43 noted. Andrew - any specific comments on the win
174 size_t last_signaled) {
175 DCHECK_LT(last_signaled, synchronizers.size());
176
177 for (size_t i = 0; i < synchronizers.size(); ++i) {
178 DCHECK(synchronizers[i]->IsValid());
179 }
180
181 int ret = -1;
182
183 // TODO(tommi): Should we wait in an alertable state so that we can be
184 // canceled via an APC?
185 scoped_array<HANDLE> handles(new HANDLE[synchronizers.size()]);
186
187 // Because of the way WaitForMultipleObjects works, we do a little trick here.
188 // When multiple events are signaled, WaitForMultipleObjects will return the
189 // index of the first signaled item (lowest). This means that if we always
190 // pass the array the same way to WaitForMultipleObjects, the objects that
191 // come first, have higher priority. In times of heavy load, this will cause
192 // elements at the back to become DOS-ed.
193 // So, we store the location of the item that was last signaled. Then we split
194 // up the array and move everything higher than the last signaled index to the
195 // front and the rest to the back (meaning that the last signaled item will
196 // become the last element in the list).
197 // Assuming equally busy events, this approach distributes the priority
198 // evenly.
199
200 size_t index = 0;
201 for (size_t i = last_signaled + 1; i < synchronizers.size(); ++i)
202 handles[index++] = synchronizers[i]->other_;
203
204 for (size_t i = 0; i <= last_signaled; ++i)
205 handles[index++] = synchronizers[i]->other_;
206 DCHECK_EQ(index, synchronizers.size());
207
208 DWORD wait = WAIT_FAILED;
209 bool wait_failed = false;
210 if (synchronizers.size() <= MAXIMUM_WAIT_OBJECTS) {
211 wait = ::WaitForMultipleObjects(synchronizers.size(), &handles[0], FALSE,
212 INFINITE);
213 wait_failed = wait < WAIT_OBJECT_0 ||
214 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
215 } else {
216 // Used to stop the other wait threads when an event has been signaled.
217 base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL));
218
219 // Create the first thread and pass a pointer to all handles >63
220 // to the thread + 'stop'. Then implement the thread so that it checks
221 // if the number of handles is > 63. If so, spawns a new thread and
222 // passes >62 handles to that thread and waits for the 62 handles + stop +
223 // next thread. etc etc.
224
225 // Create a list of threads so that each thread waits on at most 62 events
226 // including one event for when a child thread signals completion and one
227 // event for when all of the threads must be stopped (due to some event
228 // being signaled).
229
230 int thread_signaled_event = -1;
231 ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1],
232 synchronizers.size() - (MAXIMUM_WAIT_OBJECTS - 1),
233 &thread_signaled_event);
234 base::PlatformThreadHandle thread;
235 base::PlatformThread::Create(0, &wait_thread, &thread);
236 HANDLE events[MAXIMUM_WAIT_OBJECTS];
237 std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]);
238 events[MAXIMUM_WAIT_OBJECTS - 1] = thread;
239 wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE,
240 INFINITE);
241 wait_failed = wait < WAIT_OBJECT_0 ||
242 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
243 if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) {
244 if (thread_signaled_event < 0) {
245 wait_failed = true;
246 NOTREACHED();
247 } else {
248 wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) +
249 thread_signaled_event;
250 }
251 } else {
252 ::SetEvent(stop);
253 }
254 base::PlatformThread::Join(thread);
255 }
256
257 if (!wait_failed) {
258 // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0).
259 wait -= WAIT_OBJECT_0;
260 BOOL ok = ::ResetEvent(handles[wait]);
261 CHECK(ok);
262 ret = (wait + last_signaled + 1) % synchronizers.size();
263 DCHECK_EQ(handles[wait], synchronizers[ret]->other_.Get());
264 } else {
265 NOTREACHED();
266 }
267
268 CHECK_NE(ret, -1);
269 return ret;
270 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698