Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(193)

Side by Side Diff: media/audio/cross_process_notification_win.cc

Issue 9605015: Add a SharedMemSynchronizer class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Disabled two tests before relanding Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « media/audio/cross_process_notification_unittest.cc ('k') | media/media.gyp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/audio/cross_process_notification.h"
6
7 #include "base/logging.h"
8 #include "base/memory/scoped_ptr.h"
9 #include "base/threading/platform_thread.h"
10 #include "base/win/scoped_handle.h"
11
12 CrossProcessNotification::~CrossProcessNotification() {}
13
14 CrossProcessNotification::CrossProcessNotification(IPCHandle handle_1,
15 IPCHandle handle_2)
16 : mine_(handle_1), other_(handle_2) {
17 DCHECK(IsValid());
18 }
19
20 void CrossProcessNotification::Signal() {
21 DCHECK(IsValid());
22 DCHECK_EQ(::WaitForSingleObject(mine_, 0), static_cast<DWORD>(WAIT_TIMEOUT))
23 << "Are you calling Signal() without calling Wait() first?";
24 BOOL ok = ::SetEvent(mine_);
25 CHECK(ok);
26 }
27
28 void CrossProcessNotification::Wait() {
29 DCHECK(IsValid());
30 DWORD wait = ::WaitForSingleObject(other_, INFINITE);
31 DCHECK_EQ(wait, WAIT_OBJECT_0);
32 BOOL ok = ::ResetEvent(other_);
33 CHECK(ok);
34 }
35
36 bool CrossProcessNotification::IsValid() const {
37 return mine_.IsValid() && other_.IsValid();
38 }
39
40 bool CrossProcessNotification::ShareToProcess(base::ProcessHandle process,
41 IPCHandle* handle_1,
42 IPCHandle* handle_2) {
43 DCHECK(IsValid());
44 HANDLE our_process = ::GetCurrentProcess();
45 if (!::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE,
46 DUPLICATE_SAME_ACCESS)) {
47 return false;
48 }
49
50 if (!::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE,
51 DUPLICATE_SAME_ACCESS)) {
52 // In case we're sharing to ourselves, we can close the handle, but
53 // if the target process is a different process, we do nothing.
54 if (process == our_process)
55 ::CloseHandle(*handle_1);
56 *handle_1 = NULL;
57 return false;
58 }
59
60 return true;
61 }
62
63 // static
64 bool CrossProcessNotification::InitializePair(CrossProcessNotification* a,
65 CrossProcessNotification* b) {
66 DCHECK(!a->IsValid());
67 DCHECK(!b->IsValid());
68
69 bool success = false;
70
71 // Create two manually resettable events and give each party a handle
72 // to both events.
73 HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL);
74 HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL);
75 if (event_a && event_b) {
76 a->mine_.Set(event_a);
77 a->other_.Set(event_b);
78 success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b);
79 if (success) {
80 b->mine_.Set(event_b);
81 b->other_.Set(event_a);
82 } else {
83 a->mine_.Close();
84 a->other_.Close();
85 }
86 } else {
87 if (event_a)
88 ::CloseHandle(event_a);
89 if (event_b)
90 ::CloseHandle(event_b);
91 }
92
93 DCHECK(!success || a->IsValid());
94 DCHECK(!success || b->IsValid());
95
96 return success;
97 }
98
99 namespace {
100 class ExtraWaitThread : public base::PlatformThread::Delegate {
101 public:
102 ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count,
103 int* signaled_event)
104 : stop_(stop), events_(events), count_(count),
105 signaled_event_(signaled_event) {
106 *signaled_event_ = -1;
107 }
108 virtual ~ExtraWaitThread() {}
109
110 virtual void ThreadMain() OVERRIDE {
111 // Store the |stop_| event as the first event.
112 HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ };
113 HANDLE next_thread = NULL;
114 DWORD event_count = MAXIMUM_WAIT_OBJECTS;
115 int thread_signaled_event = -1;
116 scoped_ptr<ExtraWaitThread> extra_wait_thread;
117 if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) {
118 std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]);
119
120 extra_wait_thread.reset(new ExtraWaitThread(stop_,
121 &events_[MAXIMUM_WAIT_OBJECTS - 2],
122 count_ - (MAXIMUM_WAIT_OBJECTS - 2),
123 &thread_signaled_event));
124 base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread);
125
126 event_count = MAXIMUM_WAIT_OBJECTS;
127 events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread;
128 } else {
129 std::copy(&events_[0], &events_[count_], &events[1]);
130 event_count = count_ + 1;
131 }
132
133 DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE,
134 INFINITE);
135 if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) {
136 wait -= WAIT_OBJECT_0;
137 if (wait == 0) {
138 // The stop event was signaled. Check if it was signaled by a
139 // sub thread. In case our sub thread had to spin another thread (and
140 // so on), we must wait for ours to exit before we can check the
141 // propagated event offset.
142 if (next_thread) {
143 base::PlatformThread::Join(next_thread);
144 next_thread = NULL;
145 }
146 if (thread_signaled_event != -1)
147 *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2);
148 } else if (events[wait] == next_thread) {
149 NOTREACHED();
150 } else {
151 *signaled_event_ = static_cast<int>(wait);
152 SetEvent(stop_);
153 }
154 } else {
155 NOTREACHED();
156 }
157
158 if (next_thread)
159 base::PlatformThread::Join(next_thread);
160 }
161
162 private:
163 HANDLE stop_;
164 HANDLE* events_;
165 size_t count_;
166 int* signaled_event_;
167 DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread);
168 };
169 } // end namespace
170
171 // static
172 int CrossProcessNotification::WaitMultiple(const Notifications& notifications,
173 size_t wait_offset) {
174 DCHECK_LT(wait_offset, notifications.size());
175
176 for (size_t i = 0; i < notifications.size(); ++i) {
177 DCHECK(notifications[i]->IsValid());
178 }
179
180 // TODO(tommi): Should we wait in an alertable state so that we can be
181 // canceled via an APC?
182 scoped_array<HANDLE> handles(new HANDLE[notifications.size()]);
183
184 // Because of the way WaitForMultipleObjects works, we do a little trick here.
185 // When multiple events are signaled, WaitForMultipleObjects will return the
186 // index of the first signaled item (lowest). This means that if we always
187 // pass the array the same way to WaitForMultipleObjects, the objects that
188 // come first, have higher priority. In times of heavy load, this will cause
189 // elements at the back to become DOS-ed.
190 // So, we store the location of the item that was last signaled. Then we split
191 // up the array and move everything higher than the last signaled index to the
192 // front and the rest to the back (meaning that the last signaled item will
193 // become the last element in the list).
194 // Assuming equally busy events, this approach distributes the priority
195 // evenly.
196
197 size_t index = 0;
198 for (size_t i = wait_offset; i < notifications.size(); ++i)
199 handles[index++] = notifications[i]->other_;
200
201 for (size_t i = 0; i < wait_offset; ++i)
202 handles[index++] = notifications[i]->other_;
203 DCHECK_EQ(index, notifications.size());
204
205 DWORD wait = WAIT_FAILED;
206 bool wait_failed = false;
207 if (notifications.size() <= MAXIMUM_WAIT_OBJECTS) {
208 wait = ::WaitForMultipleObjects(notifications.size(), &handles[0], FALSE,
209 INFINITE);
210 wait_failed = wait < WAIT_OBJECT_0 ||
211 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
212 } else {
213 // Used to stop the other wait threads when an event has been signaled.
214 base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL));
215
216 // Create the first thread and pass a pointer to all handles >63
217 // to the thread + 'stop'. Then implement the thread so that it checks
218 // if the number of handles is > 63. If so, spawns a new thread and
219 // passes >62 handles to that thread and waits for the 62 handles + stop +
220 // next thread. etc etc.
221
222 // Create a list of threads so that each thread waits on at most 62 events
223 // including one event for when a child thread signals completion and one
224 // event for when all of the threads must be stopped (due to some event
225 // being signaled).
226
227 int thread_signaled_event = -1;
228 ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1],
229 notifications.size() - (MAXIMUM_WAIT_OBJECTS - 1),
230 &thread_signaled_event);
231 base::PlatformThreadHandle thread;
232 base::PlatformThread::Create(0, &wait_thread, &thread);
233 HANDLE events[MAXIMUM_WAIT_OBJECTS];
234 std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]);
235 events[MAXIMUM_WAIT_OBJECTS - 1] = thread;
236 wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE,
237 INFINITE);
238 wait_failed = wait < WAIT_OBJECT_0 ||
239 wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
240 if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) {
241 if (thread_signaled_event < 0) {
242 wait_failed = true;
243 NOTREACHED();
244 } else {
245 wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) +
246 thread_signaled_event;
247 }
248 } else {
249 ::SetEvent(stop);
250 }
251 base::PlatformThread::Join(thread);
252 }
253
254 int ret = -1;
255 if (!wait_failed) {
256 // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0).
257 wait -= WAIT_OBJECT_0;
258 BOOL ok = ::ResetEvent(handles[wait]);
259 CHECK(ok);
260 ret = (wait + wait_offset) % notifications.size();
261 DCHECK_EQ(handles[wait], notifications[ret]->other_.Get());
262 } else {
263 NOTREACHED();
264 }
265
266 CHECK_NE(ret, -1);
267 return ret;
268 }
OLDNEW
« no previous file with comments | « media/audio/cross_process_notification_unittest.cc ('k') | media/media.gyp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698