OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" | 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" |
6 | 6 |
7 #include <string> | 7 #include <string> |
8 | 8 |
| 9 #include "base/bind.h" |
9 #include "base/base64.h" | 10 #include "base/base64.h" |
10 #include "base/callback.h" | 11 #include "base/callback.h" |
11 #include "base/compiler_specific.h" | 12 #include "base/compiler_specific.h" |
12 #include "base/logging.h" | 13 #include "base/logging.h" |
13 #include "base/rand_util.h" | 14 #include "base/rand_util.h" |
14 #include "base/string_number_conversions.h" | 15 #include "base/string_number_conversions.h" |
15 #include "google/cacheinvalidation/v2/client_gateway.pb.h" | |
16 #include "google/cacheinvalidation/v2/constants.h" | 16 #include "google/cacheinvalidation/v2/constants.h" |
17 #include "google/cacheinvalidation/v2/invalidation-client.h" | 17 #include "google/cacheinvalidation/v2/invalidation-client.h" |
18 #include "google/cacheinvalidation/v2/system-resources.h" | 18 #include "google/cacheinvalidation/v2/system-resources.h" |
19 #include "jingle/notifier/listener/notification_constants.h" | |
20 #include "jingle/notifier/listener/push_notifications_send_update_task.h" | |
21 #include "jingle/notifier/listener/xml_element_util.h" | 19 #include "jingle/notifier/listener/xml_element_util.h" |
22 #include "talk/xmpp/constants.h" | 20 #include "talk/xmpp/constants.h" |
23 #include "talk/xmpp/jid.h" | 21 #include "talk/xmpp/jid.h" |
24 #include "talk/xmpp/xmppclient.h" | 22 #include "talk/xmpp/xmppclient.h" |
25 #include "talk/xmpp/xmpptask.h" | 23 #include "talk/xmpp/xmpptask.h" |
26 | 24 |
27 namespace sync_notifier { | 25 namespace sync_notifier { |
28 | 26 |
29 namespace { | 27 namespace { |
30 | 28 |
31 const char kBotJid[] = "tango@bot.talk.google.com"; | 29 const char kBotJid[] = "tango@bot.talk.google.com"; |
32 const char kChannelName[] = "tango_raw"; | 30 const char kServiceUrl[] = "http://www.google.com/chrome/sync"; |
| 31 |
| 32 buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); } |
| 33 buzz::QName GetQnSeq() { return buzz::QName("", "seq"); } |
| 34 buzz::QName GetQnSid() { return buzz::QName("", "sid"); } |
| 35 buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); } |
| 36 buzz::QName GetQnProtocolVersion() { |
| 37 return buzz::QName("", "protocolVersion"); |
| 38 } |
| 39 buzz::QName GetQnChannelContext() { |
| 40 return buzz::QName("", "channelContext"); |
| 41 } |
| 42 |
| 43 // TODO(akalin): Move these task classes out so that they can be |
| 44 // unit-tested. This'll probably be done easier once we consolidate |
| 45 // all the packet sending/receiving classes. |
| 46 |
| 47 // A task that listens for ClientInvalidation messages and calls the |
| 48 // given callback on them. |
| 49 class CacheInvalidationListenTask : public buzz::XmppTask { |
| 50 public: |
| 51 // Takes ownership of callback. |
| 52 CacheInvalidationListenTask( |
| 53 buzz::XmppTaskParentInterface* parent, |
| 54 const base::Callback<void(const std::string&)>& callback, |
| 55 const base::Callback<void(const std::string&)>& context_change_callback) |
| 56 : XmppTask(parent, buzz::XmppEngine::HL_TYPE), |
| 57 callback_(callback), |
| 58 context_change_callback_(context_change_callback) {} |
| 59 virtual ~CacheInvalidationListenTask() {} |
| 60 |
| 61 virtual int ProcessStart() { |
| 62 DVLOG(2) << "CacheInvalidationListenTask started"; |
| 63 return STATE_RESPONSE; |
| 64 } |
| 65 |
| 66 virtual int ProcessResponse() { |
| 67 const buzz::XmlElement* stanza = NextStanza(); |
| 68 if (stanza == NULL) { |
| 69 DVLOG(2) << "CacheInvalidationListenTask blocked"; |
| 70 return STATE_BLOCKED; |
| 71 } |
| 72 DVLOG(2) << "CacheInvalidationListenTask response received"; |
| 73 std::string data; |
| 74 if (GetCacheInvalidationIqPacketData(stanza, &data)) { |
| 75 callback_.Run(data); |
| 76 } else { |
| 77 LOG(ERROR) << "Could not get packet data"; |
| 78 } |
| 79 // Acknowledge receipt of the iq to the buzz server. |
| 80 // TODO(akalin): Send an error response for malformed packets. |
| 81 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); |
| 82 SendStanza(response_stanza.get()); |
| 83 return STATE_RESPONSE; |
| 84 } |
| 85 |
| 86 virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
| 87 DVLOG(1) << "Stanza received: " |
| 88 << notifier::XmlElementToString(*stanza); |
| 89 if (IsValidCacheInvalidationIqPacket(stanza)) { |
| 90 DVLOG(2) << "Queueing stanza"; |
| 91 QueueStanza(stanza); |
| 92 return true; |
| 93 } |
| 94 DVLOG(2) << "Stanza skipped"; |
| 95 return false; |
| 96 } |
| 97 |
| 98 private: |
| 99 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { |
| 100 // We deliberately minimize the verification we do here: see |
| 101 // http://crbug.com/71285 . |
| 102 return MatchRequestIq(stanza, buzz::STR_SET, GetQnData()); |
| 103 } |
| 104 |
| 105 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, |
| 106 std::string* data) { |
| 107 DCHECK(IsValidCacheInvalidationIqPacket(stanza)); |
| 108 const buzz::XmlElement* cache_invalidation_iq_packet = |
| 109 stanza->FirstNamed(GetQnData()); |
| 110 if (!cache_invalidation_iq_packet) { |
| 111 LOG(ERROR) << "Could not find cache invalidation IQ packet element"; |
| 112 return false; |
| 113 } |
| 114 // Look for a channelContext attribute in the content of the stanza. If |
| 115 // present, remember it so it can be echoed back. |
| 116 if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) { |
| 117 context_change_callback_.Run( |
| 118 cache_invalidation_iq_packet->Attr(GetQnChannelContext())); |
| 119 } |
| 120 *data = cache_invalidation_iq_packet->BodyText(); |
| 121 return true; |
| 122 } |
| 123 |
| 124 base::Callback<void(const std::string&)> callback_; |
| 125 base::Callback<void(const std::string&)> context_change_callback_; |
| 126 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); |
| 127 }; |
| 128 |
| 129 std::string MakeProtocolVersion() { |
| 130 return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) + |
| 131 "." + |
| 132 base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion); |
| 133 } |
| 134 |
| 135 // A task that sends a single outbound ClientInvalidation message. |
| 136 class CacheInvalidationSendMessageTask : public buzz::XmppTask { |
| 137 public: |
| 138 CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent, |
| 139 const buzz::Jid& to_jid, |
| 140 const std::string& msg, |
| 141 int seq, |
| 142 const std::string& sid, |
| 143 const std::string& channel_context) |
| 144 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), |
| 145 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid), |
| 146 channel_context_(channel_context) {} |
| 147 virtual ~CacheInvalidationSendMessageTask() {} |
| 148 |
| 149 virtual int ProcessStart() { |
| 150 scoped_ptr<buzz::XmlElement> stanza( |
| 151 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, |
| 152 seq_, sid_, channel_context_)); |
| 153 DVLOG(1) << "Sending message: " |
| 154 << notifier::XmlElementToString(*stanza.get()); |
| 155 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { |
| 156 DVLOG(2) << "Error when sending message"; |
| 157 return STATE_ERROR; |
| 158 } |
| 159 return STATE_RESPONSE; |
| 160 } |
| 161 |
| 162 virtual int ProcessResponse() { |
| 163 const buzz::XmlElement* stanza = NextStanza(); |
| 164 if (stanza == NULL) { |
| 165 DVLOG(2) << "CacheInvalidationSendMessageTask blocked..."; |
| 166 return STATE_BLOCKED; |
| 167 } |
| 168 DVLOG(2) << "CacheInvalidationSendMessageTask response received: " |
| 169 << notifier::XmlElementToString(*stanza); |
| 170 // TODO(akalin): Handle errors here. |
| 171 return STATE_DONE; |
| 172 } |
| 173 |
| 174 virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
| 175 DVLOG(1) << "Stanza received: " |
| 176 << notifier::XmlElementToString(*stanza); |
| 177 if (!MatchResponseIq(stanza, to_jid_, task_id())) { |
| 178 DVLOG(2) << "Stanza skipped"; |
| 179 return false; |
| 180 } |
| 181 DVLOG(2) << "Queueing stanza"; |
| 182 QueueStanza(stanza); |
| 183 return true; |
| 184 } |
| 185 |
| 186 private: |
| 187 static buzz::XmlElement* MakeCacheInvalidationIqPacket( |
| 188 const buzz::Jid& to_jid, |
| 189 const std::string& task_id, |
| 190 const std::string& msg, |
| 191 int seq, const std::string& sid, const std::string& channel_context) { |
| 192 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); |
| 193 buzz::XmlElement* cache_invalidation_iq_packet = |
| 194 new buzz::XmlElement(GetQnData(), true); |
| 195 iq->AddElement(cache_invalidation_iq_packet); |
| 196 cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq)); |
| 197 cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid); |
| 198 cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl); |
| 199 cache_invalidation_iq_packet->SetAttr( |
| 200 GetQnProtocolVersion(), MakeProtocolVersion()); |
| 201 if (!channel_context.empty()) { |
| 202 cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(), |
| 203 channel_context); |
| 204 } |
| 205 cache_invalidation_iq_packet->SetBodyText(msg); |
| 206 return iq; |
| 207 } |
| 208 |
| 209 const buzz::Jid to_jid_; |
| 210 std::string msg_; |
| 211 int seq_; |
| 212 std::string sid_; |
| 213 const std::string channel_context_; |
| 214 |
| 215 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); |
| 216 }; |
| 217 |
| 218 std::string MakeSid() { |
| 219 uint64 sid = base::RandUint64(); |
| 220 return std::string("chrome-sync-") + base::Uint64ToString(sid); |
| 221 } |
33 | 222 |
34 } // namespace | 223 } // namespace |
35 | 224 |
36 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( | 225 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( |
37 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) | 226 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) |
38 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 227 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
39 base_task_(base_task), | 228 base_task_(base_task), |
40 seq_(0), | 229 seq_(0), |
41 scheduling_hash_(0) { | 230 sid_(MakeSid()) { |
42 CHECK(base_task_.get()); | 231 CHECK(base_task_.get()); |
43 // Owned by base_task. Takes ownership of the callback. | 232 // Owned by base_task. Takes ownership of the callback. |
44 notifier::PushNotificationsListenTask* listen_task = | 233 CacheInvalidationListenTask* listen_task = |
45 new notifier::PushNotificationsListenTask(base_task_, this); | 234 new CacheInvalidationListenTask( |
| 235 base_task_, base::Bind( |
| 236 &CacheInvalidationPacketHandler::HandleInboundPacket, |
| 237 weak_factory_.GetWeakPtr()), |
| 238 base::Bind( |
| 239 &CacheInvalidationPacketHandler::HandleChannelContextChange, |
| 240 weak_factory_.GetWeakPtr())); |
46 listen_task->Start(); | 241 listen_task->Start(); |
47 } | 242 } |
48 | 243 |
49 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { | 244 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { |
50 DCHECK(non_thread_safe_.CalledOnValidThread()); | 245 DCHECK(non_thread_safe_.CalledOnValidThread()); |
51 } | 246 } |
52 | 247 |
53 void CacheInvalidationPacketHandler::SendMessage( | 248 void CacheInvalidationPacketHandler::SendMessage( |
54 const std::string& message) { | 249 const std::string& message) { |
55 DCHECK(non_thread_safe_.CalledOnValidThread()); | 250 DCHECK(non_thread_safe_.CalledOnValidThread()); |
56 if (!base_task_.get()) { | 251 if (!base_task_.get()) { |
57 return; | 252 return; |
58 } | 253 } |
59 ipc::invalidation::ClientGatewayMessage envelope; | 254 std::string encoded_message; |
60 envelope.set_is_client_to_server(true); | 255 if (!base::Base64Encode(message, &encoded_message)) { |
61 if (!service_context_.empty()) { | 256 LOG(ERROR) << "Could not base64-encode message to send: " |
62 envelope.set_service_context(service_context_); | 257 << message; |
63 envelope.set_rpc_scheduling_hash(scheduling_hash_); | 258 return; |
64 } | 259 } |
65 envelope.set_network_message(message); | |
66 | |
67 notifier::Recipient recipient; | |
68 recipient.to = kBotJid; | |
69 notifier::Notification notification; | |
70 notification.channel = kChannelName; | |
71 notification.recipients.push_back(recipient); | |
72 envelope.SerializeToString(¬ification.data); | |
73 | |
74 // Owned by base_task_. | 260 // Owned by base_task_. |
75 notifier::PushNotificationsSendUpdateTask* send_message_task = | 261 CacheInvalidationSendMessageTask* send_message_task = |
76 new notifier::PushNotificationsSendUpdateTask(base_task_, notification); | 262 new CacheInvalidationSendMessageTask(base_task_, |
| 263 buzz::Jid(kBotJid), |
| 264 encoded_message, |
| 265 seq_, sid_, channel_context_); |
77 send_message_task->Start(); | 266 send_message_task->Start(); |
| 267 ++seq_; |
78 } | 268 } |
79 | 269 |
80 void CacheInvalidationPacketHandler::SetMessageReceiver( | 270 void CacheInvalidationPacketHandler::SetMessageReceiver( |
81 invalidation::MessageCallback* incoming_receiver) { | 271 invalidation::MessageCallback* incoming_receiver) { |
82 incoming_receiver_.reset(incoming_receiver); | 272 incoming_receiver_.reset(incoming_receiver); |
83 } | 273 } |
84 | 274 |
85 void CacheInvalidationPacketHandler::SendSubscriptionRequest() { | 275 void CacheInvalidationPacketHandler::HandleInboundPacket( |
86 notifier::Subscription subscription; | 276 const std::string& packet) { |
87 subscription.channel = kChannelName; | 277 DCHECK(non_thread_safe_.CalledOnValidThread()); |
88 subscription.from = ""; | 278 std::string decoded_message; |
89 notifier::SubscriptionList subscription_list; | 279 if (!base::Base64Decode(packet, &decoded_message)) { |
90 subscription_list.push_back(subscription); | 280 LOG(ERROR) << "Could not base64-decode received message: " |
91 // Owned by base_task_. | 281 << packet; |
92 notifier::PushNotificationsSubscribeTask* push_subscription_task = | 282 return; |
93 new notifier::PushNotificationsSubscribeTask( | 283 } |
94 base_task_, subscription_list, this); | 284 incoming_receiver_->Run(decoded_message); |
95 push_subscription_task->Start(); | |
96 } | 285 } |
97 | 286 |
98 void CacheInvalidationPacketHandler::OnSubscribed() { | 287 void CacheInvalidationPacketHandler::HandleChannelContextChange( |
99 // TODO(ghc): Consider whether we should do more here. | 288 const std::string& context) { |
100 } | |
101 | |
102 void CacheInvalidationPacketHandler::OnSubscriptionError() { | |
103 // TODO(ghc): Consider whether we should do more here. | |
104 } | |
105 | |
106 void CacheInvalidationPacketHandler::OnNotificationReceived( | |
107 const notifier::Notification& notification) { | |
108 DCHECK(non_thread_safe_.CalledOnValidThread()); | 289 DCHECK(non_thread_safe_.CalledOnValidThread()); |
109 const std::string& decoded_message = notification.data; | 290 channel_context_ = context; |
110 ipc::invalidation::ClientGatewayMessage envelope; | |
111 envelope.ParseFromString(decoded_message); | |
112 if (!envelope.IsInitialized()) { | |
113 LOG(ERROR) << "Could not parse ClientGatewayMessage: " | |
114 << decoded_message; | |
115 return; | |
116 } | |
117 if (envelope.has_service_context()) { | |
118 service_context_ = envelope.service_context(); | |
119 } | |
120 if (envelope.has_rpc_scheduling_hash()) { | |
121 scheduling_hash_ = envelope.rpc_scheduling_hash(); | |
122 } | |
123 incoming_receiver_->Run(envelope.network_message()); | |
124 } | 291 } |
125 | 292 |
126 } // namespace sync_notifier | 293 } // namespace sync_notifier |
OLD | NEW |