OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
6 | 6 |
7 #include <string> | |
8 #include <vector> | 7 #include <vector> |
9 | 8 |
| 9 #include "base/bind.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/tracked_objects.h" | 13 #include "base/tracked_objects.h" |
14 #include "google/cacheinvalidation/include/invalidation-client.h" | 14 #include "google/cacheinvalidation/include/invalidation-client.h" |
15 #include "google/cacheinvalidation/include/types.h" | 15 #include "google/cacheinvalidation/include/types.h" |
16 #include "google/cacheinvalidation/types.pb.h" | 16 #include "google/cacheinvalidation/types.pb.h" |
17 #include "jingle/notifier/listener/push_client.h" | 17 #include "jingle/notifier/listener/push_client.h" |
18 #include "sync/notifier/invalidation_util.h" | 18 #include "sync/notifier/invalidation_util.h" |
19 #include "sync/notifier/registration_manager.h" | 19 #include "sync/notifier/registration_manager.h" |
20 | 20 |
21 namespace { | 21 namespace { |
22 | 22 |
23 const char kApplicationName[] = "chrome-sync"; | 23 const char kApplicationName[] = "chrome-sync"; |
24 | 24 |
25 } // namespace | 25 } // namespace |
26 | 26 |
27 namespace syncer { | 27 namespace syncer { |
28 | 28 |
29 SyncInvalidationListener::Delegate::~Delegate() {} | 29 SyncInvalidationListener::Delegate::~Delegate() {} |
30 | 30 |
31 SyncInvalidationListener::SyncInvalidationListener( | 31 SyncInvalidationListener::SyncInvalidationListener( |
| 32 base::TickClock* tick_clock, |
32 scoped_ptr<notifier::PushClient> push_client) | 33 scoped_ptr<notifier::PushClient> push_client) |
33 : push_client_(push_client.get()), | 34 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| 35 ack_tracker_(tick_clock, ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| 36 push_client_(push_client.get()), |
34 sync_system_resources_(push_client.Pass(), | 37 sync_system_resources_(push_client.Pass(), |
35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 38 ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
36 delegate_(NULL), | 39 delegate_(NULL), |
37 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 40 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { | 41 push_client_state_(DEFAULT_INVALIDATION_ERROR) { |
39 DCHECK(CalledOnValidThread()); | 42 DCHECK(CalledOnValidThread()); |
40 push_client_->AddObserver(this); | 43 push_client_->AddObserver(this); |
41 } | 44 } |
42 | 45 |
43 SyncInvalidationListener::~SyncInvalidationListener() { | 46 SyncInvalidationListener::~SyncInvalidationListener() { |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
98 | 101 |
99 // TODO(rlarocque): This call exists as part of an effort to move the | 102 // TODO(rlarocque): This call exists as part of an effort to move the |
100 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to | 103 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to |
101 // storage that lives on the UI thread. Once this has been in place for a | 104 // storage that lives on the UI thread. Once this has been in place for a |
102 // milestone or two, we can remove it and start looking for invalidator client | 105 // milestone or two, we can remove it and start looking for invalidator client |
103 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. | 106 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. |
104 invalidation_state_tracker_.Call( | 107 invalidation_state_tracker_.Call( |
105 FROM_HERE, | 108 FROM_HERE, |
106 &InvalidationStateTracker::SetInvalidatorClientId, | 109 &InvalidationStateTracker::SetInvalidatorClientId, |
107 client_id); | 110 client_id); |
| 111 |
| 112 // Set up reminders for any invalidations that have not been locally |
| 113 // acknowledged. |
| 114 ObjectIdSet unacknowledged_ids; |
| 115 for (InvalidationStateMap::const_iterator it = |
| 116 invalidation_state_map_.begin(); |
| 117 it != invalidation_state_map_.end(); ++it) { |
| 118 if (it->second.expected.Equals(it->second.current)) |
| 119 continue; |
| 120 unacknowledged_ids.insert(it->first); |
| 121 } |
| 122 if (!unacknowledged_ids.empty()) |
| 123 ack_tracker_.Track(unacknowledged_ids); |
108 } | 124 } |
109 | 125 |
110 void SyncInvalidationListener::UpdateCredentials( | 126 void SyncInvalidationListener::UpdateCredentials( |
111 const std::string& email, const std::string& token) { | 127 const std::string& email, const std::string& token) { |
112 DCHECK(CalledOnValidThread()); | 128 DCHECK(CalledOnValidThread()); |
113 sync_system_resources_.network()->UpdateCredentials(email, token); | 129 sync_system_resources_.network()->UpdateCredentials(email, token); |
114 } | 130 } |
115 | 131 |
116 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 132 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
117 DCHECK(CalledOnValidThread()); | 133 DCHECK(CalledOnValidThread()); |
118 registered_ids_ = ids; | 134 registered_ids_ = ids; |
119 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 135 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
120 // working XMPP connection (as observed by us), so check it instead | 136 // working XMPP connection (as observed by us), so check it instead |
121 // of GetState() (see http://crbug.com/139424). | 137 // of GetState() (see http://crbug.com/139424). |
122 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { | 138 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { |
123 DoRegistrationUpdate(); | 139 DoRegistrationUpdate(); |
124 } | 140 } |
125 } | 141 } |
126 | 142 |
| 143 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
| 144 const AckHandle& ack_handle) { |
| 145 DCHECK(CalledOnValidThread()); |
| 146 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
| 147 if (state_it == invalidation_state_map_.end()) |
| 148 return; |
| 149 invalidation_state_tracker_.Call( |
| 150 FROM_HERE, |
| 151 &InvalidationStateTracker::Acknowledge, |
| 152 id, |
| 153 ack_handle); |
| 154 state_it->second.current = ack_handle; |
| 155 if (state_it->second.expected.Equals(ack_handle)) { |
| 156 // If the received ack matches the expected ack, then we no longer need to |
| 157 // keep track of |id| since it is up-to-date. |
| 158 ObjectIdSet ids; |
| 159 ids.insert(id); |
| 160 ack_tracker_.Ack(ids); |
| 161 } |
| 162 } |
| 163 |
127 void SyncInvalidationListener::Ready( | 164 void SyncInvalidationListener::Ready( |
128 invalidation::InvalidationClient* client) { | 165 invalidation::InvalidationClient* client) { |
129 DCHECK(CalledOnValidThread()); | 166 DCHECK(CalledOnValidThread()); |
130 DCHECK_EQ(client, invalidation_client_.get()); | 167 DCHECK_EQ(client, invalidation_client_.get()); |
131 ticl_state_ = INVALIDATIONS_ENABLED; | 168 ticl_state_ = INVALIDATIONS_ENABLED; |
132 EmitStateChange(); | 169 EmitStateChange(); |
133 DoRegistrationUpdate(); | 170 DoRegistrationUpdate(); |
134 } | 171 } |
135 | 172 |
136 void SyncInvalidationListener::Invalidate( | 173 void SyncInvalidationListener::Invalidate( |
(...skipping 23 matching lines...) Expand all Loading... |
160 } | 197 } |
161 | 198 |
162 std::string payload; | 199 std::string payload; |
163 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 200 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
164 if (invalidation.has_payload()) | 201 if (invalidation.has_payload()) |
165 payload = invalidation.payload(); | 202 payload = invalidation.payload(); |
166 | 203 |
167 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 204 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
168 << " to " << invalidation.version(); | 205 << " to " << invalidation.version(); |
169 invalidation_state_map_[id].version = invalidation.version(); | 206 invalidation_state_map_[id].version = invalidation.version(); |
| 207 invalidation_state_map_[id].payload = payload; |
170 invalidation_state_tracker_.Call( | 208 invalidation_state_tracker_.Call( |
171 FROM_HERE, | 209 FROM_HERE, |
172 &InvalidationStateTracker::SetMaxVersionAndPayload, | 210 &InvalidationStateTracker::SetMaxVersionAndPayload, |
173 id, invalidation.version(), payload); | 211 id, invalidation.version(), payload); |
174 | 212 |
175 ObjectIdInvalidationMap invalidation_map; | 213 ObjectIdSet ids; |
176 invalidation_map[id].payload = payload; | 214 ids.insert(id); |
177 EmitInvalidation(invalidation_map); | 215 PrepareInvalidation(ids, payload, client, ack_handle); |
178 // TODO(akalin): We should really acknowledge only after we get the | |
179 // updates from the sync server. (see http://crbug.com/78462). | |
180 client->Acknowledge(ack_handle); | |
181 } | 216 } |
182 | 217 |
183 void SyncInvalidationListener::InvalidateUnknownVersion( | 218 void SyncInvalidationListener::InvalidateUnknownVersion( |
184 invalidation::InvalidationClient* client, | 219 invalidation::InvalidationClient* client, |
185 const invalidation::ObjectId& object_id, | 220 const invalidation::ObjectId& object_id, |
186 const invalidation::AckHandle& ack_handle) { | 221 const invalidation::AckHandle& ack_handle) { |
187 DCHECK(CalledOnValidThread()); | 222 DCHECK(CalledOnValidThread()); |
188 DCHECK_EQ(client, invalidation_client_.get()); | 223 DCHECK_EQ(client, invalidation_client_.get()); |
189 DVLOG(1) << "InvalidateUnknownVersion"; | 224 DVLOG(1) << "InvalidateUnknownVersion"; |
190 | 225 |
191 ObjectIdInvalidationMap invalidation_map; | 226 ObjectIdSet ids; |
192 invalidation_map[object_id].payload = std::string(); | 227 ids.insert(object_id); |
193 EmitInvalidation(invalidation_map); | 228 PrepareInvalidation(ids, std::string(), client, ack_handle); |
194 // TODO(akalin): We should really acknowledge only after we get the | |
195 // updates from the sync server. (see http://crbug.com/78462). | |
196 client->Acknowledge(ack_handle); | |
197 } | 229 } |
198 | 230 |
199 // This should behave as if we got an invalidation with version | 231 // This should behave as if we got an invalidation with version |
200 // UNKNOWN_OBJECT_VERSION for all known data types. | 232 // UNKNOWN_OBJECT_VERSION for all known data types. |
201 void SyncInvalidationListener::InvalidateAll( | 233 void SyncInvalidationListener::InvalidateAll( |
202 invalidation::InvalidationClient* client, | 234 invalidation::InvalidationClient* client, |
203 const invalidation::AckHandle& ack_handle) { | 235 const invalidation::AckHandle& ack_handle) { |
204 DCHECK(CalledOnValidThread()); | 236 DCHECK(CalledOnValidThread()); |
205 DCHECK_EQ(client, invalidation_client_.get()); | 237 DCHECK_EQ(client, invalidation_client_.get()); |
206 DVLOG(1) << "InvalidateAll"; | 238 DVLOG(1) << "InvalidateAll"; |
207 | 239 |
208 const ObjectIdInvalidationMap& invalidation_map = | 240 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
209 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); | 241 } |
210 EmitInvalidation(invalidation_map); | 242 |
211 // TODO(akalin): We should really acknowledge only after we get the | 243 void SyncInvalidationListener::PrepareInvalidation( |
212 // updates from the sync server. (see http://crbug.com/76482). | 244 const ObjectIdSet& ids, |
| 245 const std::string& payload, |
| 246 invalidation::InvalidationClient* client, |
| 247 const invalidation::AckHandle& ack_handle) { |
| 248 DCHECK(CalledOnValidThread()); |
| 249 |
| 250 // A server invalidation resets the local retry count. |
| 251 ack_tracker_.Ack(ids); |
| 252 invalidation_state_tracker_.Call( |
| 253 FROM_HERE, |
| 254 &InvalidationStateTracker::GenerateAckHandles, |
| 255 ids, |
| 256 base::MessageLoopProxy::current(), |
| 257 base::Bind(&SyncInvalidationListener::EmitInvalidation, |
| 258 weak_ptr_factory_.GetWeakPtr(), |
| 259 ids, |
| 260 payload, |
| 261 client, |
| 262 ack_handle)); |
| 263 } |
| 264 |
| 265 void SyncInvalidationListener::EmitInvalidation( |
| 266 const ObjectIdSet& ids, |
| 267 const std::string& payload, |
| 268 invalidation::InvalidationClient* client, |
| 269 const invalidation::AckHandle& ack_handle, |
| 270 const AckHandleMap& local_ack_handles) { |
| 271 DCHECK(CalledOnValidThread()); |
| 272 ObjectIdInvalidationMap invalidation_map = |
| 273 ObjectIdSetToInvalidationMap(ids, payload); |
| 274 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); |
| 275 it != local_ack_handles.end(); ++it) { |
| 276 // Update in-memory copy of the invalidation state. |
| 277 invalidation_state_map_[it->first].expected = it->second; |
| 278 invalidation_map[it->first].ack_handle = it->second; |
| 279 } |
| 280 ack_tracker_.Track(ids); |
| 281 delegate_->OnInvalidate(invalidation_map); |
213 client->Acknowledge(ack_handle); | 282 client->Acknowledge(ack_handle); |
214 } | 283 } |
215 | 284 |
216 void SyncInvalidationListener::EmitInvalidation( | 285 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
217 const ObjectIdInvalidationMap& invalidation_map) { | 286 ObjectIdInvalidationMap invalidation_map; |
218 DCHECK(CalledOnValidThread()); | 287 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
| 288 Invalidation invalidation; |
| 289 invalidation.ack_handle = invalidation_state_map_[*it].expected; |
| 290 invalidation.payload = invalidation_state_map_[*it].payload; |
| 291 invalidation_map.insert(std::make_pair(*it, invalidation)); |
| 292 } |
| 293 |
219 delegate_->OnInvalidate(invalidation_map); | 294 delegate_->OnInvalidate(invalidation_map); |
220 } | 295 } |
221 | 296 |
222 void SyncInvalidationListener::InformRegistrationStatus( | 297 void SyncInvalidationListener::InformRegistrationStatus( |
223 invalidation::InvalidationClient* client, | 298 invalidation::InvalidationClient* client, |
224 const invalidation::ObjectId& object_id, | 299 const invalidation::ObjectId& object_id, |
225 InvalidationListener::RegistrationState new_state) { | 300 InvalidationListener::RegistrationState new_state) { |
226 DCHECK(CalledOnValidThread()); | 301 DCHECK(CalledOnValidThread()); |
227 DCHECK_EQ(client, invalidation_client_.get()); | 302 DCHECK_EQ(client, invalidation_client_.get()); |
228 DVLOG(1) << "InformRegistrationStatus: " | 303 DVLOG(1) << "InformRegistrationStatus: " |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
299 void SyncInvalidationListener::DoRegistrationUpdate() { | 374 void SyncInvalidationListener::DoRegistrationUpdate() { |
300 DCHECK(CalledOnValidThread()); | 375 DCHECK(CalledOnValidThread()); |
301 const ObjectIdSet& unregistered_ids = | 376 const ObjectIdSet& unregistered_ids = |
302 registration_manager_->UpdateRegisteredIds(registered_ids_); | 377 registration_manager_->UpdateRegisteredIds(registered_ids_); |
303 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | 378 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
304 it != unregistered_ids.end(); ++it) { | 379 it != unregistered_ids.end(); ++it) { |
305 invalidation_state_map_.erase(*it); | 380 invalidation_state_map_.erase(*it); |
306 } | 381 } |
307 invalidation_state_tracker_.Call( | 382 invalidation_state_tracker_.Call( |
308 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 383 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
| 384 ack_tracker_.Ack(unregistered_ids); |
309 } | 385 } |
310 | 386 |
311 void SyncInvalidationListener::StopForTest() { | 387 void SyncInvalidationListener::StopForTest() { |
312 DCHECK(CalledOnValidThread()); | 388 DCHECK(CalledOnValidThread()); |
313 Stop(); | 389 Stop(); |
314 } | 390 } |
315 | 391 |
316 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { | 392 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
317 DCHECK(CalledOnValidThread()); | 393 DCHECK(CalledOnValidThread()); |
318 return invalidation_state_map_; | 394 return invalidation_state_map_; |
319 } | 395 } |
320 | 396 |
| 397 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { |
| 398 return &ack_tracker_; |
| 399 } |
| 400 |
321 void SyncInvalidationListener::Stop() { | 401 void SyncInvalidationListener::Stop() { |
322 DCHECK(CalledOnValidThread()); | 402 DCHECK(CalledOnValidThread()); |
323 if (!invalidation_client_.get()) { | 403 if (!invalidation_client_.get()) { |
324 return; | 404 return; |
325 } | 405 } |
326 | 406 |
| 407 ack_tracker_.Clear(); |
| 408 |
327 registration_manager_.reset(); | 409 registration_manager_.reset(); |
328 sync_system_resources_.Stop(); | 410 sync_system_resources_.Stop(); |
329 invalidation_client_->Stop(); | 411 invalidation_client_->Stop(); |
330 | 412 |
331 invalidation_client_.reset(); | 413 invalidation_client_.reset(); |
332 delegate_ = NULL; | 414 delegate_ = NULL; |
333 | 415 |
334 invalidation_state_tracker_.Reset(); | 416 invalidation_state_tracker_.Reset(); |
335 invalidation_state_map_.clear(); | 417 invalidation_state_map_.clear(); |
336 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 418 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
373 EmitStateChange(); | 455 EmitStateChange(); |
374 } | 456 } |
375 | 457 |
376 void SyncInvalidationListener::OnIncomingNotification( | 458 void SyncInvalidationListener::OnIncomingNotification( |
377 const notifier::Notification& notification) { | 459 const notifier::Notification& notification) { |
378 DCHECK(CalledOnValidThread()); | 460 DCHECK(CalledOnValidThread()); |
379 // Do nothing, since this is already handled by |invalidation_client_|. | 461 // Do nothing, since this is already handled by |invalidation_client_|. |
380 } | 462 } |
381 | 463 |
382 } // namespace syncer | 464 } // namespace syncer |
OLD | NEW |