Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(512)

Side by Side Diff: media/audio/win/shared_mem_synchronizer_win.cc

Issue 9605015: Add a SharedMemSynchronizer class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 #include "media/audio/shared_mem_synchronizer.h"
scherkus (not reviewing) 2012/03/08 02:37:08 blank line above
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
5
6 #include "base/logging.h"
7 #include "base/memory/scoped_ptr.h"
8 #include "base/threading/platform_thread.h"
9 #include "base/win/scoped_handle.h"
10
11 SharedMemSynchronizer::~SharedMemSynchronizer() {
12 }
13
14 SharedMemSynchronizer::SharedMemSynchronizer(IPCHandle handle_1,
15 IPCHandle handle_2)
16 : mine_(handle_1), other_(handle_2) {
17 DCHECK(IsValid());
18 }
19
20 void SharedMemSynchronizer::Signal() {
21 DCHECK(IsValid());
22 DCHECK(::WaitForSingleObject(mine_, 0) == WAIT_TIMEOUT);
scherkus (not reviewing) 2012/03/08 02:37:08 can we DCHECK_EQ it up here + below? also is this
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
23 ::SetEvent(mine_);
scherkus (not reviewing) 2012/03/08 02:37:08 FYI this could fail -- but not sure what we'd do t
tommi (sloooow) - chröme 2012/03/08 16:10:57 Paranoid is good in this case, so Done.
24 }
25
26 void SharedMemSynchronizer::Wait() {
27 DCHECK(IsValid());
28 DWORD wait = ::WaitForSingleObject(other_, INFINITE);
29 DCHECK(wait == WAIT_OBJECT_0);
scherkus (not reviewing) 2012/03/08 02:37:08 ditto
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
30 ::ResetEvent(other_);
scherkus (not reviewing) 2012/03/08 02:37:08 ditto on failing here + way below
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
31 }
32
33 bool SharedMemSynchronizer::IsValid() const {
34 return mine_.IsValid() && other_.IsValid();
35 }
36
37 bool SharedMemSynchronizer::ShareToProcess(base::ProcessHandle process,
38 IPCHandle* handle_1,
39 IPCHandle* handle_2) {
40 DCHECK(IsValid());
41 bool success = false;
42 HANDLE our_process = ::GetCurrentProcess();
43 if (::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE,
scherkus (not reviewing) 2012/03/08 02:37:08 nit: return early on fail instead of nesting ifs a
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
44 DUPLICATE_SAME_ACCESS)) {
45 if (::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE,
46 DUPLICATE_SAME_ACCESS)) {
47 success = true;
48 } else {
49 // In case we're sharing to ourselves, we can close the handle, but
50 // if the target process is a different process, we do nothing.
51 if (process == our_process)
52 ::CloseHandle(*handle_1);
53 *handle_1 = NULL;
54 }
55 }
56 return success;
57 }
58
59 // static
60 bool SharedMemSynchronizer::Create(SharedMemSynchronizer* a,
61 SharedMemSynchronizer* b) {
62 DCHECK(!a->IsValid());
63 DCHECK(!b->IsValid());
64
65 bool success = false;
66
67 // Create two manually resettable events and give each party a handle
68 // to both events.
69 HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL);
70 HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL);
71 if (event_a && event_b) {
72 a->mine_.Set(event_a);
73 a->other_.Set(event_b);
74 success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b);
75 if (success) {
76 b->mine_.Set(event_b);
77 b->other_.Set(event_a);
78 } else {
79 a->mine_.Close();
80 a->other_.Close();
81 }
82 } else {
83 if (event_a)
84 ::CloseHandle(event_a);
85 if (event_b)
86 ::CloseHandle(event_b);
87 }
88
89 DCHECK(!success || a->IsValid());
90 DCHECK(!success || b->IsValid());
91
92 return success;
93 }
94
95 namespace {
96 class ExtraWaitThread : public base::PlatformThread::Delegate {
97 public:
98 ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count,
99 int* signaled_event)
100 : stop_(stop), events_(events), count_(count),
101 signaled_event_(signaled_event) {
102 *signaled_event_ = -1;
103 }
104 virtual ~ExtraWaitThread() {}
105
106 virtual void ThreadMain() OVERRIDE {
107 // Store the |stop_| event as the first event.
108 HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ };
109 HANDLE next_thread = NULL;
110 DWORD event_count = MAXIMUM_WAIT_OBJECTS;
111 int thread_signaled_event = -1;
112 scoped_ptr<ExtraWaitThread> extra_wait_thread;
113 if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) {
114 std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]);
115
116 extra_wait_thread.reset(new ExtraWaitThread(stop_,
117 &events_[MAXIMUM_WAIT_OBJECTS - 2],
118 count_ - (MAXIMUM_WAIT_OBJECTS - 2),
119 &thread_signaled_event));
120 base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread);
121
122 event_count = MAXIMUM_WAIT_OBJECTS;
123 events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread;
124 } else {
125 std::copy(&events_[0], &events_[count_], &events[1]);
126 event_count = count_ + 1;
127 }
128
129 DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE,
130 INFINITE);
131 if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) {
132 wait -= WAIT_OBJECT_0;
133 if (wait == 0) {
134 // The stop event was signaled. Check if it was signaled by a
135 // sub thread. In case our sub thread had to spin another thread (and
136 // so on), we must wait for ours to exit before we can check the
137 // propagated event offset.
138 if (next_thread) {
139 base::PlatformThread::Join(next_thread);
140 next_thread = NULL;
141 }
142 if (thread_signaled_event != -1)
143 *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2);
144 } else if (events[wait] == next_thread) {
145 NOTREACHED();
146 } else {
147 *signaled_event_ = static_cast<int>(wait);
148 SetEvent(stop_);
149 }
150 } else {
151 NOTREACHED();
152 }
153
154 if (next_thread)
155 base::PlatformThread::Join(next_thread);
156 }
157
158 private:
159 HANDLE stop_;
160 HANDLE* events_;
161 size_t count_;
162 int* signaled_event_;
163 DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread);
164 };
165 } // end namespace
166
167 // static
168 int SharedMemSynchronizer::WaitMultiple(SharedMemSynchronizer* synchronizers,
169 size_t count,
170 size_t last_signaled) {
171 DCHECK(last_signaled < count);
scherkus (not reviewing) 2012/03/08 02:37:08 ditto here+below
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
172 #ifndef NDEBUG
173 for (size_t i = 0; i < count; ++i)
174 DCHECK(synchronizers[i].IsValid());
175 #endif
176
177 int ret = -1;
178
179 // TODO(tommi): Should we wait in an alertable state so that we can be
180 // canceled via an APC?
181 scoped_array<HANDLE> handles(new HANDLE[count]);
182
183 // Because of the way WaitForMultipleObjects works, we do a little trick here.
184 // When multiple events are signaled, WaitForMultipleObjects will return the
185 // index of the first signaled item (lowest). This means that if we always
186 // pass the array the same way to WaitForMultipleObjects, the objects that
187 // come first, have higher priority. In times of heavy load, this will cause
188 // elements at the back to become DOS-ed.
189 // So, we store the location of the item that was last signaled. Then we split
190 // up the array and move everything higher than the last signaled index to the
191 // front and the rest to the back (meaning that the last signaled item will
192 // become the last element in the list).
193 // Assuming equally busy events, this approach distributes the priority
scherkus (not reviewing) 2012/03/08 02:37:08 sanity check: aren't we using the same trick in po
tommi (sloooow) - chröme 2012/03/08 16:10:57 Yes. There's one very subtle difference which is
194 // evenly.
195
196 size_t index = 0;
197 for (size_t i = last_signaled + 1; i < count; ++i)
198 handles[index++] = synchronizers[i].other_;
199
200 for (size_t i = 0; i <= last_signaled; ++i)
201 handles[index++] = synchronizers[i].other_;
202 DCHECK_EQ(index, count);
203
204 DWORD wait = WAIT_FAILED;
205 bool wait_failed = false;
206 if (count <= MAXIMUM_WAIT_OBJECTS) {
scherkus (not reviewing) 2012/03/08 02:37:08 stepping back a bit is this something we want/need
tommi (sloooow) - chröme 2012/03/08 16:10:57 Right! So, I thought quite a bit about all of thi
207 wait = ::WaitForMultipleObjects(count, handles.get(), FALSE, INFINITE);
208 wait_failed = wait < WAIT_OBJECT_0 ||
209 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
210 } else {
211 // Used to stop the other wait threads when an event has been signaled.
212 base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL));
213
214 // Create the first thread and pass a pointer to all handles >63
215 // to the thread + 'stop'. Then implement the thread so that it checks
216 // if the number of handles is > 63. If so, spawns a new thread and
217 // passes >62 handles to that thread and waits for the 62 handles + stop +
218 // next thread. etc etc.
scherkus (not reviewing) 2012/03/08 02:37:08 bit confusing to read perhaps reword to include s
tommi (sloooow) - chröme 2012/03/08 16:10:57 I realize now that this comment is a note I left t
219
220 int thread_signaled_event = -1;
221 ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1],
222 count - (MAXIMUM_WAIT_OBJECTS - 1), &thread_signaled_event);
223 base::PlatformThreadHandle thread;
224 base::PlatformThread::Create(0, &wait_thread, &thread);
225 HANDLE events[MAXIMUM_WAIT_OBJECTS];
226 std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]);
227 events[MAXIMUM_WAIT_OBJECTS - 1] = thread;
228 wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE,
229 INFINITE);
230 wait_failed = wait < WAIT_OBJECT_0 ||
231 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
232 if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) {
233 if (thread_signaled_event < 0) {
234 wait_failed = true;
235 NOTREACHED();
236 } else {
237 wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) +
238 thread_signaled_event;
239 }
240 } else {
241 ::SetEvent(stop);
242 }
243 base::PlatformThread::Join(thread);
244 }
245
246 if (!wait_failed) {
247 // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0).
248 wait -= WAIT_OBJECT_0;
249 ::ResetEvent(handles[wait]);
250 ret = (wait + last_signaled + 1) % count;
251 DCHECK_EQ(handles[wait], synchronizers[ret].other_.Get());
252 } else {
253 NOTREACHED();
scherkus (not reviewing) 2012/03/08 02:37:08 LOG? how severe is this? note that in release mo
tommi (sloooow) - chröme 2012/03/08 16:10:57 These are just sanity checks. If they fail it's b
254 }
255
256 DCHECK_NE(ret, -1);
257 return ret;
258 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698