Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(145)

Side by Side Diff: sync/notifier/sync_invalidation_listener.cc

Issue 10911084: Implement Invalidator::Acknowledge (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Adapt patch to new TickClock interface Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/notifier/sync_invalidation_listener.h" 5 #include "sync/notifier/sync_invalidation_listener.h"
6 6
7 #include <string>
8 #include <vector> 7 #include <vector>
9 8
9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/tracked_objects.h" 13 #include "base/tracked_objects.h"
14 #include "google/cacheinvalidation/include/invalidation-client.h" 14 #include "google/cacheinvalidation/include/invalidation-client.h"
15 #include "google/cacheinvalidation/include/types.h" 15 #include "google/cacheinvalidation/include/types.h"
16 #include "google/cacheinvalidation/types.pb.h" 16 #include "google/cacheinvalidation/types.pb.h"
17 #include "jingle/notifier/listener/push_client.h" 17 #include "jingle/notifier/listener/push_client.h"
18 #include "sync/notifier/invalidation_util.h" 18 #include "sync/notifier/invalidation_util.h"
19 #include "sync/notifier/registration_manager.h" 19 #include "sync/notifier/registration_manager.h"
20 20
21 namespace { 21 namespace {
22 22
23 const char kApplicationName[] = "chrome-sync"; 23 const char kApplicationName[] = "chrome-sync";
24 24
25 } // namespace 25 } // namespace
26 26
27 namespace syncer { 27 namespace syncer {
28 28
29 SyncInvalidationListener::Delegate::~Delegate() {} 29 SyncInvalidationListener::Delegate::~Delegate() {}
30 30
31 SyncInvalidationListener::SyncInvalidationListener( 31 SyncInvalidationListener::SyncInvalidationListener(
32 base::TickClock* tick_clock,
32 scoped_ptr<notifier::PushClient> push_client) 33 scoped_ptr<notifier::PushClient> push_client)
33 : push_client_(push_client.get()), 34 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
35 ack_tracker_(tick_clock, ALLOW_THIS_IN_INITIALIZER_LIST(this)),
36 push_client_(push_client.get()),
34 sync_system_resources_(push_client.Pass(), 37 sync_system_resources_(push_client.Pass(),
35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), 38 ALLOW_THIS_IN_INITIALIZER_LIST(this)),
36 delegate_(NULL), 39 delegate_(NULL),
37 ticl_state_(DEFAULT_INVALIDATION_ERROR), 40 ticl_state_(DEFAULT_INVALIDATION_ERROR),
38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { 41 push_client_state_(DEFAULT_INVALIDATION_ERROR) {
39 DCHECK(CalledOnValidThread()); 42 DCHECK(CalledOnValidThread());
40 push_client_->AddObserver(this); 43 push_client_->AddObserver(this);
41 } 44 }
42 45
43 SyncInvalidationListener::~SyncInvalidationListener() { 46 SyncInvalidationListener::~SyncInvalidationListener() {
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
98 101
99 // TODO(rlarocque): This call exists as part of an effort to move the 102 // TODO(rlarocque): This call exists as part of an effort to move the
100 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to 103 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to
101 // storage that lives on the UI thread. Once this has been in place for a 104 // storage that lives on the UI thread. Once this has been in place for a
102 // milestone or two, we can remove it and start looking for invalidator client 105 // milestone or two, we can remove it and start looking for invalidator client
103 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. 106 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142.
104 invalidation_state_tracker_.Call( 107 invalidation_state_tracker_.Call(
105 FROM_HERE, 108 FROM_HERE,
106 &InvalidationStateTracker::SetInvalidatorClientId, 109 &InvalidationStateTracker::SetInvalidatorClientId,
107 client_id); 110 client_id);
111
112 // Set up reminders for any invalidations that have not been locally
113 // acknowledged.
114 ObjectIdSet unacknowledged_ids;
115 for (InvalidationStateMap::const_iterator it =
116 invalidation_state_map_.begin();
117 it != invalidation_state_map_.end(); ++it) {
118 if (it->second.expected.Equals(it->second.current))
119 continue;
120 unacknowledged_ids.insert(it->first);
121 }
122 if (!unacknowledged_ids.empty())
123 ack_tracker_.Track(unacknowledged_ids);
108 } 124 }
109 125
110 void SyncInvalidationListener::UpdateCredentials( 126 void SyncInvalidationListener::UpdateCredentials(
111 const std::string& email, const std::string& token) { 127 const std::string& email, const std::string& token) {
112 DCHECK(CalledOnValidThread()); 128 DCHECK(CalledOnValidThread());
113 sync_system_resources_.network()->UpdateCredentials(email, token); 129 sync_system_resources_.network()->UpdateCredentials(email, token);
114 } 130 }
115 131
116 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 132 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
117 DCHECK(CalledOnValidThread()); 133 DCHECK(CalledOnValidThread());
118 registered_ids_ = ids; 134 registered_ids_ = ids;
119 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 135 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
120 // working XMPP connection (as observed by us), so check it instead 136 // working XMPP connection (as observed by us), so check it instead
121 // of GetState() (see http://crbug.com/139424). 137 // of GetState() (see http://crbug.com/139424).
122 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { 138 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) {
123 DoRegistrationUpdate(); 139 DoRegistrationUpdate();
124 } 140 }
125 } 141 }
126 142
143 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
144 const AckHandle& ack_handle) {
145 DCHECK(CalledOnValidThread());
146 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
147 if (state_it == invalidation_state_map_.end())
148 return;
149 invalidation_state_tracker_.Call(
150 FROM_HERE,
151 &InvalidationStateTracker::Acknowledge,
152 id,
153 ack_handle);
154 state_it->second.current = ack_handle;
155 if (state_it->second.expected.Equals(ack_handle)) {
156 // If the received ack matches the expected ack, then we no longer need to
157 // keep track of |id| since it is up-to-date.
158 ObjectIdSet ids;
159 ids.insert(id);
160 ack_tracker_.Ack(ids);
161 }
162 }
163
127 void SyncInvalidationListener::Ready( 164 void SyncInvalidationListener::Ready(
128 invalidation::InvalidationClient* client) { 165 invalidation::InvalidationClient* client) {
129 DCHECK(CalledOnValidThread()); 166 DCHECK(CalledOnValidThread());
130 DCHECK_EQ(client, invalidation_client_.get()); 167 DCHECK_EQ(client, invalidation_client_.get());
131 ticl_state_ = INVALIDATIONS_ENABLED; 168 ticl_state_ = INVALIDATIONS_ENABLED;
132 EmitStateChange(); 169 EmitStateChange();
133 DoRegistrationUpdate(); 170 DoRegistrationUpdate();
134 } 171 }
135 172
136 void SyncInvalidationListener::Invalidate( 173 void SyncInvalidationListener::Invalidate(
(...skipping 23 matching lines...) Expand all
160 } 197 }
161 198
162 std::string payload; 199 std::string payload;
163 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 200 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
164 if (invalidation.has_payload()) 201 if (invalidation.has_payload())
165 payload = invalidation.payload(); 202 payload = invalidation.payload();
166 203
167 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) 204 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
168 << " to " << invalidation.version(); 205 << " to " << invalidation.version();
169 invalidation_state_map_[id].version = invalidation.version(); 206 invalidation_state_map_[id].version = invalidation.version();
207 invalidation_state_map_[id].payload = payload;
170 invalidation_state_tracker_.Call( 208 invalidation_state_tracker_.Call(
171 FROM_HERE, 209 FROM_HERE,
172 &InvalidationStateTracker::SetMaxVersionAndPayload, 210 &InvalidationStateTracker::SetMaxVersionAndPayload,
173 id, invalidation.version(), payload); 211 id, invalidation.version(), payload);
174 212
175 ObjectIdInvalidationMap invalidation_map; 213 ObjectIdSet ids;
176 invalidation_map[id].payload = payload; 214 ids.insert(id);
177 EmitInvalidation(invalidation_map); 215 PrepareInvalidation(ids, payload, client, ack_handle);
178 // TODO(akalin): We should really acknowledge only after we get the
179 // updates from the sync server. (see http://crbug.com/78462).
180 client->Acknowledge(ack_handle);
181 } 216 }
182 217
183 void SyncInvalidationListener::InvalidateUnknownVersion( 218 void SyncInvalidationListener::InvalidateUnknownVersion(
184 invalidation::InvalidationClient* client, 219 invalidation::InvalidationClient* client,
185 const invalidation::ObjectId& object_id, 220 const invalidation::ObjectId& object_id,
186 const invalidation::AckHandle& ack_handle) { 221 const invalidation::AckHandle& ack_handle) {
187 DCHECK(CalledOnValidThread()); 222 DCHECK(CalledOnValidThread());
188 DCHECK_EQ(client, invalidation_client_.get()); 223 DCHECK_EQ(client, invalidation_client_.get());
189 DVLOG(1) << "InvalidateUnknownVersion"; 224 DVLOG(1) << "InvalidateUnknownVersion";
190 225
191 ObjectIdInvalidationMap invalidation_map; 226 ObjectIdSet ids;
192 invalidation_map[object_id].payload = std::string(); 227 ids.insert(object_id);
193 EmitInvalidation(invalidation_map); 228 PrepareInvalidation(ids, std::string(), client, ack_handle);
194 // TODO(akalin): We should really acknowledge only after we get the
195 // updates from the sync server. (see http://crbug.com/78462).
196 client->Acknowledge(ack_handle);
197 } 229 }
198 230
199 // This should behave as if we got an invalidation with version 231 // This should behave as if we got an invalidation with version
200 // UNKNOWN_OBJECT_VERSION for all known data types. 232 // UNKNOWN_OBJECT_VERSION for all known data types.
201 void SyncInvalidationListener::InvalidateAll( 233 void SyncInvalidationListener::InvalidateAll(
202 invalidation::InvalidationClient* client, 234 invalidation::InvalidationClient* client,
203 const invalidation::AckHandle& ack_handle) { 235 const invalidation::AckHandle& ack_handle) {
204 DCHECK(CalledOnValidThread()); 236 DCHECK(CalledOnValidThread());
205 DCHECK_EQ(client, invalidation_client_.get()); 237 DCHECK_EQ(client, invalidation_client_.get());
206 DVLOG(1) << "InvalidateAll"; 238 DVLOG(1) << "InvalidateAll";
207 239
208 const ObjectIdInvalidationMap& invalidation_map = 240 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
209 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); 241 }
210 EmitInvalidation(invalidation_map); 242
211 // TODO(akalin): We should really acknowledge only after we get the 243 void SyncInvalidationListener::PrepareInvalidation(
212 // updates from the sync server. (see http://crbug.com/76482). 244 const ObjectIdSet& ids,
245 const std::string& payload,
246 invalidation::InvalidationClient* client,
247 const invalidation::AckHandle& ack_handle) {
248 DCHECK(CalledOnValidThread());
249
250 // A server invalidation resets the local retry count.
251 ack_tracker_.Ack(ids);
252 invalidation_state_tracker_.Call(
253 FROM_HERE,
254 &InvalidationStateTracker::GenerateAckHandles,
255 ids,
256 base::MessageLoopProxy::current(),
257 base::Bind(&SyncInvalidationListener::EmitInvalidation,
258 weak_ptr_factory_.GetWeakPtr(),
259 ids,
260 payload,
261 client,
262 ack_handle));
263 }
264
265 void SyncInvalidationListener::EmitInvalidation(
266 const ObjectIdSet& ids,
267 const std::string& payload,
268 invalidation::InvalidationClient* client,
269 const invalidation::AckHandle& ack_handle,
270 const AckHandleMap& local_ack_handles) {
271 DCHECK(CalledOnValidThread());
272 ObjectIdInvalidationMap invalidation_map =
273 ObjectIdSetToInvalidationMap(ids, payload);
274 for (AckHandleMap::const_iterator it = local_ack_handles.begin();
275 it != local_ack_handles.end(); ++it) {
276 // Update in-memory copy of the invalidation state.
277 invalidation_state_map_[it->first].expected = it->second;
278 invalidation_map[it->first].ack_handle = it->second;
279 }
280 ack_tracker_.Track(ids);
281 delegate_->OnInvalidate(invalidation_map);
213 client->Acknowledge(ack_handle); 282 client->Acknowledge(ack_handle);
214 } 283 }
215 284
216 void SyncInvalidationListener::EmitInvalidation( 285 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
217 const ObjectIdInvalidationMap& invalidation_map) { 286 ObjectIdInvalidationMap invalidation_map;
218 DCHECK(CalledOnValidThread()); 287 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
288 Invalidation invalidation;
289 invalidation.ack_handle = invalidation_state_map_[*it].expected;
290 invalidation.payload = invalidation_state_map_[*it].payload;
291 invalidation_map.insert(std::make_pair(*it, invalidation));
292 }
293
219 delegate_->OnInvalidate(invalidation_map); 294 delegate_->OnInvalidate(invalidation_map);
220 } 295 }
221 296
222 void SyncInvalidationListener::InformRegistrationStatus( 297 void SyncInvalidationListener::InformRegistrationStatus(
223 invalidation::InvalidationClient* client, 298 invalidation::InvalidationClient* client,
224 const invalidation::ObjectId& object_id, 299 const invalidation::ObjectId& object_id,
225 InvalidationListener::RegistrationState new_state) { 300 InvalidationListener::RegistrationState new_state) {
226 DCHECK(CalledOnValidThread()); 301 DCHECK(CalledOnValidThread());
227 DCHECK_EQ(client, invalidation_client_.get()); 302 DCHECK_EQ(client, invalidation_client_.get());
228 DVLOG(1) << "InformRegistrationStatus: " 303 DVLOG(1) << "InformRegistrationStatus: "
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
299 void SyncInvalidationListener::DoRegistrationUpdate() { 374 void SyncInvalidationListener::DoRegistrationUpdate() {
300 DCHECK(CalledOnValidThread()); 375 DCHECK(CalledOnValidThread());
301 const ObjectIdSet& unregistered_ids = 376 const ObjectIdSet& unregistered_ids =
302 registration_manager_->UpdateRegisteredIds(registered_ids_); 377 registration_manager_->UpdateRegisteredIds(registered_ids_);
303 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); 378 for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
304 it != unregistered_ids.end(); ++it) { 379 it != unregistered_ids.end(); ++it) {
305 invalidation_state_map_.erase(*it); 380 invalidation_state_map_.erase(*it);
306 } 381 }
307 invalidation_state_tracker_.Call( 382 invalidation_state_tracker_.Call(
308 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); 383 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
384 ack_tracker_.Ack(unregistered_ids);
309 } 385 }
310 386
311 void SyncInvalidationListener::StopForTest() { 387 void SyncInvalidationListener::StopForTest() {
312 DCHECK(CalledOnValidThread()); 388 DCHECK(CalledOnValidThread());
313 Stop(); 389 Stop();
314 } 390 }
315 391
316 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { 392 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
317 DCHECK(CalledOnValidThread()); 393 DCHECK(CalledOnValidThread());
318 return invalidation_state_map_; 394 return invalidation_state_map_;
319 } 395 }
320 396
397 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
398 return &ack_tracker_;
399 }
400
321 void SyncInvalidationListener::Stop() { 401 void SyncInvalidationListener::Stop() {
322 DCHECK(CalledOnValidThread()); 402 DCHECK(CalledOnValidThread());
323 if (!invalidation_client_.get()) { 403 if (!invalidation_client_.get()) {
324 return; 404 return;
325 } 405 }
326 406
407 ack_tracker_.Clear();
408
327 registration_manager_.reset(); 409 registration_manager_.reset();
328 sync_system_resources_.Stop(); 410 sync_system_resources_.Stop();
329 invalidation_client_->Stop(); 411 invalidation_client_->Stop();
330 412
331 invalidation_client_.reset(); 413 invalidation_client_.reset();
332 delegate_ = NULL; 414 delegate_ = NULL;
333 415
334 invalidation_state_tracker_.Reset(); 416 invalidation_state_tracker_.Reset();
335 invalidation_state_map_.clear(); 417 invalidation_state_map_.clear();
336 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 418 ticl_state_ = DEFAULT_INVALIDATION_ERROR;
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
373 EmitStateChange(); 455 EmitStateChange();
374 } 456 }
375 457
376 void SyncInvalidationListener::OnIncomingNotification( 458 void SyncInvalidationListener::OnIncomingNotification(
377 const notifier::Notification& notification) { 459 const notifier::Notification& notification) {
378 DCHECK(CalledOnValidThread()); 460 DCHECK(CalledOnValidThread());
379 // Do nothing, since this is already handled by |invalidation_client_|. 461 // Do nothing, since this is already handled by |invalidation_client_|.
380 } 462 }
381 463
382 } // namespace syncer 464 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/notifier/sync_invalidation_listener.h ('k') | sync/notifier/sync_invalidation_listener_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698