Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(484)

Unified Diff: sync/notifier/sync_invalidation_listener.cc

Issue 10911084: Implement Invalidator::Acknowledge (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Adapt patch to new TickClock interface Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sync/notifier/sync_invalidation_listener.h ('k') | sync/notifier/sync_invalidation_listener_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sync/notifier/sync_invalidation_listener.cc
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
index 4e57bb85ed5a2096072ab572573d4d7cf01e431b..28b37bc07873bf829ad9342ac1cefc2ba75006b4 100644
--- a/sync/notifier/sync_invalidation_listener.cc
+++ b/sync/notifier/sync_invalidation_listener.cc
@@ -4,9 +4,9 @@
#include "sync/notifier/sync_invalidation_listener.h"
-#include <string>
#include <vector>
+#include "base/bind.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
@@ -29,8 +29,11 @@ namespace syncer {
SyncInvalidationListener::Delegate::~Delegate() {}
SyncInvalidationListener::SyncInvalidationListener(
+ base::TickClock* tick_clock,
scoped_ptr<notifier::PushClient> push_client)
- : push_client_(push_client.get()),
+ : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ ack_tracker_(tick_clock, ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ push_client_(push_client.get()),
sync_system_resources_(push_client.Pass(),
ALLOW_THIS_IN_INITIALIZER_LIST(this)),
delegate_(NULL),
@@ -105,6 +108,19 @@ void SyncInvalidationListener::Start(
FROM_HERE,
&InvalidationStateTracker::SetInvalidatorClientId,
client_id);
+
+ // Set up reminders for any invalidations that have not been locally
+ // acknowledged.
+ ObjectIdSet unacknowledged_ids;
+ for (InvalidationStateMap::const_iterator it =
+ invalidation_state_map_.begin();
+ it != invalidation_state_map_.end(); ++it) {
+ if (it->second.expected.Equals(it->second.current))
+ continue;
+ unacknowledged_ids.insert(it->first);
+ }
+ if (!unacknowledged_ids.empty())
+ ack_tracker_.Track(unacknowledged_ids);
}
void SyncInvalidationListener::UpdateCredentials(
@@ -124,6 +140,27 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
}
}
+void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
+ const AckHandle& ack_handle) {
+ DCHECK(CalledOnValidThread());
+ InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
+ if (state_it == invalidation_state_map_.end())
+ return;
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::Acknowledge,
+ id,
+ ack_handle);
+ state_it->second.current = ack_handle;
+ if (state_it->second.expected.Equals(ack_handle)) {
+ // If the received ack matches the expected ack, then we no longer need to
+ // keep track of |id| since it is up-to-date.
+ ObjectIdSet ids;
+ ids.insert(id);
+ ack_tracker_.Ack(ids);
+ }
+}
+
void SyncInvalidationListener::Ready(
invalidation::InvalidationClient* client) {
DCHECK(CalledOnValidThread());
@@ -167,17 +204,15 @@ void SyncInvalidationListener::Invalidate(
DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
<< " to " << invalidation.version();
invalidation_state_map_[id].version = invalidation.version();
+ invalidation_state_map_[id].payload = payload;
invalidation_state_tracker_.Call(
FROM_HERE,
&InvalidationStateTracker::SetMaxVersionAndPayload,
id, invalidation.version(), payload);
- ObjectIdInvalidationMap invalidation_map;
- invalidation_map[id].payload = payload;
- EmitInvalidation(invalidation_map);
- // TODO(akalin): We should really acknowledge only after we get the
- // updates from the sync server. (see http://crbug.com/78462).
- client->Acknowledge(ack_handle);
+ ObjectIdSet ids;
+ ids.insert(id);
+ PrepareInvalidation(ids, payload, client, ack_handle);
}
void SyncInvalidationListener::InvalidateUnknownVersion(
@@ -188,12 +223,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateUnknownVersion";
- ObjectIdInvalidationMap invalidation_map;
- invalidation_map[object_id].payload = std::string();
- EmitInvalidation(invalidation_map);
- // TODO(akalin): We should really acknowledge only after we get the
- // updates from the sync server. (see http://crbug.com/78462).
- client->Acknowledge(ack_handle);
+ ObjectIdSet ids;
+ ids.insert(object_id);
+ PrepareInvalidation(ids, std::string(), client, ack_handle);
}
// This should behave as if we got an invalidation with version
@@ -205,17 +237,60 @@ void SyncInvalidationListener::InvalidateAll(
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateAll";
- const ObjectIdInvalidationMap& invalidation_map =
- ObjectIdSetToInvalidationMap(registered_ids_, std::string());
- EmitInvalidation(invalidation_map);
- // TODO(akalin): We should really acknowledge only after we get the
- // updates from the sync server. (see http://crbug.com/76482).
- client->Acknowledge(ack_handle);
+ PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
+}
+
+void SyncInvalidationListener::PrepareInvalidation(
+ const ObjectIdSet& ids,
+ const std::string& payload,
+ invalidation::InvalidationClient* client,
+ const invalidation::AckHandle& ack_handle) {
+ DCHECK(CalledOnValidThread());
+
+ // A server invalidation resets the local retry count.
+ ack_tracker_.Ack(ids);
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::GenerateAckHandles,
+ ids,
+ base::MessageLoopProxy::current(),
+ base::Bind(&SyncInvalidationListener::EmitInvalidation,
+ weak_ptr_factory_.GetWeakPtr(),
+ ids,
+ payload,
+ client,
+ ack_handle));
}
void SyncInvalidationListener::EmitInvalidation(
- const ObjectIdInvalidationMap& invalidation_map) {
+ const ObjectIdSet& ids,
+ const std::string& payload,
+ invalidation::InvalidationClient* client,
+ const invalidation::AckHandle& ack_handle,
+ const AckHandleMap& local_ack_handles) {
DCHECK(CalledOnValidThread());
+ ObjectIdInvalidationMap invalidation_map =
+ ObjectIdSetToInvalidationMap(ids, payload);
+ for (AckHandleMap::const_iterator it = local_ack_handles.begin();
+ it != local_ack_handles.end(); ++it) {
+ // Update in-memory copy of the invalidation state.
+ invalidation_state_map_[it->first].expected = it->second;
+ invalidation_map[it->first].ack_handle = it->second;
+ }
+ ack_tracker_.Track(ids);
+ delegate_->OnInvalidate(invalidation_map);
+ client->Acknowledge(ack_handle);
+}
+
+void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
+ ObjectIdInvalidationMap invalidation_map;
+ for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
+ Invalidation invalidation;
+ invalidation.ack_handle = invalidation_state_map_[*it].expected;
+ invalidation.payload = invalidation_state_map_[*it].payload;
+ invalidation_map.insert(std::make_pair(*it, invalidation));
+ }
+
delegate_->OnInvalidate(invalidation_map);
}
@@ -306,6 +381,7 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
}
invalidation_state_tracker_.Call(
FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
+ ack_tracker_.Ack(unregistered_ids);
}
void SyncInvalidationListener::StopForTest() {
@@ -318,12 +394,18 @@ InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
return invalidation_state_map_;
}
+AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
+ return &ack_tracker_;
+}
+
void SyncInvalidationListener::Stop() {
DCHECK(CalledOnValidThread());
if (!invalidation_client_.get()) {
return;
}
+ ack_tracker_.Clear();
+
registration_manager_.reset();
sync_system_resources_.Stop();
invalidation_client_->Stop();
« no previous file with comments | « sync/notifier/sync_invalidation_listener.h ('k') | sync/notifier/sync_invalidation_listener_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698