Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(432)

Unified Diff: media/audio/win/shared_mem_synchronizer_win.cc

Issue 9605015: Add a SharedMemSynchronizer class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: media/audio/win/shared_mem_synchronizer_win.cc
diff --git a/media/audio/win/shared_mem_synchronizer_win.cc b/media/audio/win/shared_mem_synchronizer_win.cc
new file mode 100644
index 0000000000000000000000000000000000000000..a096cdd866889c5aad43f8d992333530c96ef45c
--- /dev/null
+++ b/media/audio/win/shared_mem_synchronizer_win.cc
@@ -0,0 +1,258 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "media/audio/shared_mem_synchronizer.h"
scherkus (not reviewing) 2012/03/08 02:37:08 blank line above
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/threading/platform_thread.h"
+#include "base/win/scoped_handle.h"
+
+SharedMemSynchronizer::~SharedMemSynchronizer() {
+}
+
+SharedMemSynchronizer::SharedMemSynchronizer(IPCHandle handle_1,
+ IPCHandle handle_2)
+ : mine_(handle_1), other_(handle_2) {
+ DCHECK(IsValid());
+}
+
+void SharedMemSynchronizer::Signal() {
+ DCHECK(IsValid());
+ DCHECK(::WaitForSingleObject(mine_, 0) == WAIT_TIMEOUT);
scherkus (not reviewing) 2012/03/08 02:37:08 can we DCHECK_EQ it up here + below? also is this
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
+ ::SetEvent(mine_);
scherkus (not reviewing) 2012/03/08 02:37:08 FYI this could fail -- but not sure what we'd do t
tommi (sloooow) - chröme 2012/03/08 16:10:57 Paranoid is good in this case, so Done.
+}
+
+void SharedMemSynchronizer::Wait() {
+ DCHECK(IsValid());
+ DWORD wait = ::WaitForSingleObject(other_, INFINITE);
+ DCHECK(wait == WAIT_OBJECT_0);
scherkus (not reviewing) 2012/03/08 02:37:08 ditto
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
+ ::ResetEvent(other_);
scherkus (not reviewing) 2012/03/08 02:37:08 ditto on failing here + way below
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
+}
+
+bool SharedMemSynchronizer::IsValid() const {
+ return mine_.IsValid() && other_.IsValid();
+}
+
+bool SharedMemSynchronizer::ShareToProcess(base::ProcessHandle process,
+ IPCHandle* handle_1,
+ IPCHandle* handle_2) {
+ DCHECK(IsValid());
+ bool success = false;
+ HANDLE our_process = ::GetCurrentProcess();
+ if (::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE,
scherkus (not reviewing) 2012/03/08 02:37:08 nit: return early on fail instead of nesting ifs a
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
+ DUPLICATE_SAME_ACCESS)) {
+ if (::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE,
+ DUPLICATE_SAME_ACCESS)) {
+ success = true;
+ } else {
+ // In case we're sharing to ourselves, we can close the handle, but
+ // if the target process is a different process, we do nothing.
+ if (process == our_process)
+ ::CloseHandle(*handle_1);
+ *handle_1 = NULL;
+ }
+ }
+ return success;
+}
+
+// static
+bool SharedMemSynchronizer::Create(SharedMemSynchronizer* a,
+ SharedMemSynchronizer* b) {
+ DCHECK(!a->IsValid());
+ DCHECK(!b->IsValid());
+
+ bool success = false;
+
+ // Create two manually resettable events and give each party a handle
+ // to both events.
+ HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL);
+ HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (event_a && event_b) {
+ a->mine_.Set(event_a);
+ a->other_.Set(event_b);
+ success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b);
+ if (success) {
+ b->mine_.Set(event_b);
+ b->other_.Set(event_a);
+ } else {
+ a->mine_.Close();
+ a->other_.Close();
+ }
+ } else {
+ if (event_a)
+ ::CloseHandle(event_a);
+ if (event_b)
+ ::CloseHandle(event_b);
+ }
+
+ DCHECK(!success || a->IsValid());
+ DCHECK(!success || b->IsValid());
+
+ return success;
+}
+
+namespace {
+class ExtraWaitThread : public base::PlatformThread::Delegate {
+ public:
+ ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count,
+ int* signaled_event)
+ : stop_(stop), events_(events), count_(count),
+ signaled_event_(signaled_event) {
+ *signaled_event_ = -1;
+ }
+ virtual ~ExtraWaitThread() {}
+
+ virtual void ThreadMain() OVERRIDE {
+ // Store the |stop_| event as the first event.
+ HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ };
+ HANDLE next_thread = NULL;
+ DWORD event_count = MAXIMUM_WAIT_OBJECTS;
+ int thread_signaled_event = -1;
+ scoped_ptr<ExtraWaitThread> extra_wait_thread;
+ if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) {
+ std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]);
+
+ extra_wait_thread.reset(new ExtraWaitThread(stop_,
+ &events_[MAXIMUM_WAIT_OBJECTS - 2],
+ count_ - (MAXIMUM_WAIT_OBJECTS - 2),
+ &thread_signaled_event));
+ base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread);
+
+ event_count = MAXIMUM_WAIT_OBJECTS;
+ events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread;
+ } else {
+ std::copy(&events_[0], &events_[count_], &events[1]);
+ event_count = count_ + 1;
+ }
+
+ DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE,
+ INFINITE);
+ if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) {
+ wait -= WAIT_OBJECT_0;
+ if (wait == 0) {
+ // The stop event was signaled. Check if it was signaled by a
+ // sub thread. In case our sub thread had to spin another thread (and
+ // so on), we must wait for ours to exit before we can check the
+ // propagated event offset.
+ if (next_thread) {
+ base::PlatformThread::Join(next_thread);
+ next_thread = NULL;
+ }
+ if (thread_signaled_event != -1)
+ *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2);
+ } else if (events[wait] == next_thread) {
+ NOTREACHED();
+ } else {
+ *signaled_event_ = static_cast<int>(wait);
+ SetEvent(stop_);
+ }
+ } else {
+ NOTREACHED();
+ }
+
+ if (next_thread)
+ base::PlatformThread::Join(next_thread);
+ }
+
+ private:
+ HANDLE stop_;
+ HANDLE* events_;
+ size_t count_;
+ int* signaled_event_;
+ DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread);
+};
+} // end namespace
+
+// static
+int SharedMemSynchronizer::WaitMultiple(SharedMemSynchronizer* synchronizers,
+ size_t count,
+ size_t last_signaled) {
+ DCHECK(last_signaled < count);
scherkus (not reviewing) 2012/03/08 02:37:08 ditto here+below
tommi (sloooow) - chröme 2012/03/08 16:10:57 Done.
+#ifndef NDEBUG
+ for (size_t i = 0; i < count; ++i)
+ DCHECK(synchronizers[i].IsValid());
+#endif
+
+ int ret = -1;
+
+ // TODO(tommi): Should we wait in an alertable state so that we can be
+ // canceled via an APC?
+ scoped_array<HANDLE> handles(new HANDLE[count]);
+
+ // Because of the way WaitForMultipleObjects works, we do a little trick here.
+ // When multiple events are signaled, WaitForMultipleObjects will return the
+ // index of the first signaled item (lowest). This means that if we always
+ // pass the array the same way to WaitForMultipleObjects, the objects that
+ // come first, have higher priority. In times of heavy load, this will cause
+ // elements at the back to become DOS-ed.
+ // So, we store the location of the item that was last signaled. Then we split
+ // up the array and move everything higher than the last signaled index to the
+ // front and the rest to the back (meaning that the last signaled item will
+ // become the last element in the list).
+ // Assuming equally busy events, this approach distributes the priority
scherkus (not reviewing) 2012/03/08 02:37:08 sanity check: aren't we using the same trick in po
tommi (sloooow) - chröme 2012/03/08 16:10:57 Yes. There's one very subtle difference which is
+ // evenly.
+
+ size_t index = 0;
+ for (size_t i = last_signaled + 1; i < count; ++i)
+ handles[index++] = synchronizers[i].other_;
+
+ for (size_t i = 0; i <= last_signaled; ++i)
+ handles[index++] = synchronizers[i].other_;
+ DCHECK_EQ(index, count);
+
+ DWORD wait = WAIT_FAILED;
+ bool wait_failed = false;
+ if (count <= MAXIMUM_WAIT_OBJECTS) {
scherkus (not reviewing) 2012/03/08 02:37:08 stepping back a bit is this something we want/need
tommi (sloooow) - chröme 2012/03/08 16:10:57 Right! So, I thought quite a bit about all of thi
+ wait = ::WaitForMultipleObjects(count, handles.get(), FALSE, INFINITE);
+ wait_failed = wait < WAIT_OBJECT_0 ||
+ wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
+ } else {
+ // Used to stop the other wait threads when an event has been signaled.
+ base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL));
+
+ // Create the first thread and pass a pointer to all handles >63
+ // to the thread + 'stop'. Then implement the thread so that it checks
+ // if the number of handles is > 63. If so, spawns a new thread and
+ // passes >62 handles to that thread and waits for the 62 handles + stop +
+ // next thread. etc etc.
scherkus (not reviewing) 2012/03/08 02:37:08 bit confusing to read perhaps reword to include s
tommi (sloooow) - chröme 2012/03/08 16:10:57 I realize now that this comment is a note I left t
+
+ int thread_signaled_event = -1;
+ ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1],
+ count - (MAXIMUM_WAIT_OBJECTS - 1), &thread_signaled_event);
+ base::PlatformThreadHandle thread;
+ base::PlatformThread::Create(0, &wait_thread, &thread);
+ HANDLE events[MAXIMUM_WAIT_OBJECTS];
+ std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]);
+ events[MAXIMUM_WAIT_OBJECTS - 1] = thread;
+ wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE,
+ INFINITE);
+ wait_failed = wait < WAIT_OBJECT_0 ||
+ wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
+ if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) {
+ if (thread_signaled_event < 0) {
+ wait_failed = true;
+ NOTREACHED();
+ } else {
+ wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) +
+ thread_signaled_event;
+ }
+ } else {
+ ::SetEvent(stop);
+ }
+ base::PlatformThread::Join(thread);
+ }
+
+ if (!wait_failed) {
+ // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0).
+ wait -= WAIT_OBJECT_0;
+ ::ResetEvent(handles[wait]);
+ ret = (wait + last_signaled + 1) % count;
+ DCHECK_EQ(handles[wait], synchronizers[ret].other_.Get());
+ } else {
+ NOTREACHED();
scherkus (not reviewing) 2012/03/08 02:37:08 LOG? how severe is this? note that in release mo
tommi (sloooow) - chröme 2012/03/08 16:10:57 These are just sanity checks. If they fail it's b
+ }
+
+ DCHECK_NE(ret, -1);
+ return ret;
+}

Powered by Google App Engine
This is Rietveld 408576698