Index: sync/notifier/sync_invalidation_listener.cc |
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
index 4e57bb85ed5a2096072ab572573d4d7cf01e431b..28b37bc07873bf829ad9342ac1cefc2ba75006b4 100644 |
--- a/sync/notifier/sync_invalidation_listener.cc |
+++ b/sync/notifier/sync_invalidation_listener.cc |
@@ -4,9 +4,9 @@ |
#include "sync/notifier/sync_invalidation_listener.h" |
-#include <string> |
#include <vector> |
+#include "base/bind.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
#include "base/logging.h" |
@@ -29,8 +29,11 @@ namespace syncer { |
SyncInvalidationListener::Delegate::~Delegate() {} |
SyncInvalidationListener::SyncInvalidationListener( |
+ base::TickClock* tick_clock, |
scoped_ptr<notifier::PushClient> push_client) |
- : push_client_(push_client.get()), |
+ : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
+ ack_tracker_(tick_clock, ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
+ push_client_(push_client.get()), |
sync_system_resources_(push_client.Pass(), |
ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
delegate_(NULL), |
@@ -105,6 +108,19 @@ void SyncInvalidationListener::Start( |
FROM_HERE, |
&InvalidationStateTracker::SetInvalidatorClientId, |
client_id); |
+ |
+ // Set up reminders for any invalidations that have not been locally |
+ // acknowledged. |
+ ObjectIdSet unacknowledged_ids; |
+ for (InvalidationStateMap::const_iterator it = |
+ invalidation_state_map_.begin(); |
+ it != invalidation_state_map_.end(); ++it) { |
+ if (it->second.expected.Equals(it->second.current)) |
+ continue; |
+ unacknowledged_ids.insert(it->first); |
+ } |
+ if (!unacknowledged_ids.empty()) |
+ ack_tracker_.Track(unacknowledged_ids); |
} |
void SyncInvalidationListener::UpdateCredentials( |
@@ -124,6 +140,27 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
} |
} |
+void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
+ const AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
+ if (state_it == invalidation_state_map_.end()) |
+ return; |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::Acknowledge, |
+ id, |
+ ack_handle); |
+ state_it->second.current = ack_handle; |
+ if (state_it->second.expected.Equals(ack_handle)) { |
+ // If the received ack matches the expected ack, then we no longer need to |
+ // keep track of |id| since it is up-to-date. |
+ ObjectIdSet ids; |
+ ids.insert(id); |
+ ack_tracker_.Ack(ids); |
+ } |
+} |
+ |
void SyncInvalidationListener::Ready( |
invalidation::InvalidationClient* client) { |
DCHECK(CalledOnValidThread()); |
@@ -167,17 +204,15 @@ void SyncInvalidationListener::Invalidate( |
DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
<< " to " << invalidation.version(); |
invalidation_state_map_[id].version = invalidation.version(); |
+ invalidation_state_map_[id].payload = payload; |
invalidation_state_tracker_.Call( |
FROM_HERE, |
&InvalidationStateTracker::SetMaxVersionAndPayload, |
id, invalidation.version(), payload); |
- ObjectIdInvalidationMap invalidation_map; |
- invalidation_map[id].payload = payload; |
- EmitInvalidation(invalidation_map); |
- // TODO(akalin): We should really acknowledge only after we get the |
- // updates from the sync server. (see http://crbug.com/78462). |
- client->Acknowledge(ack_handle); |
+ ObjectIdSet ids; |
+ ids.insert(id); |
+ PrepareInvalidation(ids, payload, client, ack_handle); |
} |
void SyncInvalidationListener::InvalidateUnknownVersion( |
@@ -188,12 +223,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion( |
DCHECK_EQ(client, invalidation_client_.get()); |
DVLOG(1) << "InvalidateUnknownVersion"; |
- ObjectIdInvalidationMap invalidation_map; |
- invalidation_map[object_id].payload = std::string(); |
- EmitInvalidation(invalidation_map); |
- // TODO(akalin): We should really acknowledge only after we get the |
- // updates from the sync server. (see http://crbug.com/78462). |
- client->Acknowledge(ack_handle); |
+ ObjectIdSet ids; |
+ ids.insert(object_id); |
+ PrepareInvalidation(ids, std::string(), client, ack_handle); |
} |
// This should behave as if we got an invalidation with version |
@@ -205,17 +237,60 @@ void SyncInvalidationListener::InvalidateAll( |
DCHECK_EQ(client, invalidation_client_.get()); |
DVLOG(1) << "InvalidateAll"; |
- const ObjectIdInvalidationMap& invalidation_map = |
- ObjectIdSetToInvalidationMap(registered_ids_, std::string()); |
- EmitInvalidation(invalidation_map); |
- // TODO(akalin): We should really acknowledge only after we get the |
- // updates from the sync server. (see http://crbug.com/76482). |
- client->Acknowledge(ack_handle); |
+ PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
+} |
+ |
+void SyncInvalidationListener::PrepareInvalidation( |
+ const ObjectIdSet& ids, |
+ const std::string& payload, |
+ invalidation::InvalidationClient* client, |
+ const invalidation::AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ // A server invalidation resets the local retry count. |
+ ack_tracker_.Ack(ids); |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::GenerateAckHandles, |
+ ids, |
+ base::MessageLoopProxy::current(), |
+ base::Bind(&SyncInvalidationListener::EmitInvalidation, |
+ weak_ptr_factory_.GetWeakPtr(), |
+ ids, |
+ payload, |
+ client, |
+ ack_handle)); |
} |
void SyncInvalidationListener::EmitInvalidation( |
- const ObjectIdInvalidationMap& invalidation_map) { |
+ const ObjectIdSet& ids, |
+ const std::string& payload, |
+ invalidation::InvalidationClient* client, |
+ const invalidation::AckHandle& ack_handle, |
+ const AckHandleMap& local_ack_handles) { |
DCHECK(CalledOnValidThread()); |
+ ObjectIdInvalidationMap invalidation_map = |
+ ObjectIdSetToInvalidationMap(ids, payload); |
+ for (AckHandleMap::const_iterator it = local_ack_handles.begin(); |
+ it != local_ack_handles.end(); ++it) { |
+ // Update in-memory copy of the invalidation state. |
+ invalidation_state_map_[it->first].expected = it->second; |
+ invalidation_map[it->first].ack_handle = it->second; |
+ } |
+ ack_tracker_.Track(ids); |
+ delegate_->OnInvalidate(invalidation_map); |
+ client->Acknowledge(ack_handle); |
+} |
+ |
+void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
+ ObjectIdInvalidationMap invalidation_map; |
+ for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
+ Invalidation invalidation; |
+ invalidation.ack_handle = invalidation_state_map_[*it].expected; |
+ invalidation.payload = invalidation_state_map_[*it].payload; |
+ invalidation_map.insert(std::make_pair(*it, invalidation)); |
+ } |
+ |
delegate_->OnInvalidate(invalidation_map); |
} |
@@ -306,6 +381,7 @@ void SyncInvalidationListener::DoRegistrationUpdate() { |
} |
invalidation_state_tracker_.Call( |
FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
+ ack_tracker_.Ack(unregistered_ids); |
} |
void SyncInvalidationListener::StopForTest() { |
@@ -318,12 +394,18 @@ InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
return invalidation_state_map_; |
} |
+AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { |
+ return &ack_tracker_; |
+} |
+ |
void SyncInvalidationListener::Stop() { |
DCHECK(CalledOnValidThread()); |
if (!invalidation_client_.get()) { |
return; |
} |
+ ack_tracker_.Clear(); |
+ |
registration_manager_.reset(); |
sync_system_resources_.Stop(); |
invalidation_client_->Stop(); |