Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: sync/engine/sync_scheduler.cc

Issue 10689185: Revert 146262 - Revert "Revert 142517 - [Sync] Refactor sync configuration logic." (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 8 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sync/engine/sync_scheduler.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler.h" 5 #include "sync/engine/sync_scheduler.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
58 return false; 58 return false;
59 } 59 }
60 } 60 }
61 61
62 bool IsActionableError( 62 bool IsActionableError(
63 const syncer::SyncProtocolError& error) { 63 const syncer::SyncProtocolError& error) {
64 return (error.action != syncer::UNKNOWN_ACTION); 64 return (error.action != syncer::UNKNOWN_ACTION);
65 } 65 }
66 } // namespace 66 } // namespace
67 67
68 ConfigurationParams::ConfigurationParams()
69 : source(GetUpdatesCallerInfo::UNKNOWN),
70 keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {}
71 ConfigurationParams::ConfigurationParams(
72 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
73 const syncer::ModelTypeSet& types_to_download,
74 const syncer::ModelSafeRoutingInfo& routing_info,
75 KeystoreKeyStatus keystore_key_status,
76 const base::Closure& ready_task)
77 : source(source),
78 types_to_download(types_to_download),
79 routing_info(routing_info),
80 keystore_key_status(keystore_key_status),
81 ready_task(ready_task) {
82 DCHECK(!ready_task.is_null());
83 }
84 ConfigurationParams::~ConfigurationParams() {}
85
86 SyncScheduler::DelayProvider::DelayProvider() {} 68 SyncScheduler::DelayProvider::DelayProvider() {}
87 SyncScheduler::DelayProvider::~DelayProvider() {} 69 SyncScheduler::DelayProvider::~DelayProvider() {}
88 70
89 SyncScheduler::WaitInterval::WaitInterval() 71 SyncScheduler::WaitInterval::WaitInterval()
90 : mode(UNKNOWN), 72 : mode(UNKNOWN),
91 had_nudge(false) { 73 had_nudge(false) {
92 } 74 }
93 75
94 SyncScheduler::WaitInterval::~WaitInterval() {} 76 SyncScheduler::WaitInterval::~WaitInterval() {}
95 77
(...skipping 11 matching lines...) Expand all
107 89
108 SyncScheduler::SyncSessionJob::SyncSessionJob() 90 SyncScheduler::SyncSessionJob::SyncSessionJob()
109 : purpose(UNKNOWN), 91 : purpose(UNKNOWN),
110 is_canary_job(false) { 92 is_canary_job(false) {
111 } 93 }
112 94
113 SyncScheduler::SyncSessionJob::~SyncSessionJob() {} 95 SyncScheduler::SyncSessionJob::~SyncSessionJob() {}
114 96
115 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, 97 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
116 base::TimeTicks start, 98 base::TimeTicks start,
117 linked_ptr<sessions::SyncSession> session, 99 linked_ptr<sessions::SyncSession> session, bool is_canary_job,
118 bool is_canary_job, 100 const tracked_objects::Location& from_here) : purpose(purpose),
119 const ConfigurationParams& config_params, 101 scheduled_start(start),
120 const tracked_objects::Location& from_here) 102 session(session),
121 : purpose(purpose), 103 is_canary_job(is_canary_job),
122 scheduled_start(start), 104 from_here(from_here) {
123 session(session),
124 is_canary_job(is_canary_job),
125 config_params(config_params),
126 from_here(from_here) {
127 } 105 }
128 106
129 const char* SyncScheduler::SyncSessionJob::GetPurposeString( 107 const char* SyncScheduler::SyncSessionJob::GetPurposeString(
130 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { 108 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) {
131 switch (purpose) { 109 switch (purpose) {
132 ENUM_CASE(UNKNOWN); 110 ENUM_CASE(UNKNOWN);
133 ENUM_CASE(POLL); 111 ENUM_CASE(POLL);
134 ENUM_CASE(NUDGE); 112 ENUM_CASE(NUDGE);
135 ENUM_CASE(CONFIGURATION); 113 ENUM_CASE(CONFIGURATION);
136 ENUM_CASE(CLEANUP_DISABLED_TYPES); 114 ENUM_CASE(CLEANUP_DISABLED_TYPES);
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
258 236
259 void SyncScheduler::UpdateServerConnectionManagerStatus( 237 void SyncScheduler::UpdateServerConnectionManagerStatus(
260 HttpResponse::ServerConnectionCode code) { 238 HttpResponse::ServerConnectionCode code) {
261 DCHECK_EQ(MessageLoop::current(), sync_loop_); 239 DCHECK_EQ(MessageLoop::current(), sync_loop_);
262 SDVLOG(2) << "New server connection code: " 240 SDVLOG(2) << "New server connection code: "
263 << HttpResponse::GetServerConnectionCodeString(code); 241 << HttpResponse::GetServerConnectionCodeString(code);
264 242
265 connection_code_ = code; 243 connection_code_ = code;
266 } 244 }
267 245
268 void SyncScheduler::Start(Mode mode) { 246 void SyncScheduler::Start(Mode mode, const base::Closure& callback) {
269 DCHECK_EQ(MessageLoop::current(), sync_loop_); 247 DCHECK_EQ(MessageLoop::current(), sync_loop_);
270 std::string thread_name = MessageLoop::current()->thread_name(); 248 std::string thread_name = MessageLoop::current()->thread_name();
271 if (thread_name.empty()) 249 if (thread_name.empty())
272 thread_name = "<Main thread>"; 250 thread_name = "<Main thread>";
273 SDVLOG(2) << "Start called from thread " 251 SDVLOG(2) << "Start called from thread "
274 << thread_name << " with mode " << GetModeString(mode); 252 << thread_name << " with mode " << GetModeString(mode);
275 if (!started_) { 253 if (!started_) {
276 started_ = true; 254 started_ = true;
277 SendInitialSnapshot(); 255 SendInitialSnapshot();
278 } 256 }
279 257
280 DCHECK(!session_context_->account_name().empty()); 258 DCHECK(!session_context_->account_name().empty());
281 DCHECK(syncer_.get()); 259 DCHECK(syncer_.get());
282 Mode old_mode = mode_; 260 Mode old_mode = mode_;
283 mode_ = mode; 261 mode_ = mode;
284 AdjustPolling(NULL); // Will kick start poll timer if needed. 262 AdjustPolling(NULL); // Will kick start poll timer if needed.
263 if (!callback.is_null())
264 callback.Run();
285 265
286 if (old_mode != mode_) { 266 if (old_mode != mode_) {
287 // We just changed our mode. See if there are any pending jobs that we could 267 // We just changed our mode. See if there are any pending jobs that we could
288 // execute in the new mode. 268 // execute in the new mode.
289 DoPendingJobIfPossible(false); 269 DoPendingJobIfPossible(false);
290 } 270 }
291 } 271 }
292 272
293 void SyncScheduler::SendInitialSnapshot() { 273 void SyncScheduler::SendInitialSnapshot() {
294 DCHECK_EQ(MessageLoop::current(), sync_loop_); 274 DCHECK_EQ(MessageLoop::current(), sync_loop_);
295 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, 275 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
296 SyncSourceInfo(), ModelSafeRoutingInfo(), 276 SyncSourceInfo(), ModelSafeRoutingInfo(),
297 std::vector<ModelSafeWorker*>())); 277 std::vector<ModelSafeWorker*>()));
298 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 278 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
299 event.snapshot = dummy->TakeSnapshot(); 279 event.snapshot = dummy->TakeSnapshot();
300 session_context_->NotifyListeners(event); 280 session_context_->NotifyListeners(event);
301 } 281 }
302 282
303 namespace {
304
305 // Helper to extract the routing info and workers corresponding to types in
306 // |types| from |current_routes| and |current_workers|.
307 void BuildModelSafeParams(
308 const ModelTypeSet& types_to_download,
309 const ModelSafeRoutingInfo& current_routes,
310 const std::vector<ModelSafeWorker*>& current_workers,
311 ModelSafeRoutingInfo* result_routes,
312 std::vector<ModelSafeWorker*>* result_workers) {
313 std::set<ModelSafeGroup> active_groups;
314 active_groups.insert(GROUP_PASSIVE);
315 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
316 iter.Inc()) {
317 syncer::ModelType type = iter.Get();
318 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
319 DCHECK(route != current_routes.end());
320 ModelSafeGroup group = route->second;
321 (*result_routes)[type] = group;
322 active_groups.insert(group);
323 }
324
325 for(std::vector<ModelSafeWorker*>::const_iterator iter =
326 current_workers.begin(); iter != current_workers.end(); ++iter) {
327 if (active_groups.count((*iter)->GetModelSafeGroup()) > 0)
328 result_workers->push_back(*iter);
329 }
330 }
331
332 } // namespace.
333
334 bool SyncScheduler::ScheduleConfiguration(const ConfigurationParams& params) {
335 DCHECK_EQ(MessageLoop::current(), sync_loop_);
336 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
337 DCHECK_EQ(CONFIGURATION_MODE, mode_);
338 DCHECK(!params.ready_task.is_null());
339 SDVLOG(2) << "Reconfiguring syncer.";
340
341 // Only one configuration is allowed at a time. Verify we're not waiting
342 // for a pending configure job.
343 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
344
345 // TODO(sync): now that ModelChanging commands only use those workers within
346 // the routing info, we don't really need |restricted_workers|. Remove it.
347 // crbug.com/133030
348 syncer::ModelSafeRoutingInfo restricted_routes;
349 std::vector<ModelSafeWorker*> restricted_workers;
350 BuildModelSafeParams(params.types_to_download,
351 params.routing_info,
352 session_context_->workers(),
353 &restricted_routes,
354 &restricted_workers);
355 session_context_->set_routing_info(params.routing_info);
356
357 // We rely on this not failing, so don't need to worry about checking for
358 // success. In addition, this will be removed as part of crbug.com/131433.
359 SyncSessionJob cleanup_job(
360 SyncSessionJob::CLEANUP_DISABLED_TYPES,
361 TimeTicks::Now(),
362 make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
363 false,
364 ConfigurationParams(),
365 FROM_HERE);
366 DoSyncSessionJob(cleanup_job);
367
368 if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) {
369 // TODO(zea): implement in such a way that we can handle failures and the
370 // subsequent retrys the scheduler might perform. See crbug.com/129665.
371 NOTIMPLEMENTED();
372 }
373
374 // Only reconfigure if we have types to download.
375 if (!params.types_to_download.Empty()) {
376 DCHECK(!restricted_routes.empty());
377 linked_ptr<SyncSession> session(new SyncSession(
378 session_context_,
379 this,
380 SyncSourceInfo(params.source,
381 ModelSafeRoutingInfoToPayloadMap(
382 restricted_routes,
383 std::string())),
384 restricted_routes,
385 restricted_workers));
386 SyncSessionJob job(SyncSessionJob::CONFIGURATION,
387 TimeTicks::Now(),
388 session,
389 false,
390 params,
391 FROM_HERE);
392 DoSyncSessionJob(job);
393
394 // If we failed, the job would have been saved as the pending configure
395 // job and a wait interval would have been set.
396 if (!session->Succeeded()) {
397 DCHECK(wait_interval_.get() &&
398 wait_interval_->pending_configure_job.get());
399 return false;
400 }
401 } else {
402 SDVLOG(2) << "No change in routing info, calling ready task directly.";
403 params.ready_task.Run();
404 }
405
406 return true;
407 }
408
409 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( 283 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval(
410 const SyncSessionJob& job) { 284 const SyncSessionJob& job) {
411 DCHECK_EQ(MessageLoop::current(), sync_loop_); 285 DCHECK_EQ(MessageLoop::current(), sync_loop_);
412 DCHECK(wait_interval_.get()); 286 DCHECK(wait_interval_.get());
413 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); 287 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES);
414 288
415 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 289 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
416 << WaitInterval::GetModeString(wait_interval_->mode) 290 << WaitInterval::GetModeString(wait_interval_->mode)
417 << (wait_interval_->had_nudge ? " (had nudge)" : "") 291 << (wait_interval_->had_nudge ? " (had nudge)" : "")
418 << (job.is_canary_job ? " (canary)" : ""); 292 << (job.is_canary_job ? " (canary)" : "");
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
495 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { 369 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) {
496 DCHECK_EQ(MessageLoop::current(), sync_loop_); 370 DCHECK_EQ(MessageLoop::current(), sync_loop_);
497 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); 371 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
498 if (pending_nudge_.get() == NULL) { 372 if (pending_nudge_.get() == NULL) {
499 SDVLOG(2) << "Creating a pending nudge job"; 373 SDVLOG(2) << "Creating a pending nudge job";
500 SyncSession* s = job.session.get(); 374 SyncSession* s = job.session.get();
501 scoped_ptr<SyncSession> session(new SyncSession(s->context(), 375 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
502 s->delegate(), s->source(), s->routing_info(), s->workers())); 376 s->delegate(), s->source(), s->routing_info(), s->workers()));
503 377
504 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, 378 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
505 make_linked_ptr(session.release()), false, 379 make_linked_ptr(session.release()), false, job.from_here);
506 ConfigurationParams(), job.from_here);
507 pending_nudge_.reset(new SyncSessionJob(new_job)); 380 pending_nudge_.reset(new SyncSessionJob(new_job));
508 381
509 return; 382 return;
510 } 383 }
511 384
512 SDVLOG(2) << "Coalescing a pending nudge"; 385 SDVLOG(2) << "Coalescing a pending nudge";
513 pending_nudge_->session->Coalesce(*(job.session.get())); 386 pending_nudge_->session->Coalesce(*(job.session.get()));
514 pending_nudge_->scheduled_start = job.scheduled_start; 387 pending_nudge_->scheduled_start = job.scheduled_start;
515 388
516 // Unfortunately the nudge location cannot be modified. So it stores the 389 // Unfortunately the nudge location cannot be modified. So it stores the
(...skipping 24 matching lines...) Expand all
541 // TODO(sync): Should we also check that job.purpose != 414 // TODO(sync): Should we also check that job.purpose !=
542 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) 415 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.)
543 if (job.purpose == SyncSessionJob::NUDGE) { 416 if (job.purpose == SyncSessionJob::NUDGE) {
544 SDVLOG(2) << "Saving a nudge job"; 417 SDVLOG(2) << "Saving a nudge job";
545 InitOrCoalescePendingJob(job); 418 InitOrCoalescePendingJob(job);
546 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ 419 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
547 SDVLOG(2) << "Saving a configuration job"; 420 SDVLOG(2) << "Saving a configuration job";
548 DCHECK(wait_interval_.get()); 421 DCHECK(wait_interval_.get());
549 DCHECK(mode_ == CONFIGURATION_MODE); 422 DCHECK(mode_ == CONFIGURATION_MODE);
550 423
551 // Config params should always get set.
552 DCHECK(!job.config_params.ready_task.is_null());
553 SyncSession* old = job.session.get(); 424 SyncSession* old = job.session.get();
554 SyncSession* s(new SyncSession(session_context_, this, old->source(), 425 SyncSession* s(new SyncSession(session_context_, this, old->source(),
555 old->routing_info(), old->workers())); 426 old->routing_info(), old->workers()));
556 SyncSessionJob new_job(job.purpose, 427 SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
557 TimeTicks::Now(), 428 make_linked_ptr(s), false, job.from_here);
558 make_linked_ptr(s),
559 false,
560 job.config_params,
561 job.from_here);
562 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); 429 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
563 } // drop the rest. 430 } // drop the rest.
564 // TODO(sync): Is it okay to drop the rest? It's weird that 431 // TODO(sync): Is it okay to drop the rest? It's weird that
565 // SaveJob() only does what it says sometimes. (See 432 // SaveJob() only does what it says sometimes. (See
566 // http://crbug.com/90868.) 433 // http://crbug.com/90868.)
567 } 434 }
568 435
569 // Functor for std::find_if to search by ModelSafeGroup. 436 // Functor for std::find_if to search by ModelSafeGroup.
570 struct ModelSafeWorkerGroupIs { 437 struct ModelSafeWorkerGroupIs {
571 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 438 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
572 bool operator()(ModelSafeWorker* w) { 439 bool operator()(ModelSafeWorker* w) {
573 return group == w->GetModelSafeGroup(); 440 return group == w->GetModelSafeGroup();
574 } 441 }
575 ModelSafeGroup group; 442 ModelSafeGroup group;
576 }; 443 };
577 444
445 void SyncScheduler::CleanupDisabledTypes() {
446 DCHECK_EQ(MessageLoop::current(), sync_loop_);
447 SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(),
448 make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
449 false,
450 FROM_HERE);
451 DoSyncSessionJob(job);
452 }
453
578 void SyncScheduler::ScheduleNudgeAsync( 454 void SyncScheduler::ScheduleNudgeAsync(
579 const TimeDelta& delay, 455 const TimeDelta& delay,
580 NudgeSource source, ModelTypeSet types, 456 NudgeSource source, ModelTypeSet types,
581 const tracked_objects::Location& nudge_location) { 457 const tracked_objects::Location& nudge_location) {
582 DCHECK_EQ(MessageLoop::current(), sync_loop_); 458 DCHECK_EQ(MessageLoop::current(), sync_loop_);
583 SDVLOG_LOC(nudge_location, 2) 459 SDVLOG_LOC(nudge_location, 2)
584 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 460 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
585 << "source " << GetNudgeSourceString(source) << ", " 461 << "source " << GetNudgeSourceString(source) << ", "
586 << "types " << ModelTypeSetToString(types); 462 << "types " << ModelTypeSetToString(types);
587 463
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
625 << "source " << GetUpdatesSourceString(source) << ", " 501 << "source " << GetUpdatesSourceString(source) << ", "
626 << "payloads " 502 << "payloads "
627 << syncer::ModelTypePayloadMapToString(types_with_payloads) 503 << syncer::ModelTypePayloadMapToString(types_with_payloads)
628 << (is_canary_job ? " (canary)" : ""); 504 << (is_canary_job ? " (canary)" : "");
629 505
630 SyncSourceInfo info(source, types_with_payloads); 506 SyncSourceInfo info(source, types_with_payloads);
631 507
632 SyncSession* session(CreateSyncSession(info)); 508 SyncSession* session(CreateSyncSession(info));
633 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 509 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
634 make_linked_ptr(session), is_canary_job, 510 make_linked_ptr(session), is_canary_job,
635 ConfigurationParams(), nudge_location); 511 nudge_location);
636 512
637 session = NULL; 513 session = NULL;
638 if (!ShouldRunJob(job)) 514 if (!ShouldRunJob(job))
639 return; 515 return;
640 516
641 if (pending_nudge_.get()) { 517 if (pending_nudge_.get()) {
642 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { 518 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
643 SDVLOG(2) << "Dropping the nudge because we are in backoff"; 519 SDVLOG(2) << "Dropping the nudge because we are in backoff";
644 return; 520 return;
645 } 521 }
(...skipping 10 matching lines...) Expand all
656 job.scheduled_start = std::min(job.scheduled_start, 532 job.scheduled_start = std::min(job.scheduled_start,
657 pending_nudge_->scheduled_start); 533 pending_nudge_->scheduled_start);
658 pending_nudge_.reset(); 534 pending_nudge_.reset();
659 } 535 }
660 536
661 // TODO(zea): Consider adding separate throttling/backoff for datatype 537 // TODO(zea): Consider adding separate throttling/backoff for datatype
662 // refresh requests. 538 // refresh requests.
663 ScheduleSyncSessionJob(job); 539 ScheduleSyncSessionJob(job);
664 } 540 }
665 541
542 // Helper to extract the routing info and workers corresponding to types in
543 // |types| from |current_routes| and |current_workers|.
544 void GetModelSafeParamsForTypes(ModelTypeSet types,
545 const ModelSafeRoutingInfo& current_routes,
546 const std::vector<ModelSafeWorker*>& current_workers,
547 ModelSafeRoutingInfo* result_routes,
548 std::vector<ModelSafeWorker*>* result_workers) {
549 bool passive_group_added = false;
550
551 typedef std::vector<ModelSafeWorker*>::const_iterator iter;
552 for (ModelTypeSet::Iterator it = types.First();
553 it.Good(); it.Inc()) {
554 const syncer::ModelType t = it.Get();
555 ModelSafeRoutingInfo::const_iterator route = current_routes.find(t);
556 DCHECK(route != current_routes.end());
557 ModelSafeGroup group = route->second;
558
559 (*result_routes)[t] = group;
560 iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(),
561 ModelSafeWorkerGroupIs(group));
562 if (w_tmp_it != current_workers.end()) {
563 iter result_workers_it = std::find_if(
564 result_workers->begin(), result_workers->end(),
565 ModelSafeWorkerGroupIs(group));
566 if (result_workers_it == result_workers->end())
567 result_workers->push_back(*w_tmp_it);
568
569 if (group == GROUP_PASSIVE)
570 passive_group_added = true;
571 } else {
572 NOTREACHED();
573 }
574 }
575
576 // Always add group passive.
577 if (passive_group_added == false) {
578 iter it = std::find_if(current_workers.begin(), current_workers.end(),
579 ModelSafeWorkerGroupIs(GROUP_PASSIVE));
580 if (it != current_workers.end())
581 result_workers->push_back(*it);
582 else
583 NOTREACHED();
584 }
585 }
586
587 void SyncScheduler::ScheduleConfiguration(
588 ModelTypeSet types,
589 GetUpdatesCallerInfo::GetUpdatesSource source) {
590 DCHECK_EQ(MessageLoop::current(), sync_loop_);
591 DCHECK(IsConfigRelatedUpdateSourceValue(source));
592 SDVLOG(2) << "Scheduling a config";
593
594 ModelSafeRoutingInfo routes;
595 std::vector<ModelSafeWorker*> workers;
596 GetModelSafeParamsForTypes(types,
597 session_context_->routing_info(),
598 session_context_->workers(),
599 &routes, &workers);
600
601 SyncSession* session = new SyncSession(session_context_, this,
602 SyncSourceInfo(source,
603 ModelSafeRoutingInfoToPayloadMap(routes, std::string())),
604 routes, workers);
605 SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(),
606 make_linked_ptr(session),
607 false,
608 FROM_HERE);
609 DoSyncSessionJob(job);
610 }
611
666 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { 612 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) {
667 switch (mode) { 613 switch (mode) {
668 ENUM_CASE(CONFIGURATION_MODE); 614 ENUM_CASE(CONFIGURATION_MODE);
669 ENUM_CASE(NORMAL_MODE); 615 ENUM_CASE(NORMAL_MODE);
670 } 616 }
671 return ""; 617 return "";
672 } 618 }
673 619
674 const char* SyncScheduler::GetDecisionString( 620 const char* SyncScheduler::GetDecisionString(
675 SyncScheduler::JobProcessDecision mode) { 621 SyncScheduler::JobProcessDecision mode) {
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after
854 ServerConnectionManager* scm = session_context_->connection_manager(); 800 ServerConnectionManager* scm = session_context_->connection_manager();
855 UpdateServerConnectionManagerStatus(scm->server_status()); 801 UpdateServerConnectionManagerStatus(scm->server_status());
856 802
857 UpdateCarryoverSessionState(job); 803 UpdateCarryoverSessionState(job);
858 if (IsSyncingCurrentlySilenced()) { 804 if (IsSyncingCurrentlySilenced()) {
859 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; 805 SDVLOG(2) << "We are currently throttled; not scheduling the next sync.";
860 // TODO(sync): Investigate whether we need to check job.purpose 806 // TODO(sync): Investigate whether we need to check job.purpose
861 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) 807 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
862 SaveJob(job); 808 SaveJob(job);
863 return; // Nothing to do. 809 return; // Nothing to do.
864 } else if (job.session->Succeeded() &&
865 !job.config_params.ready_task.is_null()) {
866 // If this was a configuration job with a ready task, invoke it now that
867 // we finished successfully.
868 job.config_params.ready_task.Run();
869 } 810 }
870 811
871 SDVLOG(2) << "Updating the next polling time after SyncMain"; 812 SDVLOG(2) << "Updating the next polling time after SyncMain";
872 ScheduleNextSync(job); 813 ScheduleNextSync(job);
873 } 814 }
874 815
875 void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { 816 void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) {
876 DCHECK_EQ(MessageLoop::current(), sync_loop_); 817 DCHECK_EQ(MessageLoop::current(), sync_loop_);
877 DCHECK(!old_job.session->HasMoreToSync()); 818 DCHECK(!old_job.session->HasMoreToSync());
878 819
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
961 902
962 SDVLOG(2) << "In handle continuation error with " 903 SDVLOG(2) << "In handle continuation error with "
963 << SyncSessionJob::GetPurposeString(old_job.purpose) 904 << SyncSessionJob::GetPurposeString(old_job.purpose)
964 << " job. The time delta(ms) is " 905 << " job. The time delta(ms) is "
965 << length.InMilliseconds(); 906 << length.InMilliseconds();
966 907
967 // This will reset the had_nudge variable as well. 908 // This will reset the had_nudge variable as well.
968 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 909 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
969 length)); 910 length));
970 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 911 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
971 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
972 // Config params should always get set.
973 DCHECK(!old_job.config_params.ready_task.is_null());
974 SyncSession* old = old_job.session.get(); 912 SyncSession* old = old_job.session.get();
975 SyncSession* s(new SyncSession(session_context_, this, 913 SyncSession* s(new SyncSession(session_context_, this,
976 old->source(), old->routing_info(), old->workers())); 914 old->source(), old->routing_info(), old->workers()));
977 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, 915 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
978 make_linked_ptr(s), false, old_job.config_params, 916 make_linked_ptr(s), false, FROM_HERE);
979 FROM_HERE);
980 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); 917 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
981 } else { 918 } else {
982 // We are not in configuration mode. So wait_interval's pending job 919 // We are not in configuration mode. So wait_interval's pending job
983 // should be null. 920 // should be null.
984 DCHECK(wait_interval_->pending_configure_job.get() == NULL); 921 DCHECK(wait_interval_->pending_configure_job.get() == NULL);
985 922
986 // TODO(lipalani) - handle clear user data. 923 // TODO(lipalani) - handle clear user data.
987 InitOrCoalescePendingJob(old_job); 924 InitOrCoalescePendingJob(old_job);
988 } 925 }
989 RestartWaiting(); 926 RestartWaiting();
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1028 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1092 ModelSafeRoutingInfo r; 1029 ModelSafeRoutingInfo r;
1093 ModelTypePayloadMap types_with_payloads = 1030 ModelTypePayloadMap types_with_payloads =
1094 ModelSafeRoutingInfoToPayloadMap(r, std::string()); 1031 ModelSafeRoutingInfoToPayloadMap(r, std::string());
1095 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); 1032 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
1096 SyncSession* s = CreateSyncSession(info); 1033 SyncSession* s = CreateSyncSession(info);
1097 1034
1098 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), 1035 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(),
1099 make_linked_ptr(s), 1036 make_linked_ptr(s),
1100 false, 1037 false,
1101 ConfigurationParams(),
1102 FROM_HERE); 1038 FROM_HERE);
1103 1039
1104 ScheduleSyncSessionJob(job); 1040 ScheduleSyncSessionJob(job);
1105 } 1041 }
1106 1042
1107 void SyncScheduler::Unthrottle() { 1043 void SyncScheduler::Unthrottle() {
1108 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1044 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1109 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 1045 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1110 SDVLOG(2) << "Unthrottled."; 1046 SDVLOG(2) << "Unthrottled.";
1111 DoCanaryJob(); 1047 DoCanaryJob();
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
1195 1131
1196 #undef SDVLOG_LOC 1132 #undef SDVLOG_LOC
1197 1133
1198 #undef SDVLOG 1134 #undef SDVLOG
1199 1135
1200 #undef SLOG 1136 #undef SLOG
1201 1137
1202 #undef ENUM_CASE 1138 #undef ENUM_CASE
1203 1139
1204 } // namespace syncer 1140 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/engine/sync_scheduler.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698