OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "sync/sessions/model_type_registry.h" | |
6 | |
7 #include <stddef.h> | |
8 | |
9 #include <utility> | |
10 | |
11 #include "base/bind.h" | |
12 #include "base/memory/ptr_util.h" | |
13 #include "base/observer_list.h" | |
14 #include "base/threading/thread_task_runner_handle.h" | |
15 #include "sync/engine/commit_queue.h" | |
16 #include "sync/engine/directory_commit_contributor.h" | |
17 #include "sync/engine/directory_update_handler.h" | |
18 #include "sync/engine/model_type_worker.h" | |
19 #include "sync/internal_api/public/activation_context.h" | |
20 #include "sync/internal_api/public/model_type_processor.h" | |
21 #include "sync/sessions/directory_type_debug_info_emitter.h" | |
22 #include "sync/util/cryptographer.h" | |
23 | |
24 namespace syncer { | |
25 | |
26 namespace { | |
27 | |
28 class CommitQueueProxy : public syncer_v2::CommitQueue { | |
29 public: | |
30 CommitQueueProxy(const base::WeakPtr<syncer_v2::ModelTypeWorker>& worker, | |
31 const scoped_refptr<base::SequencedTaskRunner>& sync_thread); | |
32 ~CommitQueueProxy() override; | |
33 | |
34 void EnqueueForCommit(const syncer_v2::CommitRequestDataList& list) override; | |
35 | |
36 private: | |
37 base::WeakPtr<syncer_v2::ModelTypeWorker> worker_; | |
38 scoped_refptr<base::SequencedTaskRunner> sync_thread_; | |
39 }; | |
40 | |
41 CommitQueueProxy::CommitQueueProxy( | |
42 const base::WeakPtr<syncer_v2::ModelTypeWorker>& worker, | |
43 const scoped_refptr<base::SequencedTaskRunner>& sync_thread) | |
44 : worker_(worker), sync_thread_(sync_thread) {} | |
45 | |
46 CommitQueueProxy::~CommitQueueProxy() {} | |
47 | |
48 void CommitQueueProxy::EnqueueForCommit( | |
49 const syncer_v2::CommitRequestDataList& list) { | |
50 sync_thread_->PostTask( | |
51 FROM_HERE, | |
52 base::Bind(&syncer_v2::ModelTypeWorker::EnqueueForCommit, worker_, list)); | |
53 } | |
54 | |
55 } // namespace | |
56 | |
57 ModelTypeRegistry::ModelTypeRegistry( | |
58 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, | |
59 syncable::Directory* directory, | |
60 NudgeHandler* nudge_handler) | |
61 : directory_(directory), | |
62 nudge_handler_(nudge_handler), | |
63 weak_ptr_factory_(this) { | |
64 for (size_t i = 0u; i < workers.size(); ++i) { | |
65 workers_map_.insert( | |
66 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i])); | |
67 } | |
68 } | |
69 | |
70 ModelTypeRegistry::~ModelTypeRegistry() { | |
71 } | |
72 | |
73 void ModelTypeRegistry::SetEnabledDirectoryTypes( | |
74 const ModelSafeRoutingInfo& routing_info) { | |
75 // Remove all existing directory processors and delete them. The | |
76 // DebugInfoEmitters are not deleted here, since we want to preserve their | |
77 // counters. | |
78 for (ModelTypeSet::Iterator it = enabled_directory_types_.First(); | |
79 it.Good(); it.Inc()) { | |
80 size_t result1 = update_handler_map_.erase(it.Get()); | |
81 size_t result2 = commit_contributor_map_.erase(it.Get()); | |
82 DCHECK_EQ(1U, result1); | |
83 DCHECK_EQ(1U, result2); | |
84 } | |
85 | |
86 // Clear the old instances of directory update handlers and commit | |
87 // contributors, deleting their contents in the processs. | |
88 directory_update_handlers_.clear(); | |
89 directory_commit_contributors_.clear(); | |
90 | |
91 enabled_directory_types_.Clear(); | |
92 | |
93 // Create new ones and add them to the appropriate containers. | |
94 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin(); | |
95 routing_iter != routing_info.end(); ++routing_iter) { | |
96 ModelType type = routing_iter->first; | |
97 ModelSafeGroup group = routing_iter->second; | |
98 if (group == GROUP_NON_BLOCKING) | |
99 continue; | |
100 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator | |
101 worker_it = workers_map_.find(group); | |
102 DCHECK(worker_it != workers_map_.end()); | |
103 scoped_refptr<ModelSafeWorker> worker = worker_it->second; | |
104 | |
105 // DebugInfoEmitters are never deleted. Use existing one if we have it. | |
106 DirectoryTypeDebugInfoEmitter* emitter = NULL; | |
107 DirectoryTypeDebugInfoEmitterMap::iterator it = | |
108 directory_type_debug_info_emitter_map_.find(type); | |
109 if (it != directory_type_debug_info_emitter_map_.end()) { | |
110 emitter = it->second; | |
111 } else { | |
112 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type, | |
113 &type_debug_info_observers_); | |
114 directory_type_debug_info_emitter_map_.insert( | |
115 std::make_pair(type, emitter)); | |
116 directory_type_debug_info_emitters_.push_back(emitter); | |
117 } | |
118 | |
119 DirectoryCommitContributor* committer = | |
120 new DirectoryCommitContributor(directory_, type, emitter); | |
121 DirectoryUpdateHandler* updater = | |
122 new DirectoryUpdateHandler(directory_, type, worker, emitter); | |
123 | |
124 // These containers take ownership of their contents. | |
125 directory_commit_contributors_.push_back(committer); | |
126 directory_update_handlers_.push_back(updater); | |
127 | |
128 bool inserted1 = | |
129 update_handler_map_.insert(std::make_pair(type, updater)).second; | |
130 DCHECK(inserted1) << "Attempt to override existing type handler in map"; | |
131 | |
132 bool inserted2 = | |
133 commit_contributor_map_.insert(std::make_pair(type, committer)).second; | |
134 DCHECK(inserted2) << "Attempt to override existing type handler in map"; | |
135 enabled_directory_types_.Put(type); | |
136 } | |
137 | |
138 DCHECK(Intersection(GetEnabledDirectoryTypes(), | |
139 GetEnabledNonBlockingTypes()).Empty()); | |
140 } | |
141 | |
142 void ModelTypeRegistry::ConnectType( | |
143 ModelType type, | |
144 std::unique_ptr<syncer_v2::ActivationContext> activation_context) { | |
145 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); | |
146 | |
147 // Initialize Worker -> Processor communication channel. | |
148 syncer_v2::ModelTypeProcessor* type_processor = | |
149 activation_context->type_processor.get(); | |
150 | |
151 std::unique_ptr<Cryptographer> cryptographer_copy; | |
152 if (encrypted_types_.Has(type)) | |
153 cryptographer_copy.reset(new Cryptographer(*cryptographer_)); | |
154 | |
155 std::unique_ptr<syncer_v2::ModelTypeWorker> worker( | |
156 new syncer_v2::ModelTypeWorker( | |
157 type, activation_context->data_type_state, | |
158 std::move(cryptographer_copy), nudge_handler_, | |
159 std::move(activation_context->type_processor))); | |
160 | |
161 // Initialize Processor -> Worker communication channel. | |
162 std::unique_ptr<syncer_v2::CommitQueue> commit_queue_proxy( | |
163 new CommitQueueProxy(worker->AsWeakPtr(), | |
164 scoped_refptr<base::SequencedTaskRunner>( | |
165 base::ThreadTaskRunnerHandle::Get()))); | |
166 | |
167 type_processor->ConnectSync(std::move(commit_queue_proxy)); | |
168 | |
169 DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); | |
170 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); | |
171 | |
172 update_handler_map_.insert(std::make_pair(type, worker.get())); | |
173 commit_contributor_map_.insert(std::make_pair(type, worker.get())); | |
174 | |
175 // The container takes ownership. | |
176 model_type_workers_.push_back(std::move(worker)); | |
177 | |
178 DCHECK(Intersection(GetEnabledDirectoryTypes(), | |
179 GetEnabledNonBlockingTypes()).Empty()); | |
180 } | |
181 | |
182 void ModelTypeRegistry::DisconnectType(ModelType type) { | |
183 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type); | |
184 DCHECK(update_handler_map_.find(type) != update_handler_map_.end()); | |
185 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end()); | |
186 | |
187 size_t updaters_erased = update_handler_map_.erase(type); | |
188 size_t committers_erased = commit_contributor_map_.erase(type); | |
189 | |
190 DCHECK_EQ(1U, updaters_erased); | |
191 DCHECK_EQ(1U, committers_erased); | |
192 | |
193 // Remove from the ScopedVector, deleting the worker in the process. | |
194 for (ScopedVector<syncer_v2::ModelTypeWorker>::iterator it = | |
195 model_type_workers_.begin(); | |
196 it != model_type_workers_.end(); ++it) { | |
197 if ((*it)->GetModelType() == type) { | |
198 model_type_workers_.erase(it); | |
199 break; | |
200 } | |
201 } | |
202 } | |
203 | |
204 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const { | |
205 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()); | |
206 } | |
207 | |
208 ModelTypeSet ModelTypeRegistry::GetInitialSyncEndedTypes() const { | |
209 // TODO(pavely): GetInitialSyncEndedTypes is queried at the end of sync | |
210 // manager initialization when update handlers aren't set up yet. Returning | |
211 // correct set of types is important because otherwise data for al types will | |
212 // be redownloaded during configuration. For now let's return union of types | |
213 // reported by directory and types reported by update handlers. We need to | |
214 // refactor initialization and configuratrion flow to be able to only query | |
215 // this set from update handlers. | |
216 ModelTypeSet result = directory_->InitialSyncEndedTypes(); | |
217 for (const auto& kv : update_handler_map_) { | |
218 if (kv.second->IsInitialSyncEnded()) | |
219 result.Put(kv.first); | |
220 } | |
221 return result; | |
222 } | |
223 | |
224 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() { | |
225 return &update_handler_map_; | |
226 } | |
227 | |
228 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() { | |
229 return &commit_contributor_map_; | |
230 } | |
231 | |
232 DirectoryTypeDebugInfoEmitterMap* | |
233 ModelTypeRegistry::directory_type_debug_info_emitter_map() { | |
234 return &directory_type_debug_info_emitter_map_; | |
235 } | |
236 | |
237 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver( | |
238 syncer::TypeDebugInfoObserver* observer) { | |
239 if (!type_debug_info_observers_.HasObserver(observer)) | |
240 type_debug_info_observers_.AddObserver(observer); | |
241 } | |
242 | |
243 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver( | |
244 syncer::TypeDebugInfoObserver* observer) { | |
245 type_debug_info_observers_.RemoveObserver(observer); | |
246 } | |
247 | |
248 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver( | |
249 const syncer::TypeDebugInfoObserver* observer) const { | |
250 return type_debug_info_observers_.HasObserver(observer); | |
251 } | |
252 | |
253 void ModelTypeRegistry::RequestEmitDebugInfo() { | |
254 for (DirectoryTypeDebugInfoEmitterMap::iterator it = | |
255 directory_type_debug_info_emitter_map_.begin(); | |
256 it != directory_type_debug_info_emitter_map_.end(); ++it) { | |
257 it->second->EmitCommitCountersUpdate(); | |
258 it->second->EmitUpdateCountersUpdate(); | |
259 it->second->EmitStatusCountersUpdate(); | |
260 } | |
261 } | |
262 | |
263 base::WeakPtr<syncer_v2::ModelTypeConnector> ModelTypeRegistry::AsWeakPtr() { | |
264 return weak_ptr_factory_.GetWeakPtr(); | |
265 } | |
266 | |
267 void ModelTypeRegistry::OnPassphraseRequired( | |
268 PassphraseRequiredReason reason, | |
269 const sync_pb::EncryptedData& pending_keys) { | |
270 } | |
271 | |
272 void ModelTypeRegistry::OnPassphraseAccepted() { | |
273 } | |
274 | |
275 void ModelTypeRegistry::OnBootstrapTokenUpdated( | |
276 const std::string& bootstrap_token, | |
277 BootstrapTokenType type) { | |
278 } | |
279 | |
280 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, | |
281 bool encrypt_everything) { | |
282 encrypted_types_ = encrypted_types; | |
283 OnEncryptionStateChanged(); | |
284 } | |
285 | |
286 void ModelTypeRegistry::OnEncryptionComplete() { | |
287 } | |
288 | |
289 void ModelTypeRegistry::OnCryptographerStateChanged( | |
290 Cryptographer* cryptographer) { | |
291 cryptographer_.reset(new Cryptographer(*cryptographer)); | |
292 OnEncryptionStateChanged(); | |
293 } | |
294 | |
295 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type, | |
296 base::Time passphrase_time) { | |
297 } | |
298 | |
299 void ModelTypeRegistry::OnLocalSetPassphraseEncryption( | |
300 const SyncEncryptionHandler::NigoriState& nigori_state) { | |
301 } | |
302 | |
303 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const { | |
304 return enabled_directory_types_; | |
305 } | |
306 | |
307 void ModelTypeRegistry::OnEncryptionStateChanged() { | |
308 for (ScopedVector<syncer_v2::ModelTypeWorker>::iterator it = | |
309 model_type_workers_.begin(); | |
310 it != model_type_workers_.end(); ++it) { | |
311 if (encrypted_types_.Has((*it)->GetModelType())) { | |
312 (*it)->UpdateCryptographer( | |
313 base::WrapUnique(new Cryptographer(*cryptographer_))); | |
314 } | |
315 } | |
316 } | |
317 | |
318 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const { | |
319 ModelTypeSet enabled_non_blocking_types; | |
320 for (ScopedVector<syncer_v2::ModelTypeWorker>::const_iterator it = | |
321 model_type_workers_.begin(); | |
322 it != model_type_workers_.end(); ++it) { | |
323 enabled_non_blocking_types.Put((*it)->GetModelType()); | |
324 } | |
325 return enabled_non_blocking_types; | |
326 } | |
327 | |
328 } // namespace syncer | |
OLD | NEW |