Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(669)

Side by Side Diff: dbus/bus.cc

Issue 12255043: DBus: Use TaskRunners instead of MessageLoopProxies. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: rebase Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "dbus/bus.h" 5 #include "dbus/bus.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop.h" 9 #include "base/message_loop.h"
10 #include "base/message_loop_proxy.h" 10 #include "base/message_loop_proxy.h"
11 #include "base/stl_util.h" 11 #include "base/stl_util.h"
12 #include "base/threading/thread.h" 12 #include "base/threading/thread.h"
13 #include "base/threading/thread_restrictions.h" 13 #include "base/threading/thread_restrictions.h"
14 #include "base/time.h" 14 #include "base/time.h"
15 #include "dbus/exported_object.h" 15 #include "dbus/exported_object.h"
16 #include "dbus/object_path.h"
17 #include "dbus/object_proxy.h" 16 #include "dbus/object_proxy.h"
18 #include "dbus/scoped_dbus_error.h" 17 #include "dbus/scoped_dbus_error.h"
19 18
20 namespace dbus { 19 namespace dbus {
21 20
22 namespace { 21 namespace {
23 22
24 const char kDisconnectedSignal[] = "Disconnected"; 23 const char kDisconnectedSignal[] = "Disconnected";
25 const char kDisconnectedMatchRule[] = 24 const char kDisconnectedMatchRule[] =
26 "type='signal', path='/org/freedesktop/DBus/Local'," 25 "type='signal', path='/org/freedesktop/DBus/Local',"
27 "interface='org.freedesktop.DBus.Local', member='Disconnected'"; 26 "interface='org.freedesktop.DBus.Local', member='Disconnected'";
28 27
29 // The class is used for watching the file descriptor used for D-Bus 28 // The class is used for watching the file descriptor used for D-Bus
30 // communication. 29 // communication.
31 class Watch : public base::MessagePumpLibevent::Watcher { 30 class Watch : public base::MessagePumpLibevent::Watcher {
32 public: 31 public:
33 Watch(DBusWatch* watch) 32 explicit Watch(DBusWatch* watch)
34 : raw_watch_(watch) { 33 : raw_watch_(watch) {
35 dbus_watch_set_data(raw_watch_, this, NULL); 34 dbus_watch_set_data(raw_watch_, this, NULL);
36 } 35 }
37 36
38 virtual ~Watch() { 37 virtual ~Watch() {
39 dbus_watch_set_data(raw_watch_, NULL, NULL); 38 dbus_watch_set_data(raw_watch_, NULL, NULL);
40 } 39 }
41 40
42 // Returns true if the underlying file descriptor is ready to be watched. 41 // Returns true if the underlying file descriptor is ready to be watched.
43 bool IsReadyToBeWatched() { 42 bool IsReadyToBeWatched() {
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
94 // The class is used for monitoring the timeout used for D-Bus method 93 // The class is used for monitoring the timeout used for D-Bus method
95 // calls. 94 // calls.
96 // 95 //
97 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 96 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
98 // the object is is alive when HandleTimeout() is called. It's unlikely 97 // the object is is alive when HandleTimeout() is called. It's unlikely
99 // but it may be possible that HandleTimeout() is called after 98 // but it may be possible that HandleTimeout() is called after
100 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 99 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
101 // Bus::OnRemoveTimeout(). 100 // Bus::OnRemoveTimeout().
102 class Timeout : public base::RefCountedThreadSafe<Timeout> { 101 class Timeout : public base::RefCountedThreadSafe<Timeout> {
103 public: 102 public:
104 Timeout(DBusTimeout* timeout) 103 explicit Timeout(DBusTimeout* timeout)
105 : raw_timeout_(timeout), 104 : raw_timeout_(timeout),
106 monitoring_is_active_(false), 105 monitoring_is_active_(false),
107 is_completed(false) { 106 is_completed(false) {
108 dbus_timeout_set_data(raw_timeout_, this, NULL); 107 dbus_timeout_set_data(raw_timeout_, this, NULL);
109 AddRef(); // Balanced on Complete(). 108 AddRef(); // Balanced on Complete().
110 } 109 }
111 110
112 // Returns true if the timeout is ready to be monitored. 111 // Returns true if the timeout is ready to be monitored.
113 bool IsReadyToBeMonitored() { 112 bool IsReadyToBeMonitored() {
114 return dbus_timeout_get_enabled(raw_timeout_); 113 return dbus_timeout_get_enabled(raw_timeout_);
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 : bus_type(SESSION), 173 : bus_type(SESSION),
175 connection_type(PRIVATE) { 174 connection_type(PRIVATE) {
176 } 175 }
177 176
178 Bus::Options::~Options() { 177 Bus::Options::~Options() {
179 } 178 }
180 179
181 Bus::Bus(const Options& options) 180 Bus::Bus(const Options& options)
182 : bus_type_(options.bus_type), 181 : bus_type_(options.bus_type),
183 connection_type_(options.connection_type), 182 connection_type_(options.connection_type),
184 dbus_thread_message_loop_proxy_(options.dbus_thread_message_loop_proxy), 183 dbus_task_runner_(options.dbus_task_runner),
185 on_shutdown_(false /* manual_reset */, false /* initially_signaled */), 184 on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
186 connection_(NULL), 185 connection_(NULL),
187 origin_thread_id_(base::PlatformThread::CurrentId()), 186 origin_thread_id_(base::PlatformThread::CurrentId()),
188 async_operations_set_up_(false), 187 async_operations_set_up_(false),
189 shutdown_completed_(false), 188 shutdown_completed_(false),
190 num_pending_watches_(0), 189 num_pending_watches_(0),
191 num_pending_timeouts_(0), 190 num_pending_timeouts_(0),
192 address_(options.address), 191 address_(options.address),
193 on_disconnected_closure_(options.disconnected_callback) { 192 on_disconnected_closure_(options.disconnected_callback) {
194 // This is safe to call multiple times. 193 // This is safe to call multiple times.
195 dbus_threads_init_default(); 194 dbus_threads_init_default();
196 // The origin message loop is unnecessary if the client uses synchronous 195 // The origin message loop is unnecessary if the client uses synchronous
197 // functions only. 196 // functions only.
198 if (MessageLoop::current()) 197 if (MessageLoop::current())
199 origin_message_loop_proxy_ = MessageLoop::current()->message_loop_proxy(); 198 origin_task_runner_ = MessageLoop::current()->message_loop_proxy();
200 } 199 }
201 200
202 Bus::~Bus() { 201 Bus::~Bus() {
203 DCHECK(!connection_); 202 DCHECK(!connection_);
204 DCHECK(owned_service_names_.empty()); 203 DCHECK(owned_service_names_.empty());
205 DCHECK(match_rules_added_.empty()); 204 DCHECK(match_rules_added_.empty());
206 DCHECK(filter_functions_added_.empty()); 205 DCHECK(filter_functions_added_.empty());
207 DCHECK(registered_object_paths_.empty()); 206 DCHECK(registered_object_paths_.empty());
208 DCHECK_EQ(0, num_pending_watches_); 207 DCHECK_EQ(0, num_pending_watches_);
209 // TODO(satorux): This check fails occasionally in browser_tests for tests 208 // TODO(satorux): This check fails occasionally in browser_tests for tests
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
301 // GetExportedObject() call to return a new object, rather than this one. 300 // GetExportedObject() call to return a new object, rather than this one.
302 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 301 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
303 if (iter == exported_object_table_.end()) 302 if (iter == exported_object_table_.end())
304 return; 303 return;
305 304
306 scoped_refptr<ExportedObject> exported_object = iter->second; 305 scoped_refptr<ExportedObject> exported_object = iter->second;
307 exported_object_table_.erase(iter); 306 exported_object_table_.erase(iter);
308 307
309 // Post the task to perform the final unregistration to the D-Bus thread. 308 // Post the task to perform the final unregistration to the D-Bus thread.
310 // Since the registration also happens on the D-Bus thread in 309 // Since the registration also happens on the D-Bus thread in
311 // TryRegisterObjectPath(), and the message loop proxy we post to is a 310 // TryRegisterObjectPath(), and the task runner we post to is a
312 // MessageLoopProxy which inherits from SequencedTaskRunner, there is a 311 // SequencedTaskRunner, there is a guarantee that this will happen before any
313 // guarantee that this will happen before any future registration call. 312 // future registration call.
314 PostTaskToDBusThread(FROM_HERE, base::Bind( 313 PostTaskToDBusThread(FROM_HERE,
315 &Bus::UnregisterExportedObjectInternal, 314 base::Bind(&Bus::UnregisterExportedObjectInternal,
316 this, exported_object)); 315 this, exported_object));
317 } 316 }
318 317
319 void Bus::UnregisterExportedObjectInternal( 318 void Bus::UnregisterExportedObjectInternal(
320 scoped_refptr<dbus::ExportedObject> exported_object) { 319 scoped_refptr<dbus::ExportedObject> exported_object) {
321 AssertOnDBusThread(); 320 AssertOnDBusThread();
322 321
323 exported_object->Unregister(); 322 exported_object->Unregister();
324 } 323 }
325 324
326 bool Bus::Connect() { 325 bool Bus::Connect() {
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
431 // dbus_connection_close() won't unref. 430 // dbus_connection_close() won't unref.
432 dbus_connection_unref(connection_); 431 dbus_connection_unref(connection_);
433 } 432 }
434 433
435 connection_ = NULL; 434 connection_ = NULL;
436 shutdown_completed_ = true; 435 shutdown_completed_ = true;
437 } 436 }
438 437
439 void Bus::ShutdownOnDBusThreadAndBlock() { 438 void Bus::ShutdownOnDBusThreadAndBlock() {
440 AssertOnOriginThread(); 439 AssertOnOriginThread();
441 DCHECK(dbus_thread_message_loop_proxy_.get()); 440 DCHECK(dbus_task_runner_.get());
442 441
443 PostTaskToDBusThread(FROM_HERE, base::Bind( 442 PostTaskToDBusThread(FROM_HERE, base::Bind(
444 &Bus::ShutdownOnDBusThreadAndBlockInternal, 443 &Bus::ShutdownOnDBusThreadAndBlockInternal,
445 this)); 444 this));
446 445
447 // http://crbug.com/125222 446 // http://crbug.com/125222
448 base::ThreadRestrictions::ScopedAllowWait allow_wait; 447 base::ThreadRestrictions::ScopedAllowWait allow_wait;
449 448
450 // Wait until the shutdown is complete on the D-Bus thread. 449 // Wait until the shutdown is complete on the D-Bus thread.
451 // The shutdown should not hang, but set timeout just in case. 450 // The shutdown should not hang, but set timeout just in case.
(...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after
735 // (crbug.com/174431) 734 // (crbug.com/174431)
736 if (dbus_connection_get_dispatch_status(connection_) == 735 if (dbus_connection_get_dispatch_status(connection_) ==
737 DBUS_DISPATCH_DATA_REMAINS) { 736 DBUS_DISPATCH_DATA_REMAINS) {
738 while (dbus_connection_dispatch(connection_) == 737 while (dbus_connection_dispatch(connection_) ==
739 DBUS_DISPATCH_DATA_REMAINS); 738 DBUS_DISPATCH_DATA_REMAINS);
740 } 739 }
741 } 740 }
742 741
743 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, 742 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
744 const base::Closure& task) { 743 const base::Closure& task) {
745 DCHECK(origin_message_loop_proxy_.get()); 744 DCHECK(origin_task_runner_.get());
746 if (!origin_message_loop_proxy_->PostTask(from_here, task)) { 745 if (!origin_task_runner_->PostTask(from_here, task)) {
747 LOG(WARNING) << "Failed to post a task to the origin message loop"; 746 LOG(WARNING) << "Failed to post a task to the origin message loop";
748 } 747 }
749 } 748 }
750 749
751 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, 750 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
752 const base::Closure& task) { 751 const base::Closure& task) {
753 if (dbus_thread_message_loop_proxy_.get()) { 752 if (dbus_task_runner_.get()) {
754 if (!dbus_thread_message_loop_proxy_->PostTask(from_here, task)) { 753 if (!dbus_task_runner_->PostTask(from_here, task)) {
755 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 754 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
756 } 755 }
757 } else { 756 } else {
758 DCHECK(origin_message_loop_proxy_.get()); 757 DCHECK(origin_task_runner_.get());
759 if (!origin_message_loop_proxy_->PostTask(from_here, task)) { 758 if (!origin_task_runner_->PostTask(from_here, task)) {
760 LOG(WARNING) << "Failed to post a task to the origin message loop"; 759 LOG(WARNING) << "Failed to post a task to the origin message loop";
761 } 760 }
762 } 761 }
763 } 762 }
764 763
765 void Bus::PostDelayedTaskToDBusThread( 764 void Bus::PostDelayedTaskToDBusThread(
766 const tracked_objects::Location& from_here, 765 const tracked_objects::Location& from_here,
767 const base::Closure& task, 766 const base::Closure& task,
768 base::TimeDelta delay) { 767 base::TimeDelta delay) {
769 if (dbus_thread_message_loop_proxy_.get()) { 768 if (dbus_task_runner_.get()) {
770 if (!dbus_thread_message_loop_proxy_->PostDelayedTask( 769 if (!dbus_task_runner_->PostDelayedTask(
771 from_here, task, delay)) { 770 from_here, task, delay)) {
772 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 771 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
773 } 772 }
774 } else { 773 } else {
775 DCHECK(origin_message_loop_proxy_.get()); 774 DCHECK(origin_task_runner_.get());
776 if (!origin_message_loop_proxy_->PostDelayedTask( 775 if (!origin_task_runner_->PostDelayedTask(from_here, task, delay)) {
777 from_here, task, delay)) {
778 LOG(WARNING) << "Failed to post a task to the origin message loop"; 776 LOG(WARNING) << "Failed to post a task to the origin message loop";
779 } 777 }
780 } 778 }
781 } 779 }
782 780
783 bool Bus::HasDBusThread() { 781 bool Bus::HasDBusThread() {
784 return dbus_thread_message_loop_proxy_.get() != NULL; 782 return dbus_task_runner_.get() != NULL;
785 } 783 }
786 784
787 void Bus::AssertOnOriginThread() { 785 void Bus::AssertOnOriginThread() {
788 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 786 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
789 } 787 }
790 788
791 void Bus::AssertOnDBusThread() { 789 void Bus::AssertOnDBusThread() {
792 base::ThreadRestrictions::AssertIOAllowed(); 790 base::ThreadRestrictions::AssertIOAllowed();
793 791
794 if (dbus_thread_message_loop_proxy_.get()) { 792 if (dbus_task_runner_.get()) {
795 DCHECK(dbus_thread_message_loop_proxy_->BelongsToCurrentThread()); 793 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
796 } else { 794 } else {
797 AssertOnOriginThread(); 795 AssertOnOriginThread();
798 } 796 }
799 } 797 }
800 798
801 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 799 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
802 AssertOnDBusThread(); 800 AssertOnDBusThread();
803 801
804 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 802 // watch will be deleted when raw_watch is removed in OnRemoveWatch().
805 Watch* watch = new Watch(raw_watch); 803 Watch* watch = new Watch(raw_watch);
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
921 } 919 }
922 920
923 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 921 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
924 DBusDispatchStatus status, 922 DBusDispatchStatus status,
925 void* data) { 923 void* data) {
926 Bus* self = static_cast<Bus*>(data); 924 Bus* self = static_cast<Bus*>(data);
927 self->OnDispatchStatusChanged(connection, status); 925 self->OnDispatchStatusChanged(connection, status);
928 } 926 }
929 927
930 DBusHandlerResult Bus::OnConnectionDisconnectedFilter( 928 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
931 DBusConnection *connection, 929 DBusConnection* connection,
932 DBusMessage *message, 930 DBusMessage* message,
933 void *data) { 931 void* data) {
934 if (dbus_message_is_signal(message, 932 if (dbus_message_is_signal(message,
935 DBUS_INTERFACE_LOCAL, 933 DBUS_INTERFACE_LOCAL,
936 kDisconnectedSignal)) { 934 kDisconnectedSignal)) {
937 Bus* self = static_cast<Bus*>(data); 935 Bus* self = static_cast<Bus*>(data);
938 self->AssertOnDBusThread(); 936 self->AssertOnDBusThread();
939 self->OnConnectionDisconnected(connection); 937 self->OnConnectionDisconnected(connection);
940 return DBUS_HANDLER_RESULT_HANDLED; 938 return DBUS_HANDLER_RESULT_HANDLED;
941 } 939 }
942 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 940 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
943 } 941 }
944 942
945 } // namespace dbus 943 } // namespace dbus
OLDNEW
« no previous file with comments | « dbus/bus.h ('k') | dbus/bus_unittest.cc » ('j') | dbus/mock_bus.cc » ('J')

Powered by Google App Engine
This is Rietveld 408576698