Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Side by Side Diff: sync/internal_api/shared_model_type_processor.cc

Issue 2130453004: [Sync] Move //sync to //components/sync. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/public/shared_model_type_processor.h"
6
7 #include <utility>
8 #include <vector>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/metrics/histogram.h"
14 #include "base/threading/thread_task_runner_handle.h"
15 #include "sync/engine/commit_queue.h"
16 #include "sync/internal_api/public/activation_context.h"
17 #include "sync/internal_api/public/processor_entity_tracker.h"
18 #include "sync/syncable/syncable_util.h"
19
20 namespace syncer_v2 {
21
22 namespace {
23
24 class ModelTypeProcessorProxy : public ModelTypeProcessor {
25 public:
26 ModelTypeProcessorProxy(
27 const base::WeakPtr<ModelTypeProcessor>& processor,
28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29 ~ModelTypeProcessorProxy() override;
30
31 void ConnectSync(std::unique_ptr<CommitQueue> worker) override;
32 void DisconnectSync() override;
33 void OnCommitCompleted(const sync_pb::DataTypeState& type_state,
34 const CommitResponseDataList& response_list) override;
35 void OnUpdateReceived(const sync_pb::DataTypeState& type_state,
36 const UpdateResponseDataList& updates) override;
37
38 private:
39 base::WeakPtr<ModelTypeProcessor> processor_;
40 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
41 };
42
43 ModelTypeProcessorProxy::ModelTypeProcessorProxy(
44 const base::WeakPtr<ModelTypeProcessor>& processor,
45 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
46 : processor_(processor), processor_task_runner_(processor_task_runner) {}
47
48 ModelTypeProcessorProxy::~ModelTypeProcessorProxy() {}
49
50 void ModelTypeProcessorProxy::ConnectSync(std::unique_ptr<CommitQueue> worker) {
51 processor_task_runner_->PostTask(
52 FROM_HERE, base::Bind(&ModelTypeProcessor::ConnectSync, processor_,
53 base::Passed(std::move(worker))));
54 }
55
56 void ModelTypeProcessorProxy::DisconnectSync() {
57 processor_task_runner_->PostTask(
58 FROM_HERE, base::Bind(&ModelTypeProcessor::DisconnectSync, processor_));
59 }
60
61 void ModelTypeProcessorProxy::OnCommitCompleted(
62 const sync_pb::DataTypeState& type_state,
63 const CommitResponseDataList& response_list) {
64 processor_task_runner_->PostTask(
65 FROM_HERE, base::Bind(&ModelTypeProcessor::OnCommitCompleted, processor_,
66 type_state, response_list));
67 }
68
69 void ModelTypeProcessorProxy::OnUpdateReceived(
70 const sync_pb::DataTypeState& type_state,
71 const UpdateResponseDataList& updates) {
72 processor_task_runner_->PostTask(
73 FROM_HERE, base::Bind(&ModelTypeProcessor::OnUpdateReceived, processor_,
74 type_state, updates));
75 }
76
77 } // namespace
78
79 SharedModelTypeProcessor::SharedModelTypeProcessor(syncer::ModelType type,
80 ModelTypeService* service)
81 : type_(type),
82 is_metadata_loaded_(false),
83 is_initial_pending_data_loaded_(false),
84 service_(service),
85 error_handler_(nullptr),
86 weak_ptr_factory_(this) {
87 DCHECK(service);
88 }
89
90 SharedModelTypeProcessor::~SharedModelTypeProcessor() {}
91
92 // static
93 std::unique_ptr<ModelTypeChangeProcessor>
94 SharedModelTypeProcessor::CreateAsChangeProcessor(syncer::ModelType type,
95 ModelTypeService* service) {
96 return std::unique_ptr<ModelTypeChangeProcessor>(
97 new SharedModelTypeProcessor(type, service));
98 }
99
100 void SharedModelTypeProcessor::OnSyncStarting(
101 syncer::DataTypeErrorHandler* error_handler,
102 const StartCallback& start_callback) {
103 DCHECK(CalledOnValidThread());
104 DCHECK(start_callback_.is_null());
105 DCHECK(!IsConnected());
106 DCHECK(error_handler);
107 DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_);
108
109 error_handler_ = error_handler;
110 start_callback_ = start_callback;
111 ConnectIfReady();
112 }
113
114 void SharedModelTypeProcessor::OnMetadataLoaded(
115 syncer::SyncError error,
116 std::unique_ptr<MetadataBatch> batch) {
117 DCHECK(CalledOnValidThread());
118 DCHECK(entities_.empty());
119 DCHECK(!is_metadata_loaded_);
120 DCHECK(!IsConnected());
121
122 is_metadata_loaded_ = true;
123 // Flip this flag here to cover all cases where we don't need to load data.
124 is_initial_pending_data_loaded_ = true;
125
126 if (error.IsSet()) {
127 start_error_ = error;
128 ConnectIfReady();
129 return;
130 }
131
132 if (batch->GetDataTypeState().initial_sync_done()) {
133 EntityMetadataMap metadata_map(batch->TakeAllMetadata());
134 std::vector<std::string> entities_to_commit;
135
136 for (auto it = metadata_map.begin(); it != metadata_map.end(); it++) {
137 std::unique_ptr<ProcessorEntityTracker> entity =
138 ProcessorEntityTracker::CreateFromMetadata(it->first, &it->second);
139 if (entity->RequiresCommitData()) {
140 entities_to_commit.push_back(entity->client_tag());
141 }
142 entities_[entity->metadata().client_tag_hash()] = std::move(entity);
143 }
144 data_type_state_ = batch->GetDataTypeState();
145 if (!entities_to_commit.empty()) {
146 is_initial_pending_data_loaded_ = false;
147 service_->GetData(
148 entities_to_commit,
149 base::Bind(&SharedModelTypeProcessor::OnInitialPendingDataLoaded,
150 weak_ptr_factory_.GetWeakPtr()));
151 }
152 } else {
153 // First time syncing; initialize metadata.
154 data_type_state_.mutable_progress_marker()->set_data_type_id(
155 GetSpecificsFieldNumberFromModelType(type_));
156 }
157
158 ConnectIfReady();
159 }
160
161 void SharedModelTypeProcessor::ConnectIfReady() {
162 DCHECK(CalledOnValidThread());
163 if (!is_metadata_loaded_ || !is_initial_pending_data_loaded_ ||
164 start_callback_.is_null()) {
165 return;
166 }
167
168 std::unique_ptr<ActivationContext> activation_context;
169
170 if (!start_error_.IsSet()) {
171 activation_context = base::WrapUnique(new ActivationContext);
172 activation_context->data_type_state = data_type_state_;
173 activation_context->type_processor = base::WrapUnique(
174 new ModelTypeProcessorProxy(weak_ptr_factory_.GetWeakPtr(),
175 base::ThreadTaskRunnerHandle::Get()));
176 }
177
178 start_callback_.Run(start_error_, std::move(activation_context));
179 start_callback_.Reset();
180 }
181
182 bool SharedModelTypeProcessor::IsAllowingChanges() const {
183 return is_metadata_loaded_;
184 }
185
186 bool SharedModelTypeProcessor::IsConnected() const {
187 DCHECK(CalledOnValidThread());
188 return !!worker_;
189 }
190
191 void SharedModelTypeProcessor::DisableSync() {
192 DCHECK(CalledOnValidThread());
193 std::unique_ptr<MetadataChangeList> change_list =
194 service_->CreateMetadataChangeList();
195 for (auto it = entities_.begin(); it != entities_.end(); ++it) {
196 change_list->ClearMetadata(it->second->client_tag());
197 }
198 change_list->ClearDataTypeState();
199 // Nothing to do if this fails, so just ignore the error it might return.
200 service_->ApplySyncChanges(std::move(change_list), EntityChangeList());
201 }
202
203 syncer::SyncError SharedModelTypeProcessor::CreateAndUploadError(
204 const tracked_objects::Location& location,
205 const std::string& message) {
206 if (error_handler_) {
207 return error_handler_->CreateAndUploadError(location, message, type_);
208 } else {
209 return syncer::SyncError(location, syncer::SyncError::DATATYPE_ERROR,
210 message, type_);
211 }
212 }
213
214 void SharedModelTypeProcessor::ConnectSync(
215 std::unique_ptr<CommitQueue> worker) {
216 DCHECK(CalledOnValidThread());
217 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
218
219 worker_ = std::move(worker);
220
221 FlushPendingCommitRequests();
222 }
223
224 void SharedModelTypeProcessor::DisconnectSync() {
225 DCHECK(CalledOnValidThread());
226 DCHECK(IsConnected());
227
228 DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_);
229 weak_ptr_factory_.InvalidateWeakPtrs();
230 worker_.reset();
231
232 for (auto it = entities_.begin(); it != entities_.end(); ++it) {
233 it->second->ClearTransientSyncState();
234 }
235 }
236
237 void SharedModelTypeProcessor::Put(const std::string& tag,
238 std::unique_ptr<EntityData> data,
239 MetadataChangeList* metadata_change_list) {
240 DCHECK(IsAllowingChanges());
241 DCHECK(data.get());
242 DCHECK(!data->is_deleted());
243 DCHECK(!data->non_unique_name.empty());
244 DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data->specifics));
245
246 if (!data_type_state_.initial_sync_done()) {
247 // Ignore changes before the initial sync is done.
248 return;
249 }
250
251 // Fill in some data.
252 data->client_tag_hash = GetHashForTag(tag);
253 if (data->modification_time.is_null()) {
254 data->modification_time = base::Time::Now();
255 }
256
257 ProcessorEntityTracker* entity = GetEntityForTagHash(data->client_tag_hash);
258
259 if (entity == nullptr) {
260 // The service is creating a new entity.
261 if (data->creation_time.is_null()) {
262 data->creation_time = data->modification_time;
263 }
264 entity = CreateEntity(tag, *data);
265 } else if (entity->MatchesData(*data)) {
266 // Ignore changes that don't actually change anything.
267 return;
268 }
269
270 entity->MakeLocalChange(std::move(data));
271 metadata_change_list->UpdateMetadata(tag, entity->metadata());
272
273 FlushPendingCommitRequests();
274 }
275
276 void SharedModelTypeProcessor::Delete(
277 const std::string& tag,
278 MetadataChangeList* metadata_change_list) {
279 DCHECK(IsAllowingChanges());
280
281 if (!data_type_state_.initial_sync_done()) {
282 // Ignore changes before the initial sync is done.
283 return;
284 }
285
286 ProcessorEntityTracker* entity = GetEntityForTag(tag);
287 if (entity == nullptr) {
288 // That's unusual, but not necessarily a bad thing.
289 // Missing is as good as deleted as far as the model is concerned.
290 DLOG(WARNING) << "Attempted to delete missing item."
291 << " client tag: " << tag;
292 return;
293 }
294
295 entity->Delete();
296
297 metadata_change_list->UpdateMetadata(tag, entity->metadata());
298 FlushPendingCommitRequests();
299 }
300
301 void SharedModelTypeProcessor::FlushPendingCommitRequests() {
302 CommitRequestDataList commit_requests;
303
304 // Don't bother sending anything if there's no one to send to.
305 if (!IsConnected())
306 return;
307
308 // Don't send anything if the type is not ready to handle commits.
309 if (!data_type_state_.initial_sync_done())
310 return;
311
312 // TODO(rlarocque): Do something smarter than iterate here.
313 for (auto it = entities_.begin(); it != entities_.end(); ++it) {
314 ProcessorEntityTracker* entity = it->second.get();
315 if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) {
316 CommitRequestData request;
317 entity->InitializeCommitRequestData(&request);
318 commit_requests.push_back(request);
319 }
320 }
321
322 if (!commit_requests.empty())
323 worker_->EnqueueForCommit(commit_requests);
324 }
325
326 void SharedModelTypeProcessor::OnCommitCompleted(
327 const sync_pb::DataTypeState& type_state,
328 const CommitResponseDataList& response_list) {
329 std::unique_ptr<MetadataChangeList> change_list =
330 service_->CreateMetadataChangeList();
331
332 data_type_state_ = type_state;
333 change_list->UpdateDataTypeState(data_type_state_);
334
335 for (const CommitResponseData& data : response_list) {
336 ProcessorEntityTracker* entity = GetEntityForTagHash(data.client_tag_hash);
337 if (entity == nullptr) {
338 NOTREACHED() << "Received commit response for missing item."
339 << " type: " << type_
340 << " client_tag_hash: " << data.client_tag_hash;
341 continue;
342 }
343
344 entity->ReceiveCommitResponse(data);
345
346 if (entity->CanClearMetadata()) {
347 change_list->ClearMetadata(entity->client_tag());
348 entities_.erase(entity->metadata().client_tag_hash());
349 } else {
350 change_list->UpdateMetadata(entity->client_tag(), entity->metadata());
351 }
352 }
353
354 syncer::SyncError error =
355 service_->ApplySyncChanges(std::move(change_list), EntityChangeList());
356 if (error.IsSet()) {
357 error_handler_->OnSingleDataTypeUnrecoverableError(error);
358 }
359 }
360
361 void SharedModelTypeProcessor::OnUpdateReceived(
362 const sync_pb::DataTypeState& data_type_state,
363 const UpdateResponseDataList& updates) {
364 if (!data_type_state_.initial_sync_done()) {
365 OnInitialUpdateReceived(data_type_state, updates);
366 return;
367 }
368
369 std::unique_ptr<MetadataChangeList> metadata_changes =
370 service_->CreateMetadataChangeList();
371 EntityChangeList entity_changes;
372
373 metadata_changes->UpdateDataTypeState(data_type_state);
374 bool got_new_encryption_requirements =
375 data_type_state_.encryption_key_name() !=
376 data_type_state.encryption_key_name();
377 data_type_state_ = data_type_state;
378
379 // If new encryption requirements come from the server, the entities that are
380 // in |updates| will be recorded here so they can be ignored during the
381 // re-encryption phase at the end.
382 std::unordered_set<std::string> already_updated;
383
384 for (const UpdateResponseData& update : updates) {
385 ProcessorEntityTracker* entity = ProcessUpdate(update, &entity_changes);
386
387 if (!entity) {
388 // The update should be ignored.
389 continue;
390 }
391
392 if (entity->CanClearMetadata()) {
393 metadata_changes->ClearMetadata(entity->client_tag());
394 entities_.erase(entity->metadata().client_tag_hash());
395 } else {
396 metadata_changes->UpdateMetadata(entity->client_tag(),
397 entity->metadata());
398 }
399
400 if (got_new_encryption_requirements) {
401 already_updated.insert(entity->client_tag());
402 }
403 }
404
405 if (got_new_encryption_requirements) {
406 RecommitAllForEncryption(already_updated, metadata_changes.get());
407 }
408
409 // Inform the service of the new or updated data.
410 syncer::SyncError error =
411 service_->ApplySyncChanges(std::move(metadata_changes), entity_changes);
412
413 if (error.IsSet()) {
414 error_handler_->OnSingleDataTypeUnrecoverableError(error);
415 } else {
416 // There may be new reasons to commit by the time this function is done.
417 FlushPendingCommitRequests();
418 }
419 }
420
421 ProcessorEntityTracker* SharedModelTypeProcessor::ProcessUpdate(
422 const UpdateResponseData& update,
423 EntityChangeList* entity_changes) {
424 const EntityData& data = update.entity.value();
425 const std::string& client_tag_hash = data.client_tag_hash;
426 ProcessorEntityTracker* entity = GetEntityForTagHash(client_tag_hash);
427 if (entity == nullptr) {
428 if (data.is_deleted()) {
429 DLOG(WARNING) << "Received remote delete for a non-existing item."
430 << " client_tag_hash: " << client_tag_hash;
431 return nullptr;
432 }
433
434 entity = CreateEntity(data);
435 entity_changes->push_back(
436 EntityChange::CreateAdd(entity->client_tag(), update.entity));
437 entity->RecordAcceptedUpdate(update);
438 } else if (entity->UpdateIsReflection(update.response_version)) {
439 // Seen this update before; just ignore it.
440 return nullptr;
441 } else if (entity->IsUnsynced()) {
442 ConflictResolution::Type resolution_type =
443 ResolveConflict(update, entity, entity_changes);
444 UMA_HISTOGRAM_ENUMERATION("Sync.ResolveConflict", resolution_type,
445 ConflictResolution::TYPE_SIZE);
446 } else if (data.is_deleted()) {
447 // The entity was deleted; inform the service. Note that the local data
448 // can never be deleted at this point because it would have either been
449 // acked (the add case) or pending (the conflict case).
450 DCHECK(!entity->metadata().is_deleted());
451 entity_changes->push_back(EntityChange::CreateDelete(entity->client_tag()));
452 entity->RecordAcceptedUpdate(update);
453 } else if (!entity->MatchesData(data)) {
454 // Specifics have changed, so update the service.
455 entity_changes->push_back(
456 EntityChange::CreateUpdate(entity->client_tag(), update.entity));
457 entity->RecordAcceptedUpdate(update);
458 } else {
459 // No data change; still record that the update was received.
460 entity->RecordAcceptedUpdate(update);
461 }
462
463 // If the received entity has out of date encryption, we schedule another
464 // commit to fix it.
465 if (data_type_state_.encryption_key_name() != update.encryption_key_name) {
466 DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
467 << update.encryption_key_name << " -> "
468 << data_type_state_.encryption_key_name();
469
470 entity->IncrementSequenceNumber();
471 if (entity->RequiresCommitData()) {
472 // If there is no pending commit data, then either this update wasn't
473 // in conflict or the remote data won; either way the remote data is
474 // the right data to re-queue for commit.
475 entity->CacheCommitData(update.entity);
476 }
477 }
478
479 return entity;
480 }
481
482 ConflictResolution::Type SharedModelTypeProcessor::ResolveConflict(
483 const UpdateResponseData& update,
484 ProcessorEntityTracker* entity,
485 EntityChangeList* changes) {
486 const EntityData& remote_data = update.entity.value();
487
488 ConflictResolution::Type resolution_type = ConflictResolution::TYPE_SIZE;
489 std::unique_ptr<EntityData> new_data;
490
491 // Determine the type of resolution.
492 if (entity->MatchesData(remote_data)) {
493 // The changes are identical so there isn't a real conflict.
494 resolution_type = ConflictResolution::CHANGES_MATCH;
495 } else if (entity->RequiresCommitData() ||
496 entity->MatchesBaseData(entity->commit_data().value())) {
497 // If commit data needs to be loaded at this point, it can only be due to a
498 // re-encryption request. If the commit data matches the base data, it also
499 // must be a re-encryption request. Either way there's no real local change
500 // and the remote data should win.
501 resolution_type = ConflictResolution::IGNORE_LOCAL_ENCRYPTION;
502 } else if (entity->MatchesBaseData(remote_data)) {
503 // The remote data isn't actually changing from the last remote data that
504 // was seen, so it must have been a re-encryption and can be ignored.
505 resolution_type = ConflictResolution::IGNORE_REMOTE_ENCRYPTION;
506 } else {
507 // There's a real data conflict here; let the service resolve it.
508 ConflictResolution resolution =
509 service_->ResolveConflict(entity->commit_data().value(), remote_data);
510 resolution_type = resolution.type();
511 new_data = resolution.ExtractData();
512 }
513
514 // Apply the resolution.
515 switch (resolution_type) {
516 case ConflictResolution::CHANGES_MATCH:
517 // Record the update and squash the pending commit.
518 entity->RecordForcedUpdate(update);
519 break;
520 case ConflictResolution::USE_LOCAL:
521 case ConflictResolution::IGNORE_REMOTE_ENCRYPTION:
522 // Record that we received the update from the server but leave the
523 // pending commit intact.
524 entity->RecordIgnoredUpdate(update);
525 break;
526 case ConflictResolution::USE_REMOTE:
527 case ConflictResolution::IGNORE_LOCAL_ENCRYPTION:
528 // Squash the pending commit.
529 entity->RecordForcedUpdate(update);
530 // Update client data to match server.
531 changes->push_back(
532 EntityChange::CreateUpdate(entity->client_tag(), update.entity));
533 break;
534 case ConflictResolution::USE_NEW:
535 // Record that we received the update.
536 entity->RecordIgnoredUpdate(update);
537 // Make a new pending commit to update the server.
538 entity->MakeLocalChange(std::move(new_data));
539 // Update the client with the new entity.
540 changes->push_back(EntityChange::CreateUpdate(entity->client_tag(),
541 entity->commit_data()));
542 break;
543 case ConflictResolution::TYPE_SIZE:
544 NOTREACHED();
545 break;
546 }
547 DCHECK(!new_data);
548
549 return resolution_type;
550 }
551
552 void SharedModelTypeProcessor::RecommitAllForEncryption(
553 std::unordered_set<std::string> already_updated,
554 MetadataChangeList* metadata_changes) {
555 ModelTypeService::ClientTagList entities_needing_data;
556
557 for (auto it = entities_.begin(); it != entities_.end(); ++it) {
558 ProcessorEntityTracker* entity = it->second.get();
559 if (already_updated.find(entity->client_tag()) != already_updated.end()) {
560 continue;
561 }
562 entity->IncrementSequenceNumber();
563 if (entity->RequiresCommitData()) {
564 entities_needing_data.push_back(entity->client_tag());
565 }
566 metadata_changes->UpdateMetadata(entity->client_tag(), entity->metadata());
567 }
568
569 if (!entities_needing_data.empty()) {
570 service_->GetData(
571 entities_needing_data,
572 base::Bind(&SharedModelTypeProcessor::OnDataLoadedForReEncryption,
573 weak_ptr_factory_.GetWeakPtr()));
574 }
575 }
576
577 void SharedModelTypeProcessor::OnInitialUpdateReceived(
578 const sync_pb::DataTypeState& data_type_state,
579 const UpdateResponseDataList& updates) {
580 DCHECK(entities_.empty());
581 // Ensure that initial sync was not already done and that the worker
582 // correctly marked initial sync as done for this update.
583 DCHECK(!data_type_state_.initial_sync_done());
584 DCHECK(data_type_state.initial_sync_done());
585
586 std::unique_ptr<MetadataChangeList> metadata_changes =
587 service_->CreateMetadataChangeList();
588 EntityDataMap data_map;
589
590 data_type_state_ = data_type_state;
591 metadata_changes->UpdateDataTypeState(data_type_state_);
592
593 for (const UpdateResponseData& update : updates) {
594 ProcessorEntityTracker* entity = CreateEntity(update.entity.value());
595 const std::string& tag = entity->client_tag();
596 entity->RecordAcceptedUpdate(update);
597 metadata_changes->UpdateMetadata(tag, entity->metadata());
598 data_map[tag] = update.entity;
599 }
600
601 // Let the service handle associating and merging the data.
602 syncer::SyncError error =
603 service_->MergeSyncData(std::move(metadata_changes), data_map);
604
605 if (error.IsSet()) {
606 error_handler_->OnSingleDataTypeUnrecoverableError(error);
607 } else {
608 // We may have new reasons to commit by the time this function is done.
609 FlushPendingCommitRequests();
610 }
611 }
612
613 void SharedModelTypeProcessor::OnInitialPendingDataLoaded(
614 syncer::SyncError error,
615 std::unique_ptr<DataBatch> data_batch) {
616 DCHECK(!is_initial_pending_data_loaded_);
617
618 if (error.IsSet()) {
619 start_error_ = error;
620 } else {
621 ConsumeDataBatch(std::move(data_batch));
622 }
623
624 is_initial_pending_data_loaded_ = true;
625 ConnectIfReady();
626 }
627
628 void SharedModelTypeProcessor::OnDataLoadedForReEncryption(
629 syncer::SyncError error,
630 std::unique_ptr<DataBatch> data_batch) {
631 DCHECK(is_initial_pending_data_loaded_);
632
633 if (error.IsSet()) {
634 error_handler_->OnSingleDataTypeUnrecoverableError(error);
635 return;
636 }
637
638 ConsumeDataBatch(std::move(data_batch));
639 FlushPendingCommitRequests();
640 }
641
642 void SharedModelTypeProcessor::ConsumeDataBatch(
643 std::unique_ptr<DataBatch> data_batch) {
644 while (data_batch->HasNext()) {
645 TagAndData data = data_batch->Next();
646 ProcessorEntityTracker* entity = GetEntityForTag(data.first);
647 // If the entity wasn't deleted or updated with new commit.
648 if (entity != nullptr && entity->RequiresCommitData()) {
649 entity->CacheCommitData(data.second.get());
650 }
651 }
652 }
653
654 std::string SharedModelTypeProcessor::GetHashForTag(const std::string& tag) {
655 return syncer::syncable::GenerateSyncableHash(type_, tag);
656 }
657
658 ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTag(
659 const std::string& tag) {
660 return GetEntityForTagHash(GetHashForTag(tag));
661 }
662
663 ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTagHash(
664 const std::string& tag_hash) {
665 auto it = entities_.find(tag_hash);
666 return it != entities_.end() ? it->second.get() : nullptr;
667 }
668
669 ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
670 const std::string& tag,
671 const EntityData& data) {
672 DCHECK(entities_.find(data.client_tag_hash) == entities_.end());
673 std::unique_ptr<ProcessorEntityTracker> entity =
674 ProcessorEntityTracker::CreateNew(tag, data.client_tag_hash, data.id,
675 data.creation_time);
676 ProcessorEntityTracker* entity_ptr = entity.get();
677 entities_[data.client_tag_hash] = std::move(entity);
678 return entity_ptr;
679 }
680
681 ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity(
682 const EntityData& data) {
683 // Let the service define |client_tag| based on the entity data.
684 const std::string tag = service_->GetClientTag(data);
685 // This constraint may be relaxed in the future.
686 DCHECK_EQ(data.client_tag_hash, GetHashForTag(tag));
687 return CreateEntity(tag, data);
688 }
689
690 } // namespace syncer_v2
OLDNEW
« no previous file with comments | « sync/internal_api/read_transaction.cc ('k') | sync/internal_api/shared_model_type_processor_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698