Index: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc |
=================================================================== |
--- chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (revision 119174) |
+++ chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc (working copy) |
@@ -1,4 +1,4 @@ |
-// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+// Copyright (c) 2011 The Chromium Authors. All rights reserved. |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
@@ -6,18 +6,16 @@ |
#include <string> |
+#include "base/bind.h" |
#include "base/base64.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
#include "base/logging.h" |
#include "base/rand_util.h" |
#include "base/string_number_conversions.h" |
-#include "google/cacheinvalidation/v2/client_gateway.pb.h" |
#include "google/cacheinvalidation/v2/constants.h" |
#include "google/cacheinvalidation/v2/invalidation-client.h" |
#include "google/cacheinvalidation/v2/system-resources.h" |
-#include "jingle/notifier/listener/notification_constants.h" |
-#include "jingle/notifier/listener/push_notifications_send_update_task.h" |
#include "jingle/notifier/listener/xml_element_util.h" |
#include "talk/xmpp/constants.h" |
#include "talk/xmpp/jid.h" |
@@ -29,8 +27,199 @@ |
namespace { |
const char kBotJid[] = "tango@bot.talk.google.com"; |
-const char kChannelName[] = "tango_raw"; |
+const char kServiceUrl[] = "http://www.google.com/chrome/sync"; |
+buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); } |
+buzz::QName GetQnSeq() { return buzz::QName("", "seq"); } |
+buzz::QName GetQnSid() { return buzz::QName("", "sid"); } |
+buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); } |
+buzz::QName GetQnProtocolVersion() { |
+ return buzz::QName("", "protocolVersion"); |
+} |
+buzz::QName GetQnChannelContext() { |
+ return buzz::QName("", "channelContext"); |
+} |
+ |
+// TODO(akalin): Move these task classes out so that they can be |
+// unit-tested. This'll probably be done easier once we consolidate |
+// all the packet sending/receiving classes. |
+ |
+// A task that listens for ClientInvalidation messages and calls the |
+// given callback on them. |
+class CacheInvalidationListenTask : public buzz::XmppTask { |
+ public: |
+ // Takes ownership of callback. |
+ CacheInvalidationListenTask( |
+ buzz::XmppTaskParentInterface* parent, |
+ const base::Callback<void(const std::string&)>& callback, |
+ const base::Callback<void(const std::string&)>& context_change_callback) |
+ : XmppTask(parent, buzz::XmppEngine::HL_TYPE), |
+ callback_(callback), |
+ context_change_callback_(context_change_callback) {} |
+ virtual ~CacheInvalidationListenTask() {} |
+ |
+ virtual int ProcessStart() { |
+ DVLOG(2) << "CacheInvalidationListenTask started"; |
+ return STATE_RESPONSE; |
+ } |
+ |
+ virtual int ProcessResponse() { |
+ const buzz::XmlElement* stanza = NextStanza(); |
+ if (stanza == NULL) { |
+ DVLOG(2) << "CacheInvalidationListenTask blocked"; |
+ return STATE_BLOCKED; |
+ } |
+ DVLOG(2) << "CacheInvalidationListenTask response received"; |
+ std::string data; |
+ if (GetCacheInvalidationIqPacketData(stanza, &data)) { |
+ callback_.Run(data); |
+ } else { |
+ LOG(ERROR) << "Could not get packet data"; |
+ } |
+ // Acknowledge receipt of the iq to the buzz server. |
+ // TODO(akalin): Send an error response for malformed packets. |
+ scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); |
+ SendStanza(response_stanza.get()); |
+ return STATE_RESPONSE; |
+ } |
+ |
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
+ DVLOG(1) << "Stanza received: " |
+ << notifier::XmlElementToString(*stanza); |
+ if (IsValidCacheInvalidationIqPacket(stanza)) { |
+ DVLOG(2) << "Queueing stanza"; |
+ QueueStanza(stanza); |
+ return true; |
+ } |
+ DVLOG(2) << "Stanza skipped"; |
+ return false; |
+ } |
+ |
+ private: |
+ bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { |
+ // We deliberately minimize the verification we do here: see |
+ // http://crbug.com/71285 . |
+ return MatchRequestIq(stanza, buzz::STR_SET, GetQnData()); |
+ } |
+ |
+ bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, |
+ std::string* data) { |
+ DCHECK(IsValidCacheInvalidationIqPacket(stanza)); |
+ const buzz::XmlElement* cache_invalidation_iq_packet = |
+ stanza->FirstNamed(GetQnData()); |
+ if (!cache_invalidation_iq_packet) { |
+ LOG(ERROR) << "Could not find cache invalidation IQ packet element"; |
+ return false; |
+ } |
+ // Look for a channelContext attribute in the content of the stanza. If |
+ // present, remember it so it can be echoed back. |
+ if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) { |
+ context_change_callback_.Run( |
+ cache_invalidation_iq_packet->Attr(GetQnChannelContext())); |
+ } |
+ *data = cache_invalidation_iq_packet->BodyText(); |
+ return true; |
+ } |
+ |
+ base::Callback<void(const std::string&)> callback_; |
+ base::Callback<void(const std::string&)> context_change_callback_; |
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); |
+}; |
+ |
+std::string MakeProtocolVersion() { |
+ return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) + |
+ "." + |
+ base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion); |
+} |
+ |
+// A task that sends a single outbound ClientInvalidation message. |
+class CacheInvalidationSendMessageTask : public buzz::XmppTask { |
+ public: |
+ CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent, |
+ const buzz::Jid& to_jid, |
+ const std::string& msg, |
+ int seq, |
+ const std::string& sid, |
+ const std::string& channel_context) |
+ : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), |
+ to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid), |
+ channel_context_(channel_context) {} |
+ virtual ~CacheInvalidationSendMessageTask() {} |
+ |
+ virtual int ProcessStart() { |
+ scoped_ptr<buzz::XmlElement> stanza( |
+ MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, |
+ seq_, sid_, channel_context_)); |
+ DVLOG(1) << "Sending message: " |
+ << notifier::XmlElementToString(*stanza.get()); |
+ if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { |
+ DVLOG(2) << "Error when sending message"; |
+ return STATE_ERROR; |
+ } |
+ return STATE_RESPONSE; |
+ } |
+ |
+ virtual int ProcessResponse() { |
+ const buzz::XmlElement* stanza = NextStanza(); |
+ if (stanza == NULL) { |
+ DVLOG(2) << "CacheInvalidationSendMessageTask blocked..."; |
+ return STATE_BLOCKED; |
+ } |
+ DVLOG(2) << "CacheInvalidationSendMessageTask response received: " |
+ << notifier::XmlElementToString(*stanza); |
+ // TODO(akalin): Handle errors here. |
+ return STATE_DONE; |
+ } |
+ |
+ virtual bool HandleStanza(const buzz::XmlElement* stanza) { |
+ DVLOG(1) << "Stanza received: " |
+ << notifier::XmlElementToString(*stanza); |
+ if (!MatchResponseIq(stanza, to_jid_, task_id())) { |
+ DVLOG(2) << "Stanza skipped"; |
+ return false; |
+ } |
+ DVLOG(2) << "Queueing stanza"; |
+ QueueStanza(stanza); |
+ return true; |
+ } |
+ |
+ private: |
+ static buzz::XmlElement* MakeCacheInvalidationIqPacket( |
+ const buzz::Jid& to_jid, |
+ const std::string& task_id, |
+ const std::string& msg, |
+ int seq, const std::string& sid, const std::string& channel_context) { |
+ buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); |
+ buzz::XmlElement* cache_invalidation_iq_packet = |
+ new buzz::XmlElement(GetQnData(), true); |
+ iq->AddElement(cache_invalidation_iq_packet); |
+ cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq)); |
+ cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid); |
+ cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl); |
+ cache_invalidation_iq_packet->SetAttr( |
+ GetQnProtocolVersion(), MakeProtocolVersion()); |
+ if (!channel_context.empty()) { |
+ cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(), |
+ channel_context); |
+ } |
+ cache_invalidation_iq_packet->SetBodyText(msg); |
+ return iq; |
+ } |
+ |
+ const buzz::Jid to_jid_; |
+ std::string msg_; |
+ int seq_; |
+ std::string sid_; |
+ const std::string channel_context_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); |
+}; |
+ |
+std::string MakeSid() { |
+ uint64 sid = base::RandUint64(); |
+ return std::string("chrome-sync-") + base::Uint64ToString(sid); |
+} |
+ |
} // namespace |
CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( |
@@ -38,11 +227,17 @@ |
: weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
base_task_(base_task), |
seq_(0), |
- scheduling_hash_(0) { |
+ sid_(MakeSid()) { |
CHECK(base_task_.get()); |
// Owned by base_task. Takes ownership of the callback. |
- notifier::PushNotificationsListenTask* listen_task = |
- new notifier::PushNotificationsListenTask(base_task_, this); |
+ CacheInvalidationListenTask* listen_task = |
+ new CacheInvalidationListenTask( |
+ base_task_, base::Bind( |
+ &CacheInvalidationPacketHandler::HandleInboundPacket, |
+ weak_factory_.GetWeakPtr()), |
+ base::Bind( |
+ &CacheInvalidationPacketHandler::HandleChannelContextChange, |
+ weak_factory_.GetWeakPtr())); |
listen_task->Start(); |
} |
@@ -56,25 +251,20 @@ |
if (!base_task_.get()) { |
return; |
} |
- ipc::invalidation::ClientGatewayMessage envelope; |
- envelope.set_is_client_to_server(true); |
- if (!service_context_.empty()) { |
- envelope.set_service_context(service_context_); |
- envelope.set_rpc_scheduling_hash(scheduling_hash_); |
+ std::string encoded_message; |
+ if (!base::Base64Encode(message, &encoded_message)) { |
+ LOG(ERROR) << "Could not base64-encode message to send: " |
+ << message; |
+ return; |
} |
- envelope.set_network_message(message); |
- |
- notifier::Recipient recipient; |
- recipient.to = kBotJid; |
- notifier::Notification notification; |
- notification.channel = kChannelName; |
- notification.recipients.push_back(recipient); |
- envelope.SerializeToString(¬ification.data); |
- |
// Owned by base_task_. |
- notifier::PushNotificationsSendUpdateTask* send_message_task = |
- new notifier::PushNotificationsSendUpdateTask(base_task_, notification); |
+ CacheInvalidationSendMessageTask* send_message_task = |
+ new CacheInvalidationSendMessageTask(base_task_, |
+ buzz::Jid(kBotJid), |
+ encoded_message, |
+ seq_, sid_, channel_context_); |
send_message_task->Start(); |
+ ++seq_; |
} |
void CacheInvalidationPacketHandler::SetMessageReceiver( |
@@ -82,45 +272,22 @@ |
incoming_receiver_.reset(incoming_receiver); |
} |
-void CacheInvalidationPacketHandler::SendSubscriptionRequest() { |
- notifier::Subscription subscription; |
- subscription.channel = kChannelName; |
- subscription.from = ""; |
- notifier::SubscriptionList subscription_list; |
- subscription_list.push_back(subscription); |
- // Owned by base_task_. |
- notifier::PushNotificationsSubscribeTask* push_subscription_task = |
- new notifier::PushNotificationsSubscribeTask( |
- base_task_, subscription_list, this); |
- push_subscription_task->Start(); |
-} |
- |
-void CacheInvalidationPacketHandler::OnSubscribed() { |
- // TODO(ghc): Consider whether we should do more here. |
-} |
- |
-void CacheInvalidationPacketHandler::OnSubscriptionError() { |
- // TODO(ghc): Consider whether we should do more here. |
-} |
- |
-void CacheInvalidationPacketHandler::OnNotificationReceived( |
- const notifier::Notification& notification) { |
+void CacheInvalidationPacketHandler::HandleInboundPacket( |
+ const std::string& packet) { |
DCHECK(non_thread_safe_.CalledOnValidThread()); |
- const std::string& decoded_message = notification.data; |
- ipc::invalidation::ClientGatewayMessage envelope; |
- envelope.ParseFromString(decoded_message); |
- if (!envelope.IsInitialized()) { |
- LOG(ERROR) << "Could not parse ClientGatewayMessage: " |
- << decoded_message; |
+ std::string decoded_message; |
+ if (!base::Base64Decode(packet, &decoded_message)) { |
+ LOG(ERROR) << "Could not base64-decode received message: " |
+ << packet; |
return; |
} |
- if (envelope.has_service_context()) { |
- service_context_ = envelope.service_context(); |
- } |
- if (envelope.has_rpc_scheduling_hash()) { |
- scheduling_hash_ = envelope.rpc_scheduling_hash(); |
- } |
- incoming_receiver_->Run(envelope.network_message()); |
+ incoming_receiver_->Run(decoded_message); |
} |
+void CacheInvalidationPacketHandler::HandleChannelContextChange( |
+ const std::string& context) { |
+ DCHECK(non_thread_safe_.CalledOnValidThread()); |
+ channel_context_ = context; |
+} |
+ |
} // namespace sync_notifier |