OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "sync/engine/get_updates_processor.h" | |
6 | |
7 #include <stddef.h> | |
8 | |
9 #include <map> | |
10 | |
11 #include "base/trace_event/trace_event.h" | |
12 #include "sync/engine/get_updates_delegate.h" | |
13 #include "sync/engine/syncer_proto_util.h" | |
14 #include "sync/engine/update_handler.h" | |
15 #include "sync/internal_api/public/events/get_updates_response_event.h" | |
16 #include "sync/protocol/sync.pb.h" | |
17 #include "sync/sessions/status_controller.h" | |
18 #include "sync/sessions/sync_session.h" | |
19 #include "sync/syncable/directory.h" | |
20 #include "sync/syncable/nigori_handler.h" | |
21 #include "sync/syncable/syncable_read_transaction.h" | |
22 | |
23 typedef std::vector<const sync_pb::SyncEntity*> SyncEntityList; | |
24 typedef std::map<syncer::ModelType, SyncEntityList> TypeSyncEntityMap; | |
25 | |
26 namespace syncer { | |
27 | |
28 typedef std::map<ModelType, size_t> TypeToIndexMap; | |
29 | |
30 namespace { | |
31 | |
32 bool ShouldRequestEncryptionKey(sessions::SyncSessionContext* context) { | |
33 syncable::Directory* dir = context->directory(); | |
34 syncable::ReadTransaction trans(FROM_HERE, dir); | |
35 syncable::NigoriHandler* nigori_handler = dir->GetNigoriHandler(); | |
36 return nigori_handler->NeedKeystoreKey(&trans); | |
37 } | |
38 | |
39 | |
40 SyncerError HandleGetEncryptionKeyResponse( | |
41 const sync_pb::ClientToServerResponse& update_response, | |
42 syncable::Directory* dir) { | |
43 bool success = false; | |
44 if (update_response.get_updates().encryption_keys_size() == 0) { | |
45 LOG(ERROR) << "Failed to receive encryption key from server."; | |
46 return SERVER_RESPONSE_VALIDATION_FAILED; | |
47 } | |
48 syncable::ReadTransaction trans(FROM_HERE, dir); | |
49 syncable::NigoriHandler* nigori_handler = dir->GetNigoriHandler(); | |
50 success = nigori_handler->SetKeystoreKeys( | |
51 update_response.get_updates().encryption_keys(), | |
52 &trans); | |
53 | |
54 DVLOG(1) << "GetUpdates returned " | |
55 << update_response.get_updates().encryption_keys_size() | |
56 << "encryption keys. Nigori keystore key " | |
57 << (success ? "" : "not ") << "updated."; | |
58 return (success ? SYNCER_OK : SERVER_RESPONSE_VALIDATION_FAILED); | |
59 } | |
60 | |
61 // Given a GetUpdates response, iterates over all the returned items and | |
62 // divides them according to their type. Outputs a map from model types to | |
63 // received SyncEntities. The output map will have entries (possibly empty) | |
64 // for all types in |requested_types|. | |
65 void PartitionUpdatesByType(const sync_pb::GetUpdatesResponse& gu_response, | |
66 ModelTypeSet requested_types, | |
67 TypeSyncEntityMap* updates_by_type) { | |
68 int update_count = gu_response.entries().size(); | |
69 for (ModelTypeSet::Iterator it = requested_types.First(); | |
70 it.Good(); it.Inc()) { | |
71 updates_by_type->insert(std::make_pair(it.Get(), SyncEntityList())); | |
72 } | |
73 for (int i = 0; i < update_count; ++i) { | |
74 const sync_pb::SyncEntity& update = gu_response.entries(i); | |
75 ModelType type = GetModelType(update); | |
76 if (!IsRealDataType(type)) { | |
77 NOTREACHED() << "Received update with invalid type."; | |
78 continue; | |
79 } | |
80 | |
81 TypeSyncEntityMap::iterator it = updates_by_type->find(type); | |
82 if (it == updates_by_type->end()) { | |
83 DLOG(WARNING) | |
84 << "Received update for unexpected type or the type is throttled:" | |
85 << ModelTypeToString(type); | |
86 continue; | |
87 } | |
88 | |
89 it->second.push_back(&update); | |
90 } | |
91 } | |
92 | |
93 // Builds a map of ModelTypes to indices to progress markers in the given | |
94 // |gu_response| message. The map is returned in the |index_map| parameter. | |
95 void PartitionProgressMarkersByType( | |
96 const sync_pb::GetUpdatesResponse& gu_response, | |
97 ModelTypeSet request_types, | |
98 TypeToIndexMap* index_map) { | |
99 for (int i = 0; i < gu_response.new_progress_marker_size(); ++i) { | |
100 int field_number = gu_response.new_progress_marker(i).data_type_id(); | |
101 ModelType model_type = GetModelTypeFromSpecificsFieldNumber(field_number); | |
102 if (!IsRealDataType(model_type)) { | |
103 DLOG(WARNING) << "Unknown field number " << field_number; | |
104 continue; | |
105 } | |
106 if (!request_types.Has(model_type)) { | |
107 DLOG(WARNING) | |
108 << "Skipping unexpected progress marker for non-enabled type " | |
109 << ModelTypeToString(model_type); | |
110 continue; | |
111 } | |
112 index_map->insert(std::make_pair(model_type, i)); | |
113 } | |
114 } | |
115 | |
116 void PartitionContextMutationsByType( | |
117 const sync_pb::GetUpdatesResponse& gu_response, | |
118 ModelTypeSet request_types, | |
119 TypeToIndexMap* index_map) { | |
120 for (int i = 0; i < gu_response.context_mutations_size(); ++i) { | |
121 int field_number = gu_response.context_mutations(i).data_type_id(); | |
122 ModelType model_type = GetModelTypeFromSpecificsFieldNumber(field_number); | |
123 if (!IsRealDataType(model_type)) { | |
124 DLOG(WARNING) << "Unknown field number " << field_number; | |
125 continue; | |
126 } | |
127 if (!request_types.Has(model_type)) { | |
128 DLOG(WARNING) | |
129 << "Skipping unexpected context mutation for non-enabled type " | |
130 << ModelTypeToString(model_type); | |
131 continue; | |
132 } | |
133 index_map->insert(std::make_pair(model_type, i)); | |
134 } | |
135 } | |
136 | |
137 // Initializes the parts of the GetUpdatesMessage that depend on shared state, | |
138 // like the ShouldRequestEncryptionKey() status. This is kept separate from the | |
139 // other of the message-building functions to make the rest of the code easier | |
140 // to test. | |
141 void InitDownloadUpdatesContext( | |
142 sessions::SyncSession* session, | |
143 bool create_mobile_bookmarks_folder, | |
144 sync_pb::ClientToServerMessage* message) { | |
145 message->set_share(session->context()->account_name()); | |
146 message->set_message_contents(sync_pb::ClientToServerMessage::GET_UPDATES); | |
147 | |
148 sync_pb::GetUpdatesMessage* get_updates = message->mutable_get_updates(); | |
149 | |
150 // We want folders for our associated types, always. If we were to set | |
151 // this to false, the server would send just the non-container items | |
152 // (e.g. Bookmark URLs but not their containing folders). | |
153 get_updates->set_fetch_folders(true); | |
154 | |
155 get_updates->set_create_mobile_bookmarks_folder( | |
156 create_mobile_bookmarks_folder); | |
157 bool need_encryption_key = ShouldRequestEncryptionKey(session->context()); | |
158 get_updates->set_need_encryption_key(need_encryption_key); | |
159 | |
160 // Set legacy GetUpdatesMessage.GetUpdatesCallerInfo information. | |
161 get_updates->mutable_caller_info()->set_notifications_enabled( | |
162 session->context()->notifications_enabled()); | |
163 } | |
164 | |
165 } // namespace | |
166 | |
167 GetUpdatesProcessor::GetUpdatesProcessor(UpdateHandlerMap* update_handler_map, | |
168 const GetUpdatesDelegate& delegate) | |
169 : update_handler_map_(update_handler_map), delegate_(delegate) {} | |
170 | |
171 GetUpdatesProcessor::~GetUpdatesProcessor() {} | |
172 | |
173 SyncerError GetUpdatesProcessor::DownloadUpdates( | |
174 ModelTypeSet* request_types, | |
175 sessions::SyncSession* session, | |
176 bool create_mobile_bookmarks_folder) { | |
177 TRACE_EVENT0("sync", "DownloadUpdates"); | |
178 | |
179 sync_pb::ClientToServerMessage message; | |
180 InitDownloadUpdatesContext(session, | |
181 create_mobile_bookmarks_folder, | |
182 &message); | |
183 PrepareGetUpdates(*request_types, &message); | |
184 | |
185 SyncerError result = ExecuteDownloadUpdates(request_types, session, &message); | |
186 session->mutable_status_controller()->set_last_download_updates_result( | |
187 result); | |
188 return result; | |
189 } | |
190 | |
191 void GetUpdatesProcessor::PrepareGetUpdates( | |
192 ModelTypeSet gu_types, | |
193 sync_pb::ClientToServerMessage* message) { | |
194 sync_pb::GetUpdatesMessage* get_updates = message->mutable_get_updates(); | |
195 | |
196 for (ModelTypeSet::Iterator it = gu_types.First(); it.Good(); it.Inc()) { | |
197 UpdateHandlerMap::iterator handler_it = update_handler_map_->find(it.Get()); | |
198 DCHECK(handler_it != update_handler_map_->end()) | |
199 << "Failed to look up handler for " << ModelTypeToString(it.Get()); | |
200 sync_pb::DataTypeProgressMarker* progress_marker = | |
201 get_updates->add_from_progress_marker(); | |
202 handler_it->second->GetDownloadProgress(progress_marker); | |
203 progress_marker->clear_gc_directive(); | |
204 | |
205 sync_pb::DataTypeContext context; | |
206 handler_it->second->GetDataTypeContext(&context); | |
207 if (!context.context().empty()) | |
208 get_updates->add_client_contexts()->Swap(&context); | |
209 } | |
210 | |
211 delegate_.HelpPopulateGuMessage(get_updates); | |
212 } | |
213 | |
214 SyncerError GetUpdatesProcessor::ExecuteDownloadUpdates( | |
215 ModelTypeSet* request_types, | |
216 sessions::SyncSession* session, | |
217 sync_pb::ClientToServerMessage* msg) { | |
218 sync_pb::ClientToServerResponse update_response; | |
219 sessions::StatusController* status = session->mutable_status_controller(); | |
220 bool need_encryption_key = ShouldRequestEncryptionKey(session->context()); | |
221 | |
222 if (session->context()->debug_info_getter()) { | |
223 sync_pb::DebugInfo* debug_info = msg->mutable_debug_info(); | |
224 CopyClientDebugInfo(session->context()->debug_info_getter(), debug_info); | |
225 } | |
226 | |
227 session->SendProtocolEvent( | |
228 *(delegate_.GetNetworkRequestEvent(base::Time::Now(), *msg))); | |
229 | |
230 ModelTypeSet partial_failure_data_types; | |
231 | |
232 SyncerError result = SyncerProtoUtil::PostClientToServerMessage( | |
233 msg, &update_response, session, &partial_failure_data_types); | |
234 | |
235 DVLOG(2) << SyncerProtoUtil::ClientToServerResponseDebugString( | |
236 update_response); | |
237 | |
238 if (result == SERVER_RETURN_PARTIAL_FAILURE) { | |
239 request_types->RemoveAll(partial_failure_data_types); | |
240 } else if (result != SYNCER_OK) { | |
241 GetUpdatesResponseEvent response_event( | |
242 base::Time::Now(), update_response, result); | |
243 session->SendProtocolEvent(response_event); | |
244 | |
245 // Sync authorization expires every 60 mintues, so SYNC_AUTH_ERROR will | |
246 // appear every 60 minutes, and then sync services will refresh the | |
247 // authorization. Therefore SYNC_AUTH_ERROR is excluded here to reduce the | |
248 // ERROR messages in the log. | |
249 if (result != SYNC_AUTH_ERROR) { | |
250 LOG(ERROR) << "PostClientToServerMessage() failed during GetUpdates"; | |
251 } | |
252 | |
253 return result; | |
254 } | |
255 | |
256 DVLOG(1) << "GetUpdates returned " | |
257 << update_response.get_updates().entries_size() | |
258 << " updates."; | |
259 | |
260 | |
261 if (session->context()->debug_info_getter()) { | |
262 // Clear debug info now that we have successfully sent it to the server. | |
263 DVLOG(1) << "Clearing client debug info."; | |
264 session->context()->debug_info_getter()->ClearDebugInfo(); | |
265 } | |
266 | |
267 if (need_encryption_key || | |
268 update_response.get_updates().encryption_keys_size() > 0) { | |
269 syncable::Directory* dir = session->context()->directory(); | |
270 status->set_last_get_key_result( | |
271 HandleGetEncryptionKeyResponse(update_response, dir)); | |
272 } | |
273 | |
274 SyncerError process_result = | |
275 ProcessResponse(update_response.get_updates(), *request_types, status); | |
276 | |
277 GetUpdatesResponseEvent response_event( | |
278 base::Time::Now(), update_response, process_result); | |
279 session->SendProtocolEvent(response_event); | |
280 | |
281 DVLOG(1) << "GetUpdates result: " << process_result; | |
282 | |
283 return process_result; | |
284 } | |
285 | |
286 SyncerError GetUpdatesProcessor::ProcessResponse( | |
287 const sync_pb::GetUpdatesResponse& gu_response, | |
288 ModelTypeSet request_types, | |
289 sessions::StatusController* status) { | |
290 status->increment_num_updates_downloaded_by(gu_response.entries_size()); | |
291 | |
292 // The changes remaining field is used to prevent the client from looping. If | |
293 // that field is being set incorrectly, we're in big trouble. | |
294 if (!gu_response.has_changes_remaining()) { | |
295 return SERVER_RESPONSE_VALIDATION_FAILED; | |
296 } | |
297 | |
298 syncer::SyncerError result = | |
299 ProcessGetUpdatesResponse(request_types, gu_response, status); | |
300 if (result != syncer::SYNCER_OK) | |
301 return result; | |
302 | |
303 if (gu_response.changes_remaining() == 0) { | |
304 return SYNCER_OK; | |
305 } else { | |
306 return SERVER_MORE_TO_DOWNLOAD; | |
307 } | |
308 } | |
309 | |
310 syncer::SyncerError GetUpdatesProcessor::ProcessGetUpdatesResponse( | |
311 ModelTypeSet gu_types, | |
312 const sync_pb::GetUpdatesResponse& gu_response, | |
313 sessions::StatusController* status_controller) { | |
314 TypeSyncEntityMap updates_by_type; | |
315 PartitionUpdatesByType(gu_response, gu_types, &updates_by_type); | |
316 DCHECK_EQ(gu_types.Size(), updates_by_type.size()); | |
317 | |
318 TypeToIndexMap progress_index_by_type; | |
319 PartitionProgressMarkersByType(gu_response, | |
320 gu_types, | |
321 &progress_index_by_type); | |
322 if (gu_types.Size() != progress_index_by_type.size()) { | |
323 NOTREACHED() << "Missing progress markers in GetUpdates response."; | |
324 return syncer::SERVER_RESPONSE_VALIDATION_FAILED; | |
325 } | |
326 | |
327 TypeToIndexMap context_by_type; | |
328 PartitionContextMutationsByType(gu_response, gu_types, &context_by_type); | |
329 | |
330 // Iterate over these maps in parallel, processing updates for each type. | |
331 TypeToIndexMap::iterator progress_marker_iter = | |
332 progress_index_by_type.begin(); | |
333 TypeSyncEntityMap::iterator updates_iter = updates_by_type.begin(); | |
334 for (; (progress_marker_iter != progress_index_by_type.end() | |
335 && updates_iter != updates_by_type.end()); | |
336 ++progress_marker_iter, ++updates_iter) { | |
337 DCHECK_EQ(progress_marker_iter->first, updates_iter->first); | |
338 ModelType type = progress_marker_iter->first; | |
339 | |
340 UpdateHandlerMap::iterator update_handler_iter = | |
341 update_handler_map_->find(type); | |
342 | |
343 sync_pb::DataTypeContext context; | |
344 TypeToIndexMap::iterator context_iter = context_by_type.find(type); | |
345 if (context_iter != context_by_type.end()) | |
346 context.CopyFrom(gu_response.context_mutations(context_iter->second)); | |
347 | |
348 if (update_handler_iter != update_handler_map_->end()) { | |
349 syncer::SyncerError result = | |
350 update_handler_iter->second->ProcessGetUpdatesResponse( | |
351 gu_response.new_progress_marker(progress_marker_iter->second), | |
352 context, | |
353 updates_iter->second, | |
354 status_controller); | |
355 if (result != syncer::SYNCER_OK) | |
356 return result; | |
357 } else { | |
358 DLOG(WARNING) | |
359 << "Ignoring received updates of a type we can't handle. " | |
360 << "Type is: " << ModelTypeToString(type); | |
361 continue; | |
362 } | |
363 } | |
364 DCHECK(progress_marker_iter == progress_index_by_type.end() && | |
365 updates_iter == updates_by_type.end()); | |
366 | |
367 return syncer::SYNCER_OK; | |
368 } | |
369 | |
370 void GetUpdatesProcessor::ApplyUpdates( | |
371 ModelTypeSet gu_types, | |
372 sessions::StatusController* status_controller) { | |
373 status_controller->set_get_updates_request_types(gu_types); | |
374 delegate_.ApplyUpdates(gu_types, status_controller, update_handler_map_); | |
375 } | |
376 | |
377 void GetUpdatesProcessor::CopyClientDebugInfo( | |
378 sessions::DebugInfoGetter* debug_info_getter, | |
379 sync_pb::DebugInfo* debug_info) { | |
380 DVLOG(1) << "Copying client debug info to send."; | |
381 debug_info_getter->GetDebugInfo(debug_info); | |
382 } | |
383 | |
384 } // namespace syncer | |
OLD | NEW |