Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(133)

Side by Side Diff: chrome/browser/sync/notifier/cache_invalidation_packet_handler.cc

Issue 9234053: Revert 119171 - Maybe introduced a static initializer - or may be 119173 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 8 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6 6
7 #include <string> 7 #include <string>
8 8
9 #include "base/bind.h"
9 #include "base/base64.h" 10 #include "base/base64.h"
10 #include "base/callback.h" 11 #include "base/callback.h"
11 #include "base/compiler_specific.h" 12 #include "base/compiler_specific.h"
12 #include "base/logging.h" 13 #include "base/logging.h"
13 #include "base/rand_util.h" 14 #include "base/rand_util.h"
14 #include "base/string_number_conversions.h" 15 #include "base/string_number_conversions.h"
15 #include "google/cacheinvalidation/v2/client_gateway.pb.h"
16 #include "google/cacheinvalidation/v2/constants.h" 16 #include "google/cacheinvalidation/v2/constants.h"
17 #include "google/cacheinvalidation/v2/invalidation-client.h" 17 #include "google/cacheinvalidation/v2/invalidation-client.h"
18 #include "google/cacheinvalidation/v2/system-resources.h" 18 #include "google/cacheinvalidation/v2/system-resources.h"
19 #include "jingle/notifier/listener/notification_constants.h"
20 #include "jingle/notifier/listener/push_notifications_send_update_task.h"
21 #include "jingle/notifier/listener/xml_element_util.h" 19 #include "jingle/notifier/listener/xml_element_util.h"
22 #include "talk/xmpp/constants.h" 20 #include "talk/xmpp/constants.h"
23 #include "talk/xmpp/jid.h" 21 #include "talk/xmpp/jid.h"
24 #include "talk/xmpp/xmppclient.h" 22 #include "talk/xmpp/xmppclient.h"
25 #include "talk/xmpp/xmpptask.h" 23 #include "talk/xmpp/xmpptask.h"
26 24
27 namespace sync_notifier { 25 namespace sync_notifier {
28 26
29 namespace { 27 namespace {
30 28
31 const char kBotJid[] = "tango@bot.talk.google.com"; 29 const char kBotJid[] = "tango@bot.talk.google.com";
32 const char kChannelName[] = "tango_raw"; 30 const char kServiceUrl[] = "http://www.google.com/chrome/sync";
31
32 buzz::QName GetQnData() { return buzz::QName("google:notifier", "data"); }
33 buzz::QName GetQnSeq() { return buzz::QName("", "seq"); }
34 buzz::QName GetQnSid() { return buzz::QName("", "sid"); }
35 buzz::QName GetQnServiceUrl() { return buzz::QName("", "serviceUrl"); }
36 buzz::QName GetQnProtocolVersion() {
37 return buzz::QName("", "protocolVersion");
38 }
39 buzz::QName GetQnChannelContext() {
40 return buzz::QName("", "channelContext");
41 }
42
43 // TODO(akalin): Move these task classes out so that they can be
44 // unit-tested. This'll probably be done easier once we consolidate
45 // all the packet sending/receiving classes.
46
47 // A task that listens for ClientInvalidation messages and calls the
48 // given callback on them.
49 class CacheInvalidationListenTask : public buzz::XmppTask {
50 public:
51 // Takes ownership of callback.
52 CacheInvalidationListenTask(
53 buzz::XmppTaskParentInterface* parent,
54 const base::Callback<void(const std::string&)>& callback,
55 const base::Callback<void(const std::string&)>& context_change_callback)
56 : XmppTask(parent, buzz::XmppEngine::HL_TYPE),
57 callback_(callback),
58 context_change_callback_(context_change_callback) {}
59 virtual ~CacheInvalidationListenTask() {}
60
61 virtual int ProcessStart() {
62 DVLOG(2) << "CacheInvalidationListenTask started";
63 return STATE_RESPONSE;
64 }
65
66 virtual int ProcessResponse() {
67 const buzz::XmlElement* stanza = NextStanza();
68 if (stanza == NULL) {
69 DVLOG(2) << "CacheInvalidationListenTask blocked";
70 return STATE_BLOCKED;
71 }
72 DVLOG(2) << "CacheInvalidationListenTask response received";
73 std::string data;
74 if (GetCacheInvalidationIqPacketData(stanza, &data)) {
75 callback_.Run(data);
76 } else {
77 LOG(ERROR) << "Could not get packet data";
78 }
79 // Acknowledge receipt of the iq to the buzz server.
80 // TODO(akalin): Send an error response for malformed packets.
81 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
82 SendStanza(response_stanza.get());
83 return STATE_RESPONSE;
84 }
85
86 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
87 DVLOG(1) << "Stanza received: "
88 << notifier::XmlElementToString(*stanza);
89 if (IsValidCacheInvalidationIqPacket(stanza)) {
90 DVLOG(2) << "Queueing stanza";
91 QueueStanza(stanza);
92 return true;
93 }
94 DVLOG(2) << "Stanza skipped";
95 return false;
96 }
97
98 private:
99 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
100 // We deliberately minimize the verification we do here: see
101 // http://crbug.com/71285 .
102 return MatchRequestIq(stanza, buzz::STR_SET, GetQnData());
103 }
104
105 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
106 std::string* data) {
107 DCHECK(IsValidCacheInvalidationIqPacket(stanza));
108 const buzz::XmlElement* cache_invalidation_iq_packet =
109 stanza->FirstNamed(GetQnData());
110 if (!cache_invalidation_iq_packet) {
111 LOG(ERROR) << "Could not find cache invalidation IQ packet element";
112 return false;
113 }
114 // Look for a channelContext attribute in the content of the stanza. If
115 // present, remember it so it can be echoed back.
116 if (cache_invalidation_iq_packet->HasAttr(GetQnChannelContext())) {
117 context_change_callback_.Run(
118 cache_invalidation_iq_packet->Attr(GetQnChannelContext()));
119 }
120 *data = cache_invalidation_iq_packet->BodyText();
121 return true;
122 }
123
124 base::Callback<void(const std::string&)> callback_;
125 base::Callback<void(const std::string&)> context_change_callback_;
126 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
127 };
128
129 std::string MakeProtocolVersion() {
130 return base::Uint64ToString(invalidation::Constants::kProtocolMajorVersion) +
131 "." +
132 base::Uint64ToString(invalidation::Constants::kProtocolMinorVersion);
133 }
134
135 // A task that sends a single outbound ClientInvalidation message.
136 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
137 public:
138 CacheInvalidationSendMessageTask(buzz::XmppTaskParentInterface* parent,
139 const buzz::Jid& to_jid,
140 const std::string& msg,
141 int seq,
142 const std::string& sid,
143 const std::string& channel_context)
144 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
145 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid),
146 channel_context_(channel_context) {}
147 virtual ~CacheInvalidationSendMessageTask() {}
148
149 virtual int ProcessStart() {
150 scoped_ptr<buzz::XmlElement> stanza(
151 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
152 seq_, sid_, channel_context_));
153 DVLOG(1) << "Sending message: "
154 << notifier::XmlElementToString(*stanza.get());
155 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
156 DVLOG(2) << "Error when sending message";
157 return STATE_ERROR;
158 }
159 return STATE_RESPONSE;
160 }
161
162 virtual int ProcessResponse() {
163 const buzz::XmlElement* stanza = NextStanza();
164 if (stanza == NULL) {
165 DVLOG(2) << "CacheInvalidationSendMessageTask blocked...";
166 return STATE_BLOCKED;
167 }
168 DVLOG(2) << "CacheInvalidationSendMessageTask response received: "
169 << notifier::XmlElementToString(*stanza);
170 // TODO(akalin): Handle errors here.
171 return STATE_DONE;
172 }
173
174 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
175 DVLOG(1) << "Stanza received: "
176 << notifier::XmlElementToString(*stanza);
177 if (!MatchResponseIq(stanza, to_jid_, task_id())) {
178 DVLOG(2) << "Stanza skipped";
179 return false;
180 }
181 DVLOG(2) << "Queueing stanza";
182 QueueStanza(stanza);
183 return true;
184 }
185
186 private:
187 static buzz::XmlElement* MakeCacheInvalidationIqPacket(
188 const buzz::Jid& to_jid,
189 const std::string& task_id,
190 const std::string& msg,
191 int seq, const std::string& sid, const std::string& channel_context) {
192 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
193 buzz::XmlElement* cache_invalidation_iq_packet =
194 new buzz::XmlElement(GetQnData(), true);
195 iq->AddElement(cache_invalidation_iq_packet);
196 cache_invalidation_iq_packet->SetAttr(GetQnSeq(), base::IntToString(seq));
197 cache_invalidation_iq_packet->SetAttr(GetQnSid(), sid);
198 cache_invalidation_iq_packet->SetAttr(GetQnServiceUrl(), kServiceUrl);
199 cache_invalidation_iq_packet->SetAttr(
200 GetQnProtocolVersion(), MakeProtocolVersion());
201 if (!channel_context.empty()) {
202 cache_invalidation_iq_packet->SetAttr(GetQnChannelContext(),
203 channel_context);
204 }
205 cache_invalidation_iq_packet->SetBodyText(msg);
206 return iq;
207 }
208
209 const buzz::Jid to_jid_;
210 std::string msg_;
211 int seq_;
212 std::string sid_;
213 const std::string channel_context_;
214
215 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
216 };
217
218 std::string MakeSid() {
219 uint64 sid = base::RandUint64();
220 return std::string("chrome-sync-") + base::Uint64ToString(sid);
221 }
33 222
34 } // namespace 223 } // namespace
35 224
36 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( 225 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
37 base::WeakPtr<buzz::XmppTaskParentInterface> base_task) 226 base::WeakPtr<buzz::XmppTaskParentInterface> base_task)
38 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 227 : weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
39 base_task_(base_task), 228 base_task_(base_task),
40 seq_(0), 229 seq_(0),
41 scheduling_hash_(0) { 230 sid_(MakeSid()) {
42 CHECK(base_task_.get()); 231 CHECK(base_task_.get());
43 // Owned by base_task. Takes ownership of the callback. 232 // Owned by base_task. Takes ownership of the callback.
44 notifier::PushNotificationsListenTask* listen_task = 233 CacheInvalidationListenTask* listen_task =
45 new notifier::PushNotificationsListenTask(base_task_, this); 234 new CacheInvalidationListenTask(
235 base_task_, base::Bind(
236 &CacheInvalidationPacketHandler::HandleInboundPacket,
237 weak_factory_.GetWeakPtr()),
238 base::Bind(
239 &CacheInvalidationPacketHandler::HandleChannelContextChange,
240 weak_factory_.GetWeakPtr()));
46 listen_task->Start(); 241 listen_task->Start();
47 } 242 }
48 243
49 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { 244 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
50 DCHECK(non_thread_safe_.CalledOnValidThread()); 245 DCHECK(non_thread_safe_.CalledOnValidThread());
51 } 246 }
52 247
53 void CacheInvalidationPacketHandler::SendMessage( 248 void CacheInvalidationPacketHandler::SendMessage(
54 const std::string& message) { 249 const std::string& message) {
55 DCHECK(non_thread_safe_.CalledOnValidThread()); 250 DCHECK(non_thread_safe_.CalledOnValidThread());
56 if (!base_task_.get()) { 251 if (!base_task_.get()) {
57 return; 252 return;
58 } 253 }
59 ipc::invalidation::ClientGatewayMessage envelope; 254 std::string encoded_message;
60 envelope.set_is_client_to_server(true); 255 if (!base::Base64Encode(message, &encoded_message)) {
61 if (!service_context_.empty()) { 256 LOG(ERROR) << "Could not base64-encode message to send: "
62 envelope.set_service_context(service_context_); 257 << message;
63 envelope.set_rpc_scheduling_hash(scheduling_hash_); 258 return;
64 } 259 }
65 envelope.set_network_message(message);
66
67 notifier::Recipient recipient;
68 recipient.to = kBotJid;
69 notifier::Notification notification;
70 notification.channel = kChannelName;
71 notification.recipients.push_back(recipient);
72 envelope.SerializeToString(&notification.data);
73
74 // Owned by base_task_. 260 // Owned by base_task_.
75 notifier::PushNotificationsSendUpdateTask* send_message_task = 261 CacheInvalidationSendMessageTask* send_message_task =
76 new notifier::PushNotificationsSendUpdateTask(base_task_, notification); 262 new CacheInvalidationSendMessageTask(base_task_,
263 buzz::Jid(kBotJid),
264 encoded_message,
265 seq_, sid_, channel_context_);
77 send_message_task->Start(); 266 send_message_task->Start();
267 ++seq_;
78 } 268 }
79 269
80 void CacheInvalidationPacketHandler::SetMessageReceiver( 270 void CacheInvalidationPacketHandler::SetMessageReceiver(
81 invalidation::MessageCallback* incoming_receiver) { 271 invalidation::MessageCallback* incoming_receiver) {
82 incoming_receiver_.reset(incoming_receiver); 272 incoming_receiver_.reset(incoming_receiver);
83 } 273 }
84 274
85 void CacheInvalidationPacketHandler::SendSubscriptionRequest() { 275 void CacheInvalidationPacketHandler::HandleInboundPacket(
86 notifier::Subscription subscription; 276 const std::string& packet) {
87 subscription.channel = kChannelName; 277 DCHECK(non_thread_safe_.CalledOnValidThread());
88 subscription.from = ""; 278 std::string decoded_message;
89 notifier::SubscriptionList subscription_list; 279 if (!base::Base64Decode(packet, &decoded_message)) {
90 subscription_list.push_back(subscription); 280 LOG(ERROR) << "Could not base64-decode received message: "
91 // Owned by base_task_. 281 << packet;
92 notifier::PushNotificationsSubscribeTask* push_subscription_task = 282 return;
93 new notifier::PushNotificationsSubscribeTask( 283 }
94 base_task_, subscription_list, this); 284 incoming_receiver_->Run(decoded_message);
95 push_subscription_task->Start();
96 } 285 }
97 286
98 void CacheInvalidationPacketHandler::OnSubscribed() { 287 void CacheInvalidationPacketHandler::HandleChannelContextChange(
99 // TODO(ghc): Consider whether we should do more here. 288 const std::string& context) {
100 }
101
102 void CacheInvalidationPacketHandler::OnSubscriptionError() {
103 // TODO(ghc): Consider whether we should do more here.
104 }
105
106 void CacheInvalidationPacketHandler::OnNotificationReceived(
107 const notifier::Notification& notification) {
108 DCHECK(non_thread_safe_.CalledOnValidThread()); 289 DCHECK(non_thread_safe_.CalledOnValidThread());
109 const std::string& decoded_message = notification.data; 290 channel_context_ = context;
110 ipc::invalidation::ClientGatewayMessage envelope;
111 envelope.ParseFromString(decoded_message);
112 if (!envelope.IsInitialized()) {
113 LOG(ERROR) << "Could not parse ClientGatewayMessage: "
114 << decoded_message;
115 return;
116 }
117 if (envelope.has_service_context()) {
118 service_context_ = envelope.service_context();
119 }
120 if (envelope.has_rpc_scheduling_hash()) {
121 scheduling_hash_ = envelope.rpc_scheduling_hash();
122 }
123 incoming_receiver_->Run(envelope.network_message());
124 } 291 }
125 292
126 } // namespace sync_notifier 293 } // namespace sync_notifier
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698