Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(279)

Side by Side Diff: sync/engine/model_type_worker.cc

Issue 2130453004: [Sync] Move //sync to //components/sync. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sync/engine/model_type_worker.h ('k') | sync/engine/model_type_worker_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/engine/model_type_worker.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <utility>
11 #include <vector>
12
13 #include "base/bind.h"
14 #include "base/format_macros.h"
15 #include "base/guid.h"
16 #include "base/logging.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/strings/stringprintf.h"
19 #include "sync/engine/commit_contribution.h"
20 #include "sync/engine/non_blocking_type_commit_contribution.h"
21 #include "sync/engine/worker_entity_tracker.h"
22 #include "sync/internal_api/public/model_type_processor.h"
23 #include "sync/syncable/syncable_util.h"
24 #include "sync/util/cryptographer.h"
25 #include "sync/util/time.h"
26
27 namespace syncer_v2 {
28
29 using syncer::CommitContribution;
30 using syncer::Cryptographer;
31 using syncer::ModelType;
32 using syncer::NudgeHandler;
33 using syncer::SyncerError;
34
35 ModelTypeWorker::ModelTypeWorker(
36 ModelType type,
37 const sync_pb::DataTypeState& initial_state,
38 std::unique_ptr<Cryptographer> cryptographer,
39 NudgeHandler* nudge_handler,
40 std::unique_ptr<ModelTypeProcessor> model_type_processor)
41 : type_(type),
42 data_type_state_(initial_state),
43 model_type_processor_(std::move(model_type_processor)),
44 cryptographer_(std::move(cryptographer)),
45 nudge_handler_(nudge_handler),
46 weak_ptr_factory_(this) {
47 DCHECK(model_type_processor_);
48
49 // Request an initial sync if it hasn't been completed yet.
50 if (!data_type_state_.initial_sync_done()) {
51 nudge_handler_->NudgeForInitialDownload(type_);
52 }
53
54 if (cryptographer_) {
55 DVLOG(1) << ModelTypeToString(type_) << ": Starting with encryption key "
56 << cryptographer_->GetDefaultNigoriKeyName();
57 OnCryptographerUpdated();
58 }
59 }
60
61 ModelTypeWorker::~ModelTypeWorker() {
62 model_type_processor_->DisconnectSync();
63 }
64
65 ModelType ModelTypeWorker::GetModelType() const {
66 DCHECK(CalledOnValidThread());
67 return type_;
68 }
69
70 bool ModelTypeWorker::IsEncryptionRequired() const {
71 return !!cryptographer_;
72 }
73
74 void ModelTypeWorker::UpdateCryptographer(
75 std::unique_ptr<Cryptographer> cryptographer) {
76 DCHECK(cryptographer);
77 cryptographer_ = std::move(cryptographer);
78
79 // Update our state and that of the proxy.
80 OnCryptographerUpdated();
81
82 // Nudge the scheduler if we're now allowed to commit.
83 if (CanCommitItems())
84 nudge_handler_->NudgeForCommit(type_);
85 }
86
87 // UpdateHandler implementation.
88 bool ModelTypeWorker::IsInitialSyncEnded() const {
89 return data_type_state_.initial_sync_done();
90 }
91
92 void ModelTypeWorker::GetDownloadProgress(
93 sync_pb::DataTypeProgressMarker* progress_marker) const {
94 DCHECK(CalledOnValidThread());
95 progress_marker->CopyFrom(data_type_state_.progress_marker());
96 }
97
98 void ModelTypeWorker::GetDataTypeContext(
99 sync_pb::DataTypeContext* context) const {
100 DCHECK(CalledOnValidThread());
101 context->CopyFrom(data_type_state_.type_context());
102 }
103
104 SyncerError ModelTypeWorker::ProcessGetUpdatesResponse(
105 const sync_pb::DataTypeProgressMarker& progress_marker,
106 const sync_pb::DataTypeContext& mutated_context,
107 const SyncEntityList& applicable_updates,
108 syncer::sessions::StatusController* status) {
109 DCHECK(CalledOnValidThread());
110
111 // TODO(rlarocque): Handle data type context conflicts.
112 *data_type_state_.mutable_type_context() = mutated_context;
113 *data_type_state_.mutable_progress_marker() = progress_marker;
114
115 for (const sync_pb::SyncEntity* update_entity : applicable_updates) {
116 // Skip updates for permanent folders.
117 // TODO(stanisc): crbug.com/516866: might need to handle this for
118 // hierarchical datatypes.
119 if (!update_entity->server_defined_unique_tag().empty())
120 continue;
121
122 // Normal updates are handled here.
123 const std::string& client_tag_hash =
124 update_entity->client_defined_unique_tag();
125
126 // TODO(stanisc): crbug.com/516866: this wouldn't be true for bookmarks.
127 DCHECK(!client_tag_hash.empty());
128
129 // Prepare the message for the model thread.
130 EntityData data;
131 data.id = update_entity->id_string();
132 data.client_tag_hash = client_tag_hash;
133 data.creation_time = syncer::ProtoTimeToTime(update_entity->ctime());
134 data.modification_time = syncer::ProtoTimeToTime(update_entity->mtime());
135 data.non_unique_name = update_entity->name();
136
137 UpdateResponseData response_data;
138 response_data.response_version = update_entity->version();
139
140 WorkerEntityTracker* entity = GetOrCreateEntityTracker(data);
141
142 // Check if specifics are encrypted and try to decrypt if so.
143 const sync_pb::EntitySpecifics& specifics = update_entity->specifics();
144 if (!specifics.has_encrypted()) {
145 // No encryption.
146 entity->ReceiveUpdate(update_entity->version());
147 data.specifics = specifics;
148 response_data.entity = data.PassToPtr();
149 pending_updates_.push_back(response_data);
150 } else if (specifics.has_encrypted() && cryptographer_ &&
151 cryptographer_->CanDecrypt(specifics.encrypted())) {
152 // Encrypted, but we know the key.
153 if (DecryptSpecifics(cryptographer_.get(), specifics, &data.specifics)) {
154 entity->ReceiveUpdate(update_entity->version());
155 response_data.entity = data.PassToPtr();
156 response_data.encryption_key_name = specifics.encrypted().key_name();
157 pending_updates_.push_back(response_data);
158 }
159 } else if (specifics.has_encrypted() &&
160 (!cryptographer_ ||
161 !cryptographer_->CanDecrypt(specifics.encrypted()))) {
162 // Can't decrypt right now. Ask the entity tracker to handle it.
163 data.specifics = specifics;
164 response_data.entity = data.PassToPtr();
165 entity->ReceiveEncryptedUpdate(response_data);
166 }
167 }
168
169 return syncer::SYNCER_OK;
170 }
171
172 void ModelTypeWorker::ApplyUpdates(syncer::sessions::StatusController* status) {
173 DCHECK(CalledOnValidThread());
174 // This should only ever be called after one PassiveApplyUpdates.
175 DCHECK(data_type_state_.initial_sync_done());
176 // Download cycle is done, pass all updates to the processor.
177 ApplyPendingUpdates();
178 }
179
180 void ModelTypeWorker::PassiveApplyUpdates(
181 syncer::sessions::StatusController* status) {
182 // This should only be called at the end of the very first download cycle.
183 DCHECK(!data_type_state_.initial_sync_done());
184 // Indicate to the processor that the initial download is done. The initial
185 // sync technically isn't done yet but by the time this value is persisted to
186 // disk on the model thread it will be.
187 data_type_state_.set_initial_sync_done(true);
188 ApplyPendingUpdates();
189 }
190
191 void ModelTypeWorker::ApplyPendingUpdates() {
192 DVLOG(1) << ModelTypeToString(type_) << ": "
193 << base::StringPrintf("Delivering %" PRIuS " applicable updates.",
194 pending_updates_.size());
195 model_type_processor_->OnUpdateReceived(data_type_state_, pending_updates_);
196 pending_updates_.clear();
197 }
198
199 void ModelTypeWorker::EnqueueForCommit(const CommitRequestDataList& list) {
200 DCHECK(CalledOnValidThread());
201
202 DCHECK(IsTypeInitialized())
203 << "Asked to commit items before type was initialized. "
204 << "ModelType is: " << ModelTypeToString(type_);
205
206 for (const CommitRequestData& commit : list) {
207 const EntityData& data = commit.entity.value();
208 if (!data.is_deleted()) {
209 DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data.specifics));
210 }
211 GetOrCreateEntityTracker(data)->RequestCommit(commit);
212 }
213
214 if (CanCommitItems())
215 nudge_handler_->NudgeForCommit(type_);
216 }
217
218 // CommitContributor implementation.
219 std::unique_ptr<CommitContribution> ModelTypeWorker::GetContribution(
220 size_t max_entries) {
221 DCHECK(CalledOnValidThread());
222 // There shouldn't be a GetUpdates in progress when a commit is triggered.
223 DCHECK(pending_updates_.empty());
224
225 size_t space_remaining = max_entries;
226 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
227
228 if (!CanCommitItems())
229 return std::unique_ptr<CommitContribution>();
230
231 // TODO(rlarocque): Avoid iterating here.
232 for (EntityMap::const_iterator it = entities_.begin();
233 it != entities_.end() && space_remaining > 0; ++it) {
234 WorkerEntityTracker* entity = it->second.get();
235 if (entity->HasPendingCommit()) {
236 sync_pb::SyncEntity* commit_entity = commit_entities.Add();
237 entity->PopulateCommitProto(commit_entity);
238 AdjustCommitProto(commit_entity);
239 space_remaining--;
240 }
241 }
242
243 if (commit_entities.size() == 0)
244 return std::unique_ptr<CommitContribution>();
245
246 return std::unique_ptr<CommitContribution>(
247 new NonBlockingTypeCommitContribution(data_type_state_.type_context(),
248 commit_entities, this));
249 }
250
251 void ModelTypeWorker::OnCommitResponse(CommitResponseDataList* response_list) {
252 for (CommitResponseData& response : *response_list) {
253 WorkerEntityTracker* entity = GetEntityTracker(response.client_tag_hash);
254
255 // There's no way we could have committed an entry we know nothing about.
256 if (entity == nullptr) {
257 NOTREACHED() << "Received commit response for item unknown to us."
258 << " Model type: " << ModelTypeToString(type_)
259 << " ID: " << response.id;
260 continue;
261 }
262
263 entity->ReceiveCommitResponse(&response);
264 }
265
266 // Send the responses back to the model thread. It needs to know which
267 // items have been successfully committed so it can save that information in
268 // permanent storage.
269 model_type_processor_->OnCommitCompleted(data_type_state_, *response_list);
270 }
271
272 base::WeakPtr<ModelTypeWorker> ModelTypeWorker::AsWeakPtr() {
273 return weak_ptr_factory_.GetWeakPtr();
274 }
275
276 bool ModelTypeWorker::IsTypeInitialized() const {
277 return data_type_state_.initial_sync_done() &&
278 !data_type_state_.progress_marker().token().empty();
279 }
280
281 bool ModelTypeWorker::CanCommitItems() const {
282 // We can't commit anything until we know the type's parent node.
283 // We'll get it in the first update response.
284 if (!IsTypeInitialized())
285 return false;
286
287 // Don't commit if we should be encrypting but don't have the required keys.
288 if (IsEncryptionRequired() &&
289 (!cryptographer_ || !cryptographer_->is_ready())) {
290 return false;
291 }
292
293 return true;
294 }
295
296 void ModelTypeWorker::AdjustCommitProto(sync_pb::SyncEntity* sync_entity) {
297 DCHECK(CanCommitItems());
298
299 // Initial commits need our help to generate a client ID.
300 if (sync_entity->version() == kUncommittedVersion) {
301 DCHECK(!sync_entity->has_id_string());
302 // TODO(stanisc): This is incorrect for bookmarks for two reasons:
303 // 1) Won't be able to match previously committed bookmarks to the ones
304 // with server ID.
305 // 2) Recommitting an item in a case of failing to receive commit response
306 // would result in generating a different client ID, which in turn
307 // would result in a duplication.
308 // We should generate client ID on the frontend side instead.
309 sync_entity->set_id_string(base::GenerateGUID());
310 sync_entity->set_version(0);
311 } else {
312 DCHECK(sync_entity->has_id_string());
313 }
314
315 // Encrypt the specifics and hide the title if necessary.
316 if (IsEncryptionRequired()) {
317 // IsEncryptionRequired() && CanCommitItems() implies
318 // that the cryptographer is valid and ready to encrypt.
319 sync_pb::EntitySpecifics encrypted_specifics;
320 bool result = cryptographer_->Encrypt(
321 sync_entity->specifics(), encrypted_specifics.mutable_encrypted());
322 DCHECK(result);
323 sync_entity->mutable_specifics()->CopyFrom(encrypted_specifics);
324 sync_entity->set_name("encrypted");
325 }
326
327 // Always include enough specifics to identify the type. Do this even in
328 // deletion requests, where the specifics are otherwise invalid.
329 AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
330
331 // TODO(stanisc): crbug.com/516866:
332 // Call sync_entity->set_parent_id_string(...) for hierarchical entities here.
333 }
334
335 void ModelTypeWorker::OnCryptographerUpdated() {
336 DCHECK(cryptographer_);
337
338 bool new_encryption_key = false;
339 UpdateResponseDataList response_datas;
340
341 const std::string& new_key_name = cryptographer_->GetDefaultNigoriKeyName();
342
343 // Handle a change in encryption key.
344 if (data_type_state_.encryption_key_name() != new_key_name) {
345 DVLOG(1) << ModelTypeToString(type_) << ": Updating encryption key "
346 << data_type_state_.encryption_key_name() << " -> "
347 << new_key_name;
348 data_type_state_.set_encryption_key_name(new_key_name);
349 new_encryption_key = true;
350 }
351
352 for (EntityMap::const_iterator it = entities_.begin(); it != entities_.end();
353 ++it) {
354 if (it->second->HasEncryptedUpdate()) {
355 const UpdateResponseData& encrypted_update =
356 it->second->GetEncryptedUpdate();
357 const EntityData& data = encrypted_update.entity.value();
358
359 // We assume all pending updates are encrypted items for which we
360 // don't have the key.
361 DCHECK(data.specifics.has_encrypted());
362
363 if (cryptographer_->CanDecrypt(data.specifics.encrypted())) {
364 EntityData decrypted_data;
365 if (DecryptSpecifics(cryptographer_.get(), data.specifics,
366 &decrypted_data.specifics)) {
367 // Copy other fields one by one since EntityData doesn't allow
368 // copying.
369 // TODO(stanisc): this code is likely to be removed once we get
370 // rid of pending updates.
371 decrypted_data.id = data.id;
372 decrypted_data.client_tag_hash = data.client_tag_hash;
373 decrypted_data.non_unique_name = data.non_unique_name;
374 decrypted_data.creation_time = data.creation_time;
375 decrypted_data.modification_time = data.modification_time;
376
377 UpdateResponseData decrypted_update;
378 decrypted_update.entity = decrypted_data.PassToPtr();
379 decrypted_update.response_version = encrypted_update.response_version;
380 decrypted_update.encryption_key_name =
381 data.specifics.encrypted().key_name();
382 response_datas.push_back(decrypted_update);
383
384 it->second->ClearEncryptedUpdate();
385 }
386 }
387 }
388 }
389
390 if (new_encryption_key || response_datas.size() > 0) {
391 DVLOG(1) << ModelTypeToString(type_) << ": "
392 << base::StringPrintf("Delivering encryption key and %" PRIuS
393 " decrypted updates.",
394 response_datas.size());
395 model_type_processor_->OnUpdateReceived(data_type_state_, response_datas);
396 }
397 }
398
399 bool ModelTypeWorker::DecryptSpecifics(Cryptographer* cryptographer,
400 const sync_pb::EntitySpecifics& in,
401 sync_pb::EntitySpecifics* out) {
402 DCHECK(in.has_encrypted());
403 DCHECK(cryptographer->CanDecrypt(in.encrypted()));
404
405 std::string plaintext;
406 plaintext = cryptographer->DecryptToString(in.encrypted());
407 if (plaintext.empty()) {
408 LOG(ERROR) << "Failed to decrypt a decryptable entity";
409 return false;
410 }
411 if (!out->ParseFromString(plaintext)) {
412 LOG(ERROR) << "Failed to parse decrypted entity";
413 return false;
414 }
415 return true;
416 }
417
418 WorkerEntityTracker* ModelTypeWorker::GetEntityTracker(
419 const std::string& tag_hash) {
420 auto it = entities_.find(tag_hash);
421 return it != entities_.end() ? it->second.get() : nullptr;
422 }
423
424 WorkerEntityTracker* ModelTypeWorker::CreateEntityTracker(
425 const EntityData& data) {
426 DCHECK(entities_.find(data.client_tag_hash) == entities_.end());
427 std::unique_ptr<WorkerEntityTracker> entity =
428 base::WrapUnique(new WorkerEntityTracker(data.id, data.client_tag_hash));
429 WorkerEntityTracker* entity_ptr = entity.get();
430 entities_[data.client_tag_hash] = std::move(entity);
431 return entity_ptr;
432 }
433
434 WorkerEntityTracker* ModelTypeWorker::GetOrCreateEntityTracker(
435 const EntityData& data) {
436 WorkerEntityTracker* entity = GetEntityTracker(data.client_tag_hash);
437 return entity ? entity : CreateEntityTracker(data);
438 }
439
440 } // namespace syncer_v2
OLDNEW
« no previous file with comments | « sync/engine/model_type_worker.h ('k') | sync/engine/model_type_worker_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698