| Index: sync/internal_api/shared_model_type_processor.cc
|
| diff --git a/sync/internal_api/shared_model_type_processor.cc b/sync/internal_api/shared_model_type_processor.cc
|
| deleted file mode 100644
|
| index 8b046240dabaa0a8df7a2e3c8b452994ae202256..0000000000000000000000000000000000000000
|
| --- a/sync/internal_api/shared_model_type_processor.cc
|
| +++ /dev/null
|
| @@ -1,690 +0,0 @@
|
| -// Copyright 2014 The Chromium Authors. All rights reserved.
|
| -// Use of this source code is governed by a BSD-style license that can be
|
| -// found in the LICENSE file.
|
| -
|
| -#include "sync/internal_api/public/shared_model_type_processor.h"
|
| -
|
| -#include <utility>
|
| -#include <vector>
|
| -
|
| -#include "base/bind.h"
|
| -#include "base/location.h"
|
| -#include "base/memory/ptr_util.h"
|
| -#include "base/metrics/histogram.h"
|
| -#include "base/threading/thread_task_runner_handle.h"
|
| -#include "sync/engine/commit_queue.h"
|
| -#include "sync/internal_api/public/activation_context.h"
|
| -#include "sync/internal_api/public/processor_entity_tracker.h"
|
| -#include "sync/syncable/syncable_util.h"
|
| -
|
| -namespace syncer_v2 {
|
| -
|
| -namespace {
|
| -
|
| -class ModelTypeProcessorProxy : public ModelTypeProcessor {
|
| - public:
|
| - ModelTypeProcessorProxy(
|
| - const base::WeakPtr<ModelTypeProcessor>& processor,
|
| - const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
|
| - ~ModelTypeProcessorProxy() override;
|
| -
|
| - void ConnectSync(std::unique_ptr<CommitQueue> worker) override;
|
| - void DisconnectSync() override;
|
| - void OnCommitCompleted(const sync_pb::DataTypeState& type_state,
|
| - const CommitResponseDataList& response_list) override;
|
| - void OnUpdateReceived(const sync_pb::DataTypeState& type_state,
|
| - const UpdateResponseDataList& updates) override;
|
| -
|
| - private:
|
| - base::WeakPtr<ModelTypeProcessor> processor_;
|
| - scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
|
| -};
|
| -
|
| -ModelTypeProcessorProxy::ModelTypeProcessorProxy(
|
| - const base::WeakPtr<ModelTypeProcessor>& processor,
|
| - const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
|
| - : processor_(processor), processor_task_runner_(processor_task_runner) {}
|
| -
|
| -ModelTypeProcessorProxy::~ModelTypeProcessorProxy() {}
|
| -
|
| -void ModelTypeProcessorProxy::ConnectSync(std::unique_ptr<CommitQueue> worker) {
|
| - processor_task_runner_->PostTask(
|
| - FROM_HERE, base::Bind(&ModelTypeProcessor::ConnectSync, processor_,
|
| - base::Passed(std::move(worker))));
|
| -}
|
| -
|
| -void ModelTypeProcessorProxy::DisconnectSync() {
|
| - processor_task_runner_->PostTask(
|
| - FROM_HERE, base::Bind(&ModelTypeProcessor::DisconnectSync, processor_));
|
| -}
|
| -
|
| -void ModelTypeProcessorProxy::OnCommitCompleted(
|
| - const sync_pb::DataTypeState& type_state,
|
| - const CommitResponseDataList& response_list) {
|
| - processor_task_runner_->PostTask(
|
| - FROM_HERE, base::Bind(&ModelTypeProcessor::OnCommitCompleted, processor_,
|
| - type_state, response_list));
|
| -}
|
| -
|
| -void ModelTypeProcessorProxy::OnUpdateReceived(
|
| - const sync_pb::DataTypeState& type_state,
|
| - const UpdateResponseDataList& updates) {
|
| - processor_task_runner_->PostTask(
|
| - FROM_HERE, base::Bind(&ModelTypeProcessor::OnUpdateReceived, processor_,
|
| - type_state, updates));
|
| -}
|
| -
|
| -} // namespace
|
| -
|
| -SharedModelTypeProcessor::SharedModelTypeProcessor(syncer::ModelType type,
|
| - ModelTypeService* service)
|
| - : type_(type),
|
| - is_metadata_loaded_(false),
|
| - is_initial_pending_data_loaded_(false),
|
| - service_(service),
|
| - error_handler_(nullptr),
|
| - weak_ptr_factory_(this) {
|
| - DCHECK(service);
|
| -}
|
| -
|
| -SharedModelTypeProcessor::~SharedModelTypeProcessor() {}
|
| -
|
| -// static
|
| -std::unique_ptr<ModelTypeChangeProcessor>
|
| -SharedModelTypeProcessor::CreateAsChangeProcessor(syncer::ModelType type,
|
| - ModelTypeService* service) {
|
| - return std::unique_ptr<ModelTypeChangeProcessor>(
|
| - new SharedModelTypeProcessor(type, service));
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnSyncStarting(
|
| - syncer::DataTypeErrorHandler* error_handler,
|
| - const StartCallback& start_callback) {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(start_callback_.is_null());
|
| - DCHECK(!IsConnected());
|
| - DCHECK(error_handler);
|
| - DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_);
|
| -
|
| - error_handler_ = error_handler;
|
| - start_callback_ = start_callback;
|
| - ConnectIfReady();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnMetadataLoaded(
|
| - syncer::SyncError error,
|
| - std::unique_ptr<MetadataBatch> batch) {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(entities_.empty());
|
| - DCHECK(!is_metadata_loaded_);
|
| - DCHECK(!IsConnected());
|
| -
|
| - is_metadata_loaded_ = true;
|
| - // Flip this flag here to cover all cases where we don't need to load data.
|
| - is_initial_pending_data_loaded_ = true;
|
| -
|
| - if (error.IsSet()) {
|
| - start_error_ = error;
|
| - ConnectIfReady();
|
| - return;
|
| - }
|
| -
|
| - if (batch->GetDataTypeState().initial_sync_done()) {
|
| - EntityMetadataMap metadata_map(batch->TakeAllMetadata());
|
| - std::vector<std::string> entities_to_commit;
|
| -
|
| - for (auto it = metadata_map.begin(); it != metadata_map.end(); it++) {
|
| - std::unique_ptr<ProcessorEntityTracker> entity =
|
| - ProcessorEntityTracker::CreateFromMetadata(it->first, &it->second);
|
| - if (entity->RequiresCommitData()) {
|
| - entities_to_commit.push_back(entity->client_tag());
|
| - }
|
| - entities_[entity->metadata().client_tag_hash()] = std::move(entity);
|
| - }
|
| - data_type_state_ = batch->GetDataTypeState();
|
| - if (!entities_to_commit.empty()) {
|
| - is_initial_pending_data_loaded_ = false;
|
| - service_->GetData(
|
| - entities_to_commit,
|
| - base::Bind(&SharedModelTypeProcessor::OnInitialPendingDataLoaded,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| - } else {
|
| - // First time syncing; initialize metadata.
|
| - data_type_state_.mutable_progress_marker()->set_data_type_id(
|
| - GetSpecificsFieldNumberFromModelType(type_));
|
| - }
|
| -
|
| - ConnectIfReady();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::ConnectIfReady() {
|
| - DCHECK(CalledOnValidThread());
|
| - if (!is_metadata_loaded_ || !is_initial_pending_data_loaded_ ||
|
| - start_callback_.is_null()) {
|
| - return;
|
| - }
|
| -
|
| - std::unique_ptr<ActivationContext> activation_context;
|
| -
|
| - if (!start_error_.IsSet()) {
|
| - activation_context = base::WrapUnique(new ActivationContext);
|
| - activation_context->data_type_state = data_type_state_;
|
| - activation_context->type_processor = base::WrapUnique(
|
| - new ModelTypeProcessorProxy(weak_ptr_factory_.GetWeakPtr(),
|
| - base::ThreadTaskRunnerHandle::Get()));
|
| - }
|
| -
|
| - start_callback_.Run(start_error_, std::move(activation_context));
|
| - start_callback_.Reset();
|
| -}
|
| -
|
| -bool SharedModelTypeProcessor::IsAllowingChanges() const {
|
| - return is_metadata_loaded_;
|
| -}
|
| -
|
| -bool SharedModelTypeProcessor::IsConnected() const {
|
| - DCHECK(CalledOnValidThread());
|
| - return !!worker_;
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::DisableSync() {
|
| - DCHECK(CalledOnValidThread());
|
| - std::unique_ptr<MetadataChangeList> change_list =
|
| - service_->CreateMetadataChangeList();
|
| - for (auto it = entities_.begin(); it != entities_.end(); ++it) {
|
| - change_list->ClearMetadata(it->second->client_tag());
|
| - }
|
| - change_list->ClearDataTypeState();
|
| - // Nothing to do if this fails, so just ignore the error it might return.
|
| - service_->ApplySyncChanges(std::move(change_list), EntityChangeList());
|
| -}
|
| -
|
| -syncer::SyncError SharedModelTypeProcessor::CreateAndUploadError(
|
| - const tracked_objects::Location& location,
|
| - const std::string& message) {
|
| - if (error_handler_) {
|
| - return error_handler_->CreateAndUploadError(location, message, type_);
|
| - } else {
|
| - return syncer::SyncError(location, syncer::SyncError::DATATYPE_ERROR,
|
| - message, type_);
|
| - }
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::ConnectSync(
|
| - std::unique_ptr<CommitQueue> worker) {
|
| - DCHECK(CalledOnValidThread());
|
| - DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
|
| -
|
| - worker_ = std::move(worker);
|
| -
|
| - FlushPendingCommitRequests();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::DisconnectSync() {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(IsConnected());
|
| -
|
| - DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_);
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| - worker_.reset();
|
| -
|
| - for (auto it = entities_.begin(); it != entities_.end(); ++it) {
|
| - it->second->ClearTransientSyncState();
|
| - }
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::Put(const std::string& tag,
|
| - std::unique_ptr<EntityData> data,
|
| - MetadataChangeList* metadata_change_list) {
|
| - DCHECK(IsAllowingChanges());
|
| - DCHECK(data.get());
|
| - DCHECK(!data->is_deleted());
|
| - DCHECK(!data->non_unique_name.empty());
|
| - DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data->specifics));
|
| -
|
| - if (!data_type_state_.initial_sync_done()) {
|
| - // Ignore changes before the initial sync is done.
|
| - return;
|
| - }
|
| -
|
| - // Fill in some data.
|
| - data->client_tag_hash = GetHashForTag(tag);
|
| - if (data->modification_time.is_null()) {
|
| - data->modification_time = base::Time::Now();
|
| - }
|
| -
|
| - ProcessorEntityTracker* entity = GetEntityForTagHash(data->client_tag_hash);
|
| -
|
| - if (entity == nullptr) {
|
| - // The service is creating a new entity.
|
| - if (data->creation_time.is_null()) {
|
| - data->creation_time = data->modification_time;
|
| - }
|
| - entity = CreateEntity(tag, *data);
|
| - } else if (entity->MatchesData(*data)) {
|
| - // Ignore changes that don't actually change anything.
|
| - return;
|
| - }
|
| -
|
| - entity->MakeLocalChange(std::move(data));
|
| - metadata_change_list->UpdateMetadata(tag, entity->metadata());
|
| -
|
| - FlushPendingCommitRequests();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::Delete(
|
| - const std::string& tag,
|
| - MetadataChangeList* metadata_change_list) {
|
| - DCHECK(IsAllowingChanges());
|
| -
|
| - if (!data_type_state_.initial_sync_done()) {
|
| - // Ignore changes before the initial sync is done.
|
| - return;
|
| - }
|
| -
|
| - ProcessorEntityTracker* entity = GetEntityForTag(tag);
|
| - if (entity == nullptr) {
|
| - // That's unusual, but not necessarily a bad thing.
|
| - // Missing is as good as deleted as far as the model is concerned.
|
| - DLOG(WARNING) << "Attempted to delete missing item."
|
| - << " client tag: " << tag;
|
| - return;
|
| - }
|
| -
|
| - entity->Delete();
|
| -
|
| - metadata_change_list->UpdateMetadata(tag, entity->metadata());
|
| - FlushPendingCommitRequests();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::FlushPendingCommitRequests() {
|
| - CommitRequestDataList commit_requests;
|
| -
|
| - // Don't bother sending anything if there's no one to send to.
|
| - if (!IsConnected())
|
| - return;
|
| -
|
| - // Don't send anything if the type is not ready to handle commits.
|
| - if (!data_type_state_.initial_sync_done())
|
| - return;
|
| -
|
| - // TODO(rlarocque): Do something smarter than iterate here.
|
| - for (auto it = entities_.begin(); it != entities_.end(); ++it) {
|
| - ProcessorEntityTracker* entity = it->second.get();
|
| - if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) {
|
| - CommitRequestData request;
|
| - entity->InitializeCommitRequestData(&request);
|
| - commit_requests.push_back(request);
|
| - }
|
| - }
|
| -
|
| - if (!commit_requests.empty())
|
| - worker_->EnqueueForCommit(commit_requests);
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnCommitCompleted(
|
| - const sync_pb::DataTypeState& type_state,
|
| - const CommitResponseDataList& response_list) {
|
| - std::unique_ptr<MetadataChangeList> change_list =
|
| - service_->CreateMetadataChangeList();
|
| -
|
| - data_type_state_ = type_state;
|
| - change_list->UpdateDataTypeState(data_type_state_);
|
| -
|
| - for (const CommitResponseData& data : response_list) {
|
| - ProcessorEntityTracker* entity = GetEntityForTagHash(data.client_tag_hash);
|
| - if (entity == nullptr) {
|
| - NOTREACHED() << "Received commit response for missing item."
|
| - << " type: " << type_
|
| - << " client_tag_hash: " << data.client_tag_hash;
|
| - continue;
|
| - }
|
| -
|
| - entity->ReceiveCommitResponse(data);
|
| -
|
| - if (entity->CanClearMetadata()) {
|
| - change_list->ClearMetadata(entity->client_tag());
|
| - entities_.erase(entity->metadata().client_tag_hash());
|
| - } else {
|
| - change_list->UpdateMetadata(entity->client_tag(), entity->metadata());
|
| - }
|
| - }
|
| -
|
| - syncer::SyncError error =
|
| - service_->ApplySyncChanges(std::move(change_list), EntityChangeList());
|
| - if (error.IsSet()) {
|
| - error_handler_->OnSingleDataTypeUnrecoverableError(error);
|
| - }
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnUpdateReceived(
|
| - const sync_pb::DataTypeState& data_type_state,
|
| - const UpdateResponseDataList& updates) {
|
| - if (!data_type_state_.initial_sync_done()) {
|
| - OnInitialUpdateReceived(data_type_state, updates);
|
| - return;
|
| - }
|
| -
|
| - std::unique_ptr<MetadataChangeList> metadata_changes =
|
| - service_->CreateMetadataChangeList();
|
| - EntityChangeList entity_changes;
|
| -
|
| - metadata_changes->UpdateDataTypeState(data_type_state);
|
| - bool got_new_encryption_requirements =
|
| - data_type_state_.encryption_key_name() !=
|
| - data_type_state.encryption_key_name();
|
| - data_type_state_ = data_type_state;
|
| -
|
| - // If new encryption requirements come from the server, the entities that are
|
| - // in |updates| will be recorded here so they can be ignored during the
|
| - // re-encryption phase at the end.
|
| - std::unordered_set<std::string> already_updated;
|
| -
|
| - for (const UpdateResponseData& update : updates) {
|
| - ProcessorEntityTracker* entity = ProcessUpdate(update, &entity_changes);
|
| -
|
| - if (!entity) {
|
| - // The update should be ignored.
|
| - continue;
|
| - }
|
| -
|
| - if (entity->CanClearMetadata()) {
|
| - metadata_changes->ClearMetadata(entity->client_tag());
|
| - entities_.erase(entity->metadata().client_tag_hash());
|
| - } else {
|
| - metadata_changes->UpdateMetadata(entity->client_tag(),
|
| - entity->metadata());
|
| - }
|
| -
|
| - if (got_new_encryption_requirements) {
|
| - already_updated.insert(entity->client_tag());
|
| - }
|
| - }
|
| -
|
| - if (got_new_encryption_requirements) {
|
| - RecommitAllForEncryption(already_updated, metadata_changes.get());
|
| - }
|
| -
|
| - // Inform the service of the new or updated data.
|
| - syncer::SyncError error =
|
| - service_->ApplySyncChanges(std::move(metadata_changes), entity_changes);
|
| -
|
| - if (error.IsSet()) {
|
| - error_handler_->OnSingleDataTypeUnrecoverableError(error);
|
| - } else {
|
| - // There may be new reasons to commit by the time this function is done.
|
| - FlushPendingCommitRequests();
|
| - }
|
| -}
|
| -
|
| -ProcessorEntityTracker* SharedModelTypeProcessor::ProcessUpdate(
|
| - const UpdateResponseData& update,
|
| - EntityChangeList* entity_changes) {
|
| - const EntityData& data = update.entity.value();
|
| - const std::string& client_tag_hash = data.client_tag_hash;
|
| - ProcessorEntityTracker* entity = GetEntityForTagHash(client_tag_hash);
|
| - if (entity == nullptr) {
|
| - if (data.is_deleted()) {
|
| - DLOG(WARNING) << "Received remote delete for a non-existing item."
|
| - << " client_tag_hash: " << client_tag_hash;
|
| - return nullptr;
|
| - }
|
| -
|
| - entity = CreateEntity(data);
|
| - entity_changes->push_back(
|
| - EntityChange::CreateAdd(entity->client_tag(), update.entity));
|
| - entity->RecordAcceptedUpdate(update);
|
| - } else if (entity->UpdateIsReflection(update.response_version)) {
|
| - // Seen this update before; just ignore it.
|
| - return nullptr;
|
| - } else if (entity->IsUnsynced()) {
|
| - ConflictResolution::Type resolution_type =
|
| - ResolveConflict(update, entity, entity_changes);
|
| - UMA_HISTOGRAM_ENUMERATION("Sync.ResolveConflict", resolution_type,
|
| - ConflictResolution::TYPE_SIZE);
|
| - } else if (data.is_deleted()) {
|
| - // The entity was deleted; inform the service. Note that the local data
|
| - // can never be deleted at this point because it would have either been
|
| - // acked (the add case) or pending (the conflict case).
|
| - DCHECK(!entity->metadata().is_deleted());
|
| - entity_changes->push_back(EntityChange::CreateDelete(entity->client_tag()));
|
| - entity->RecordAcceptedUpdate(update);
|
| - } else if (!entity->MatchesData(data)) {
|
| - // Specifics have changed, so update the service.
|
| - entity_changes->push_back(
|
| - EntityChange::CreateUpdate(entity->client_tag(), update.entity));
|
| - entity->RecordAcceptedUpdate(update);
|
| - } else {
|
| - // No data change; still record that the update was received.
|
| - entity->RecordAcceptedUpdate(update);
|
| - }
|
| -
|
| - // If the received entity has out of date encryption, we schedule another
|
| - // commit to fix it.
|
| - if (data_type_state_.encryption_key_name() != update.encryption_key_name) {
|
| - DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
|
| - << update.encryption_key_name << " -> "
|
| - << data_type_state_.encryption_key_name();
|
| -
|
| - entity->IncrementSequenceNumber();
|
| - if (entity->RequiresCommitData()) {
|
| - // If there is no pending commit data, then either this update wasn't
|
| - // in conflict or the remote data won; either way the remote data is
|
| - // the right data to re-queue for commit.
|
| - entity->CacheCommitData(update.entity);
|
| - }
|
| - }
|
| -
|
| - return entity;
|
| -}
|
| -
|
| -ConflictResolution::Type SharedModelTypeProcessor::ResolveConflict(
|
| - const UpdateResponseData& update,
|
| - ProcessorEntityTracker* entity,
|
| - EntityChangeList* changes) {
|
| - const EntityData& remote_data = update.entity.value();
|
| -
|
| - ConflictResolution::Type resolution_type = ConflictResolution::TYPE_SIZE;
|
| - std::unique_ptr<EntityData> new_data;
|
| -
|
| - // Determine the type of resolution.
|
| - if (entity->MatchesData(remote_data)) {
|
| - // The changes are identical so there isn't a real conflict.
|
| - resolution_type = ConflictResolution::CHANGES_MATCH;
|
| - } else if (entity->RequiresCommitData() ||
|
| - entity->MatchesBaseData(entity->commit_data().value())) {
|
| - // If commit data needs to be loaded at this point, it can only be due to a
|
| - // re-encryption request. If the commit data matches the base data, it also
|
| - // must be a re-encryption request. Either way there's no real local change
|
| - // and the remote data should win.
|
| - resolution_type = ConflictResolution::IGNORE_LOCAL_ENCRYPTION;
|
| - } else if (entity->MatchesBaseData(remote_data)) {
|
| - // The remote data isn't actually changing from the last remote data that
|
| - // was seen, so it must have been a re-encryption and can be ignored.
|
| - resolution_type = ConflictResolution::IGNORE_REMOTE_ENCRYPTION;
|
| - } else {
|
| - // There's a real data conflict here; let the service resolve it.
|
| - ConflictResolution resolution =
|
| - service_->ResolveConflict(entity->commit_data().value(), remote_data);
|
| - resolution_type = resolution.type();
|
| - new_data = resolution.ExtractData();
|
| - }
|
| -
|
| - // Apply the resolution.
|
| - switch (resolution_type) {
|
| - case ConflictResolution::CHANGES_MATCH:
|
| - // Record the update and squash the pending commit.
|
| - entity->RecordForcedUpdate(update);
|
| - break;
|
| - case ConflictResolution::USE_LOCAL:
|
| - case ConflictResolution::IGNORE_REMOTE_ENCRYPTION:
|
| - // Record that we received the update from the server but leave the
|
| - // pending commit intact.
|
| - entity->RecordIgnoredUpdate(update);
|
| - break;
|
| - case ConflictResolution::USE_REMOTE:
|
| - case ConflictResolution::IGNORE_LOCAL_ENCRYPTION:
|
| - // Squash the pending commit.
|
| - entity->RecordForcedUpdate(update);
|
| - // Update client data to match server.
|
| - changes->push_back(
|
| - EntityChange::CreateUpdate(entity->client_tag(), update.entity));
|
| - break;
|
| - case ConflictResolution::USE_NEW:
|
| - // Record that we received the update.
|
| - entity->RecordIgnoredUpdate(update);
|
| - // Make a new pending commit to update the server.
|
| - entity->MakeLocalChange(std::move(new_data));
|
| - // Update the client with the new entity.
|
| - changes->push_back(EntityChange::CreateUpdate(entity->client_tag(),
|
| - entity->commit_data()));
|
| - break;
|
| - case ConflictResolution::TYPE_SIZE:
|
| - NOTREACHED();
|
| - break;
|
| - }
|
| - DCHECK(!new_data);
|
| -
|
| - return resolution_type;
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::RecommitAllForEncryption(
|
| - std::unordered_set<std::string> already_updated,
|
| - MetadataChangeList* metadata_changes) {
|
| - ModelTypeService::ClientTagList entities_needing_data;
|
| -
|
| - for (auto it = entities_.begin(); it != entities_.end(); ++it) {
|
| - ProcessorEntityTracker* entity = it->second.get();
|
| - if (already_updated.find(entity->client_tag()) != already_updated.end()) {
|
| - continue;
|
| - }
|
| - entity->IncrementSequenceNumber();
|
| - if (entity->RequiresCommitData()) {
|
| - entities_needing_data.push_back(entity->client_tag());
|
| - }
|
| - metadata_changes->UpdateMetadata(entity->client_tag(), entity->metadata());
|
| - }
|
| -
|
| - if (!entities_needing_data.empty()) {
|
| - service_->GetData(
|
| - entities_needing_data,
|
| - base::Bind(&SharedModelTypeProcessor::OnDataLoadedForReEncryption,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnInitialUpdateReceived(
|
| - const sync_pb::DataTypeState& data_type_state,
|
| - const UpdateResponseDataList& updates) {
|
| - DCHECK(entities_.empty());
|
| - // Ensure that initial sync was not already done and that the worker
|
| - // correctly marked initial sync as done for this update.
|
| - DCHECK(!data_type_state_.initial_sync_done());
|
| - DCHECK(data_type_state.initial_sync_done());
|
| -
|
| - std::unique_ptr<MetadataChangeList> metadata_changes =
|
| - service_->CreateMetadataChangeList();
|
| - EntityDataMap data_map;
|
| -
|
| - data_type_state_ = data_type_state;
|
| - metadata_changes->UpdateDataTypeState(data_type_state_);
|
| -
|
| - for (const UpdateResponseData& update : updates) {
|
| - ProcessorEntityTracker* entity = CreateEntity(update.entity.value());
|
| - const std::string& tag = entity->client_tag();
|
| - entity->RecordAcceptedUpdate(update);
|
| - metadata_changes->UpdateMetadata(tag, entity->metadata());
|
| - data_map[tag] = update.entity;
|
| - }
|
| -
|
| - // Let the service handle associating and merging the data.
|
| - syncer::SyncError error =
|
| - service_->MergeSyncData(std::move(metadata_changes), data_map);
|
| -
|
| - if (error.IsSet()) {
|
| - error_handler_->OnSingleDataTypeUnrecoverableError(error);
|
| - } else {
|
| - // We may have new reasons to commit by the time this function is done.
|
| - FlushPendingCommitRequests();
|
| - }
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnInitialPendingDataLoaded(
|
| - syncer::SyncError error,
|
| - std::unique_ptr<DataBatch> data_batch) {
|
| - DCHECK(!is_initial_pending_data_loaded_);
|
| -
|
| - if (error.IsSet()) {
|
| - start_error_ = error;
|
| - } else {
|
| - ConsumeDataBatch(std::move(data_batch));
|
| - }
|
| -
|
| - is_initial_pending_data_loaded_ = true;
|
| - ConnectIfReady();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::OnDataLoadedForReEncryption(
|
| - syncer::SyncError error,
|
| - std::unique_ptr<DataBatch> data_batch) {
|
| - DCHECK(is_initial_pending_data_loaded_);
|
| -
|
| - if (error.IsSet()) {
|
| - error_handler_->OnSingleDataTypeUnrecoverableError(error);
|
| - return;
|
| - }
|
| -
|
| - ConsumeDataBatch(std::move(data_batch));
|
| - FlushPendingCommitRequests();
|
| -}
|
| -
|
| -void SharedModelTypeProcessor::ConsumeDataBatch(
|
| - std::unique_ptr<DataBatch> data_batch) {
|
| - while (data_batch->HasNext()) {
|
| - TagAndData data = data_batch->Next();
|
| - ProcessorEntityTracker* entity = GetEntityForTag(data.first);
|
| - // If the entity wasn't deleted or updated with new commit.
|
| - if (entity != nullptr && entity->RequiresCommitData()) {
|
| - entity->CacheCommitData(data.second.get());
|
| - }
|
| - }
|
| -}
|
| -
|
| -std::string SharedModelTypeProcessor::GetHashForTag(const std::string& tag) {
|
| - return syncer::syncable::GenerateSyncableHash(type_, tag);
|
| -}
|
| -
|
| -ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTag(
|
| - const std::string& tag) {
|
| - return GetEntityForTagHash(GetHashForTag(tag));
|
| -}
|
| -
|
| -ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTagHash(
|
| - const std::string& tag_hash) {
|
| - auto it = entities_.find(tag_hash);
|
| - return it != entities_.end() ? it->second.get() : nullptr;
|
| -}
|
| -
|
| -ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
|
| - const std::string& tag,
|
| - const EntityData& data) {
|
| - DCHECK(entities_.find(data.client_tag_hash) == entities_.end());
|
| - std::unique_ptr<ProcessorEntityTracker> entity =
|
| - ProcessorEntityTracker::CreateNew(tag, data.client_tag_hash, data.id,
|
| - data.creation_time);
|
| - ProcessorEntityTracker* entity_ptr = entity.get();
|
| - entities_[data.client_tag_hash] = std::move(entity);
|
| - return entity_ptr;
|
| -}
|
| -
|
| -ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
|
| - const EntityData& data) {
|
| - // Let the service define |client_tag| based on the entity data.
|
| - const std::string tag = service_->GetClientTag(data);
|
| - // This constraint may be relaxed in the future.
|
| - DCHECK_EQ(data.client_tag_hash, GetHashForTag(tag));
|
| - return CreateEntity(tag, data);
|
| -}
|
| -
|
| -} // namespace syncer_v2
|
|
|