Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(920)

Side by Side Diff: sync/engine/sync_scheduler.cc

Issue 10483015: [Sync] Refactor sync configuration logic. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: SetBool -> CallbackCounter Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler.h" 5 #include "sync/engine/sync_scheduler.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
60 return false; 60 return false;
61 } 61 }
62 } 62 }
63 63
64 bool IsActionableError( 64 bool IsActionableError(
65 const browser_sync::SyncProtocolError& error) { 65 const browser_sync::SyncProtocolError& error) {
66 return (error.action != browser_sync::UNKNOWN_ACTION); 66 return (error.action != browser_sync::UNKNOWN_ACTION);
67 } 67 }
68 } // namespace 68 } // namespace
69 69
70 ConfigureParams::ConfigureParams()
71 : source(GetUpdatesCallerInfo::UNKNOWN),
72 need_encryption_key(false) {}
73 ConfigureParams::ConfigureParams(
74 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
75 const syncable::ModelTypeSet& types_to_config,
76 const browser_sync::ModelSafeRoutingInfo& routing_info,
77 bool need_encryption_key,
78 const base::Closure& ready_task)
79 : source(source),
80 types_to_config(types_to_config),
81 routing_info(routing_info),
82 need_encryption_key(need_encryption_key),
83 ready_task(ready_task) {
84 DCHECK(!ready_task.is_null());
85 }
86 ConfigureParams::~ConfigureParams() {}
87
70 SyncScheduler::DelayProvider::DelayProvider() {} 88 SyncScheduler::DelayProvider::DelayProvider() {}
71 SyncScheduler::DelayProvider::~DelayProvider() {} 89 SyncScheduler::DelayProvider::~DelayProvider() {}
72 90
73 SyncScheduler::WaitInterval::WaitInterval() 91 SyncScheduler::WaitInterval::WaitInterval()
74 : mode(UNKNOWN), 92 : mode(UNKNOWN),
75 had_nudge(false) { 93 had_nudge(false) {
76 } 94 }
77 95
78 SyncScheduler::WaitInterval::~WaitInterval() {} 96 SyncScheduler::WaitInterval::~WaitInterval() {}
79 97
(...skipping 12 matching lines...) Expand all
92 SyncScheduler::SyncSessionJob::SyncSessionJob() 110 SyncScheduler::SyncSessionJob::SyncSessionJob()
93 : purpose(UNKNOWN), 111 : purpose(UNKNOWN),
94 is_canary_job(false) { 112 is_canary_job(false) {
95 } 113 }
96 114
97 SyncScheduler::SyncSessionJob::~SyncSessionJob() {} 115 SyncScheduler::SyncSessionJob::~SyncSessionJob() {}
98 116
99 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, 117 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
100 base::TimeTicks start, 118 base::TimeTicks start,
101 linked_ptr<sessions::SyncSession> session, bool is_canary_job, 119 linked_ptr<sessions::SyncSession> session, bool is_canary_job,
102 const tracked_objects::Location& from_here) : purpose(purpose), 120 ConfigureParams config_params, const tracked_objects::Location& from_here)
103 scheduled_start(start), 121 : purpose(purpose),
104 session(session), 122 scheduled_start(start),
105 is_canary_job(is_canary_job), 123 session(session),
106 from_here(from_here) { 124 is_canary_job(is_canary_job),
125 config_params(config_params),
126 from_here(from_here) {
107 } 127 }
108 128
109 const char* SyncScheduler::SyncSessionJob::GetPurposeString( 129 const char* SyncScheduler::SyncSessionJob::GetPurposeString(
110 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { 130 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) {
111 switch (purpose) { 131 switch (purpose) {
112 ENUM_CASE(UNKNOWN); 132 ENUM_CASE(UNKNOWN);
113 ENUM_CASE(POLL); 133 ENUM_CASE(POLL);
114 ENUM_CASE(NUDGE); 134 ENUM_CASE(NUDGE);
115 ENUM_CASE(CLEAR_USER_DATA); 135 ENUM_CASE(CLEAR_USER_DATA);
116 ENUM_CASE(CONFIGURATION); 136 ENUM_CASE(CONFIGURATION);
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 259
240 void SyncScheduler::UpdateServerConnectionManagerStatus( 260 void SyncScheduler::UpdateServerConnectionManagerStatus(
241 HttpResponse::ServerConnectionCode code) { 261 HttpResponse::ServerConnectionCode code) {
242 DCHECK_EQ(MessageLoop::current(), sync_loop_); 262 DCHECK_EQ(MessageLoop::current(), sync_loop_);
243 SDVLOG(2) << "New server connection code: " 263 SDVLOG(2) << "New server connection code: "
244 << HttpResponse::GetServerConnectionCodeString(code); 264 << HttpResponse::GetServerConnectionCodeString(code);
245 265
246 connection_code_ = code; 266 connection_code_ = code;
247 } 267 }
248 268
249 void SyncScheduler::Start(Mode mode, const base::Closure& callback) { 269 void SyncScheduler::Start(Mode mode) {
250 DCHECK_EQ(MessageLoop::current(), sync_loop_); 270 DCHECK_EQ(MessageLoop::current(), sync_loop_);
251 std::string thread_name = MessageLoop::current()->thread_name(); 271 std::string thread_name = MessageLoop::current()->thread_name();
252 if (thread_name.empty()) 272 if (thread_name.empty())
253 thread_name = "<Main thread>"; 273 thread_name = "<Main thread>";
254 SDVLOG(2) << "Start called from thread " 274 SDVLOG(2) << "Start called from thread "
255 << thread_name << " with mode " << GetModeString(mode); 275 << thread_name << " with mode " << GetModeString(mode);
256 if (!started_) { 276 if (!started_) {
257 started_ = true; 277 started_ = true;
258 SendInitialSnapshot(); 278 SendInitialSnapshot();
259 } 279 }
260 280
261 DCHECK(!session_context_->account_name().empty()); 281 DCHECK(!session_context_->account_name().empty());
262 DCHECK(syncer_.get()); 282 DCHECK(syncer_.get());
263 Mode old_mode = mode_; 283 Mode old_mode = mode_;
264 mode_ = mode; 284 mode_ = mode;
265 AdjustPolling(NULL); // Will kick start poll timer if needed. 285 AdjustPolling(NULL); // Will kick start poll timer if needed.
266 if (!callback.is_null())
267 callback.Run();
268 286
269 if (old_mode != mode_) { 287 if (old_mode != mode_) {
270 // We just changed our mode. See if there are any pending jobs that we could 288 // We just changed our mode. See if there are any pending jobs that we could
271 // execute in the new mode. 289 // execute in the new mode.
272 DoPendingJobIfPossible(false); 290 DoPendingJobIfPossible(false);
273 } 291 }
274 } 292 }
275 293
276 void SyncScheduler::SendInitialSnapshot() { 294 void SyncScheduler::SendInitialSnapshot() {
277 DCHECK_EQ(MessageLoop::current(), sync_loop_); 295 DCHECK_EQ(MessageLoop::current(), sync_loop_);
278 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, 296 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
279 SyncSourceInfo(), ModelSafeRoutingInfo(), 297 SyncSourceInfo(), ModelSafeRoutingInfo(),
280 std::vector<ModelSafeWorker*>())); 298 std::vector<ModelSafeWorker*>()));
281 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 299 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
282 event.snapshot = dummy->TakeSnapshot(); 300 event.snapshot = dummy->TakeSnapshot();
283 session_context_->NotifyListeners(event); 301 session_context_->NotifyListeners(event);
284 } 302 }
285 303
304 namespace {
305
306 // Helper to extract the routing info and workers corresponding to types in
307 // |types| from |current_routes| and |current_workers|.
308 void BuildModelSafeParams(
309 const ModelTypeSet& types_to_config,
310 const ModelSafeRoutingInfo& current_routes,
311 const std::vector<ModelSafeWorker*>& current_workers,
312 ModelSafeRoutingInfo* result_routes,
313 std::vector<ModelSafeWorker*>* result_workers) {
314 std::set<ModelSafeGroup> active_groups;
315 active_groups.insert(GROUP_PASSIVE);
316 for (ModelTypeSet::Iterator iter = types_to_config.First(); iter.Good();
317 iter.Inc()) {
318 syncable::ModelType type = iter.Get();
319 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
320 DCHECK(route != current_routes.end());
321 ModelSafeGroup group = route->second;
322 (*result_routes)[type] = group;
323 active_groups.insert(group);
324 }
325
326 for(std::vector<ModelSafeWorker*>::const_iterator iter =
327 current_workers.begin(); iter != current_workers.end(); ++iter) {
328 if (active_groups.count((*iter)->GetModelSafeGroup()) > 0)
329 result_workers->push_back(*iter);
330 }
331 }
332
333 } // namespace.
334
335 bool SyncScheduler::ScheduleConfiguration(const ConfigureParams& params) {
336 DCHECK_EQ(MessageLoop::current(), sync_loop_);
337 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
338 DCHECK_EQ(CONFIGURATION_MODE, mode_);
339 DCHECK(!params.ready_task.is_null());
340 SDVLOG(2) << "Reconfiguring syncer.";
341
342 // Only one configuration is allowed at a time. Verify we're not waiting
343 // for a pending configure job.
344 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get());
345
346 // TODO(sync): now that ModelChanging commands only use those workers within
347 // the routing info, we don't really need |restricted_workers|. Remove it.
348 browser_sync::ModelSafeRoutingInfo restricted_routes;
349 std::vector<ModelSafeWorker*> restricted_workers;
350 BuildModelSafeParams(params.types_to_config,
351 params.routing_info,
352 session_context_->workers(),
353 &restricted_routes,
354 &restricted_workers);
355 session_context_->set_routing_info(params.routing_info);
356
357 // TODO(sync): if it's confirmed that Cleanup has no effect on non-configures,
358 // remove this command and add a call to PurgeEntriesWithTypeIn here. This
tim (not reviewing) 2012/06/14 00:26:53 nit - well, we don't want to poke the directory di
Nicolas Zea 2012/06/14 22:35:52 Removed comment. Note that this is being remove an
359 // will also allow us to get rid of previous_session_routing_info.
360 SyncSessionJob cleanup_job(
361 SyncSessionJob::CLEANUP_DISABLED_TYPES,
362 TimeTicks::Now(),
363 make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
364 false,
365 ConfigureParams(),
366 FROM_HERE);
367 DoSyncSessionJob(cleanup_job);
tim (not reviewing) 2012/06/14 00:26:53 Hmm. So, this might choose not to actually execute
Nicolas Zea 2012/06/14 22:35:52 Done, and mentioned bug number for getting rid of
368
369 if (params.need_encryption_key) {
370 // TODO(zea): implement in such a way that we can handle failures and the
371 // subsequent retrys the scheduler might perform.
372 NOTIMPLEMENTED();
tim (not reviewing) 2012/06/14 00:26:53 bug?
Nicolas Zea 2012/06/14 22:35:52 Done.
373 }
374
375 // Only reconfigure if we have types to config.
tim (not reviewing) 2012/06/14 00:26:53 It seems weird that this whole flow is called conf
Nicolas Zea 2012/06/14 22:35:52 Done.
376 if (!params.types_to_config.Empty()) {
377 DCHECK(!restricted_routes.empty());
378 // TODO(tim): config-specific GetUpdatesCallerInfo value?
tim (not reviewing) 2012/06/14 00:26:53 We have a config specific GetUpdatesCallerInfo...
Nicolas Zea 2012/06/14 22:35:52 Woops, I removed this is the other patch, but it c
379 linked_ptr<SyncSession> session(new SyncSession(
380 session_context_,
381 this,
382 SyncSourceInfo(params.source,
383 syncable::ModelTypePayloadMapFromRoutingInfo(
384 restricted_routes,
385 std::string())),
386 restricted_routes,
387 restricted_workers));
388 SyncSessionJob job(SyncSessionJob::CONFIGURATION,
389 TimeTicks::Now(),
390 session,
391 false,
392 params,
393 FROM_HERE);
394 DoSyncSessionJob(job);
395
396 // If we failed, the job would have been saved as the pending configure
397 // job and a wait interval would have been set.
398 if (!session->Succeeded()) {
399 DCHECK(wait_interval_.get() &&
400 wait_interval_->pending_configure_job.get());
401 return false;
402 }
403 } else {
404 SDVLOG(2) << "No change in routing info, calling ready task directly.";
405 params.ready_task.Run();
406 }
407
408 return true;
409 }
410
286 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( 411 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval(
287 const SyncSessionJob& job) { 412 const SyncSessionJob& job) {
288 DCHECK_EQ(MessageLoop::current(), sync_loop_); 413 DCHECK_EQ(MessageLoop::current(), sync_loop_);
289 DCHECK(wait_interval_.get()); 414 DCHECK(wait_interval_.get());
290 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); 415 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
291 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); 416 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES);
292 417
293 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 418 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
294 << WaitInterval::GetModeString(wait_interval_->mode) 419 << WaitInterval::GetModeString(wait_interval_->mode)
295 << (wait_interval_->had_nudge ? " (had nudge)" : "") 420 << (wait_interval_->had_nudge ? " (had nudge)" : "")
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
374 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { 499 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) {
375 DCHECK_EQ(MessageLoop::current(), sync_loop_); 500 DCHECK_EQ(MessageLoop::current(), sync_loop_);
376 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); 501 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
377 if (pending_nudge_.get() == NULL) { 502 if (pending_nudge_.get() == NULL) {
378 SDVLOG(2) << "Creating a pending nudge job"; 503 SDVLOG(2) << "Creating a pending nudge job";
379 SyncSession* s = job.session.get(); 504 SyncSession* s = job.session.get();
380 scoped_ptr<SyncSession> session(new SyncSession(s->context(), 505 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
381 s->delegate(), s->source(), s->routing_info(), s->workers())); 506 s->delegate(), s->source(), s->routing_info(), s->workers()));
382 507
383 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, 508 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
384 make_linked_ptr(session.release()), false, job.from_here); 509 make_linked_ptr(session.release()), false,
510 ConfigureParams(), job.from_here);
385 pending_nudge_.reset(new SyncSessionJob(new_job)); 511 pending_nudge_.reset(new SyncSessionJob(new_job));
386 512
387 return; 513 return;
388 } 514 }
389 515
390 SDVLOG(2) << "Coalescing a pending nudge"; 516 SDVLOG(2) << "Coalescing a pending nudge";
391 pending_nudge_->session->Coalesce(*(job.session.get())); 517 pending_nudge_->session->Coalesce(*(job.session.get()));
392 pending_nudge_->scheduled_start = job.scheduled_start; 518 pending_nudge_->scheduled_start = job.scheduled_start;
393 519
394 // Unfortunately the nudge location cannot be modified. So it stores the 520 // Unfortunately the nudge location cannot be modified. So it stores the
(...skipping 25 matching lines...) Expand all
420 // TODO(sync): Should we also check that job.purpose != 546 // TODO(sync): Should we also check that job.purpose !=
421 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) 547 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.)
422 if (job.purpose == SyncSessionJob::NUDGE) { 548 if (job.purpose == SyncSessionJob::NUDGE) {
423 SDVLOG(2) << "Saving a nudge job"; 549 SDVLOG(2) << "Saving a nudge job";
424 InitOrCoalescePendingJob(job); 550 InitOrCoalescePendingJob(job);
425 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ 551 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
426 SDVLOG(2) << "Saving a configuration job"; 552 SDVLOG(2) << "Saving a configuration job";
427 DCHECK(wait_interval_.get()); 553 DCHECK(wait_interval_.get());
428 DCHECK(mode_ == CONFIGURATION_MODE); 554 DCHECK(mode_ == CONFIGURATION_MODE);
429 555
556 // Config params should always get set.
557 DCHECK(!job.config_params.ready_task.is_null());
430 SyncSession* old = job.session.get(); 558 SyncSession* old = job.session.get();
431 SyncSession* s(new SyncSession(session_context_, this, old->source(), 559 SyncSession* s(new SyncSession(session_context_, this, old->source(),
432 old->routing_info(), old->workers())); 560 old->routing_info(), old->workers()));
433 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), 561 SyncSessionJob new_job(job.purpose,
434 make_linked_ptr(s), false, job.from_here); 562 TimeTicks::Now(),
563 make_linked_ptr(s),
564 false,
565 job.config_params,
566 job.from_here);
435 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); 567 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
436 } // drop the rest. 568 } // drop the rest.
437 // TODO(sync): Is it okay to drop the rest? It's weird that 569 // TODO(sync): Is it okay to drop the rest? It's weird that
438 // SaveJob() only does what it says sometimes. (See 570 // SaveJob() only does what it says sometimes. (See
439 // http://crbug.com/90868.) 571 // http://crbug.com/90868.)
440 } 572 }
441 573
442 // Functor for std::find_if to search by ModelSafeGroup. 574 // Functor for std::find_if to search by ModelSafeGroup.
443 struct ModelSafeWorkerGroupIs { 575 struct ModelSafeWorkerGroupIs {
444 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 576 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
445 bool operator()(ModelSafeWorker* w) { 577 bool operator()(ModelSafeWorker* w) {
446 return group == w->GetModelSafeGroup(); 578 return group == w->GetModelSafeGroup();
447 } 579 }
448 ModelSafeGroup group; 580 ModelSafeGroup group;
449 }; 581 };
450 582
451 void SyncScheduler::ClearUserData() { 583 void SyncScheduler::ClearUserData() {
452 DCHECK_EQ(MessageLoop::current(), sync_loop_); 584 DCHECK_EQ(MessageLoop::current(), sync_loop_);
453 SyncSessionJob job(SyncSessionJob::CLEAR_USER_DATA, TimeTicks::Now(), 585 SyncSessionJob job(SyncSessionJob::CLEAR_USER_DATA,
586 TimeTicks::Now(),
454 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), 587 make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
455 false, 588 false,
589 ConfigureParams(),
456 FROM_HERE); 590 FROM_HERE);
457 591
458 DoSyncSessionJob(job); 592 DoSyncSessionJob(job);
459 } 593 }
460 594
461 void SyncScheduler::CleanupDisabledTypes() {
462 DCHECK_EQ(MessageLoop::current(), sync_loop_);
463 SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(),
464 make_linked_ptr(CreateSyncSession(SyncSourceInfo())),
465 false,
466 FROM_HERE);
467 DoSyncSessionJob(job);
468 }
469
470 void SyncScheduler::ScheduleNudgeAsync( 595 void SyncScheduler::ScheduleNudgeAsync(
471 const TimeDelta& delay, 596 const TimeDelta& delay,
472 NudgeSource source, ModelTypeSet types, 597 NudgeSource source, ModelTypeSet types,
473 const tracked_objects::Location& nudge_location) { 598 const tracked_objects::Location& nudge_location) {
474 DCHECK_EQ(MessageLoop::current(), sync_loop_); 599 DCHECK_EQ(MessageLoop::current(), sync_loop_);
475 SDVLOG_LOC(nudge_location, 2) 600 SDVLOG_LOC(nudge_location, 2)
476 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 601 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
477 << "source " << GetNudgeSourceString(source) << ", " 602 << "source " << GetNudgeSourceString(source) << ", "
478 << "types " << ModelTypeSetToString(types); 603 << "types " << ModelTypeSetToString(types);
479 604
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
517 << "source " << GetUpdatesSourceString(source) << ", " 642 << "source " << GetUpdatesSourceString(source) << ", "
518 << "payloads " 643 << "payloads "
519 << syncable::ModelTypePayloadMapToString(types_with_payloads) 644 << syncable::ModelTypePayloadMapToString(types_with_payloads)
520 << (is_canary_job ? " (canary)" : ""); 645 << (is_canary_job ? " (canary)" : "");
521 646
522 SyncSourceInfo info(source, types_with_payloads); 647 SyncSourceInfo info(source, types_with_payloads);
523 648
524 SyncSession* session(CreateSyncSession(info)); 649 SyncSession* session(CreateSyncSession(info));
525 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 650 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
526 make_linked_ptr(session), is_canary_job, 651 make_linked_ptr(session), is_canary_job,
527 nudge_location); 652 ConfigureParams(), nudge_location);
528 653
529 session = NULL; 654 session = NULL;
530 if (!ShouldRunJob(job)) 655 if (!ShouldRunJob(job))
531 return; 656 return;
532 657
533 if (pending_nudge_.get()) { 658 if (pending_nudge_.get()) {
534 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { 659 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
535 SDVLOG(2) << "Dropping the nudge because we are in backoff"; 660 SDVLOG(2) << "Dropping the nudge because we are in backoff";
536 return; 661 return;
537 } 662 }
(...skipping 10 matching lines...) Expand all
548 job.scheduled_start = std::min(job.scheduled_start, 673 job.scheduled_start = std::min(job.scheduled_start,
549 pending_nudge_->scheduled_start); 674 pending_nudge_->scheduled_start);
550 pending_nudge_.reset(); 675 pending_nudge_.reset();
551 } 676 }
552 677
553 // TODO(zea): Consider adding separate throttling/backoff for datatype 678 // TODO(zea): Consider adding separate throttling/backoff for datatype
554 // refresh requests. 679 // refresh requests.
555 ScheduleSyncSessionJob(job); 680 ScheduleSyncSessionJob(job);
556 } 681 }
557 682
558 // Helper to extract the routing info and workers corresponding to types in
559 // |types| from |current_routes| and |current_workers|.
560 void GetModelSafeParamsForTypes(ModelTypeSet types,
561 const ModelSafeRoutingInfo& current_routes,
562 const std::vector<ModelSafeWorker*>& current_workers,
563 ModelSafeRoutingInfo* result_routes,
564 std::vector<ModelSafeWorker*>* result_workers) {
565 bool passive_group_added = false;
566
567 typedef std::vector<ModelSafeWorker*>::const_iterator iter;
568 for (ModelTypeSet::Iterator it = types.First();
569 it.Good(); it.Inc()) {
570 const syncable::ModelType t = it.Get();
571 ModelSafeRoutingInfo::const_iterator route = current_routes.find(t);
572 DCHECK(route != current_routes.end());
573 ModelSafeGroup group = route->second;
574
575 (*result_routes)[t] = group;
576 iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(),
577 ModelSafeWorkerGroupIs(group));
578 if (w_tmp_it != current_workers.end()) {
579 iter result_workers_it = std::find_if(
580 result_workers->begin(), result_workers->end(),
581 ModelSafeWorkerGroupIs(group));
582 if (result_workers_it == result_workers->end())
583 result_workers->push_back(*w_tmp_it);
584
585 if (group == GROUP_PASSIVE)
586 passive_group_added = true;
587 } else {
588 NOTREACHED();
589 }
590 }
591
592 // Always add group passive.
593 if (passive_group_added == false) {
594 iter it = std::find_if(current_workers.begin(), current_workers.end(),
595 ModelSafeWorkerGroupIs(GROUP_PASSIVE));
596 if (it != current_workers.end())
597 result_workers->push_back(*it);
598 else
599 NOTREACHED();
600 }
601 }
602
603 void SyncScheduler::ScheduleConfiguration(
604 ModelTypeSet types,
605 GetUpdatesCallerInfo::GetUpdatesSource source) {
606 DCHECK_EQ(MessageLoop::current(), sync_loop_);
607 DCHECK(IsConfigRelatedUpdateSourceValue(source));
608 SDVLOG(2) << "Scheduling a config";
609
610 ModelSafeRoutingInfo routes;
611 std::vector<ModelSafeWorker*> workers;
612 GetModelSafeParamsForTypes(types,
613 session_context_->routing_info(),
614 session_context_->workers(),
615 &routes, &workers);
616
617 SyncSession* session = new SyncSession(session_context_, this,
618 SyncSourceInfo(source,
619 syncable::ModelTypePayloadMapFromRoutingInfo(
620 routes, std::string())),
621 routes, workers);
622 SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(),
623 make_linked_ptr(session),
624 false,
625 FROM_HERE);
626 DoSyncSessionJob(job);
627 }
628
629 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { 683 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) {
630 switch (mode) { 684 switch (mode) {
631 ENUM_CASE(CONFIGURATION_MODE); 685 ENUM_CASE(CONFIGURATION_MODE);
632 ENUM_CASE(NORMAL_MODE); 686 ENUM_CASE(NORMAL_MODE);
633 } 687 }
634 return ""; 688 return "";
635 } 689 }
636 690
637 const char* SyncScheduler::GetDecisionString( 691 const char* SyncScheduler::GetDecisionString(
638 SyncScheduler::JobProcessDecision mode) { 692 SyncScheduler::JobProcessDecision mode) {
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
838 DCHECK_EQ(MessageLoop::current(), sync_loop_); 892 DCHECK_EQ(MessageLoop::current(), sync_loop_);
839 DCHECK(!old_job.session->HasMoreToSync()); 893 DCHECK(!old_job.session->HasMoreToSync());
840 894
841 AdjustPolling(&old_job); 895 AdjustPolling(&old_job);
842 896
843 if (old_job.session->Succeeded()) { 897 if (old_job.session->Succeeded()) {
844 // Only reset backoff if we actually reached the server. 898 // Only reset backoff if we actually reached the server.
845 if (old_job.session->SuccessfullyReachedServer()) 899 if (old_job.session->SuccessfullyReachedServer())
846 wait_interval_.reset(); 900 wait_interval_.reset();
847 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; 901 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
902
903 // If this was a configuration job with a ready task, invoke it now that
904 // we finished successfully.
tim (not reviewing) 2012/06/14 00:26:53 Why do this in 'ScheduleNextSync'? It's not reall
Nicolas Zea 2012/06/14 22:35:52 Primarily just because this is where we were alrea
905 if (!old_job.config_params.ready_task.is_null())
906 old_job.config_params.ready_task.Run();
848 return; 907 return;
849 } 908 }
850 909
851 if (old_job.purpose == SyncSessionJob::POLL) { 910 if (old_job.purpose == SyncSessionJob::POLL) {
852 return; // We don't retry POLL jobs. 911 return; // We don't retry POLL jobs.
853 } 912 }
854 913
855 // TODO(rlarocque): There's no reason why we should blindly backoff and retry 914 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
856 // if we don't succeed. Some types of errors are not likely to disappear on 915 // if we don't succeed. Some types of errors are not likely to disappear on
857 // their own. With the return values now available in the old_job.session, we 916 // their own. With the return values now available in the old_job.session, we
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
923 982
924 SDVLOG(2) << "In handle continuation error with " 983 SDVLOG(2) << "In handle continuation error with "
925 << SyncSessionJob::GetPurposeString(old_job.purpose) 984 << SyncSessionJob::GetPurposeString(old_job.purpose)
926 << " job. The time delta(ms) is " 985 << " job. The time delta(ms) is "
927 << length.InMilliseconds(); 986 << length.InMilliseconds();
928 987
929 // This will reset the had_nudge variable as well. 988 // This will reset the had_nudge variable as well.
930 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 989 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
931 length)); 990 length));
932 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 991 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
992 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
993 // Config params should always get set.
994 DCHECK(!old_job.config_params.ready_task.is_null());
933 SyncSession* old = old_job.session.get(); 995 SyncSession* old = old_job.session.get();
934 SyncSession* s(new SyncSession(session_context_, this, 996 SyncSession* s(new SyncSession(session_context_, this,
935 old->source(), old->routing_info(), old->workers())); 997 old->source(), old->routing_info(), old->workers()));
936 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, 998 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
937 make_linked_ptr(s), false, FROM_HERE); 999 make_linked_ptr(s), false, old_job.config_params,
1000 FROM_HERE);
938 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); 1001 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
939 } else { 1002 } else {
940 // We are not in configuration mode. So wait_interval's pending job 1003 // We are not in configuration mode. So wait_interval's pending job
941 // should be null. 1004 // should be null.
942 DCHECK(wait_interval_->pending_configure_job.get() == NULL); 1005 DCHECK(wait_interval_->pending_configure_job.get() == NULL);
943 1006
944 // TODO(lipalani) - handle clear user data. 1007 // TODO(lipalani) - handle clear user data.
945 InitOrCoalescePendingJob(old_job); 1008 InitOrCoalescePendingJob(old_job);
946 } 1009 }
947 RestartWaiting(); 1010 RestartWaiting();
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1049 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1112 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1050 ModelSafeRoutingInfo r; 1113 ModelSafeRoutingInfo r;
1051 ModelTypePayloadMap types_with_payloads = 1114 ModelTypePayloadMap types_with_payloads =
1052 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); 1115 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
1053 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); 1116 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
1054 SyncSession* s = CreateSyncSession(info); 1117 SyncSession* s = CreateSyncSession(info);
1055 1118
1056 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), 1119 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(),
1057 make_linked_ptr(s), 1120 make_linked_ptr(s),
1058 false, 1121 false,
1122 ConfigureParams(),
1059 FROM_HERE); 1123 FROM_HERE);
1060 1124
1061 ScheduleSyncSessionJob(job); 1125 ScheduleSyncSessionJob(job);
1062 } 1126 }
1063 1127
1064 void SyncScheduler::Unthrottle() { 1128 void SyncScheduler::Unthrottle() {
1065 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1129 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1066 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 1130 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1067 SDVLOG(2) << "Unthrottled."; 1131 SDVLOG(2) << "Unthrottled.";
1068 DoCanaryJob(); 1132 DoCanaryJob();
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
1151 1215
1152 #undef SDVLOG_LOC 1216 #undef SDVLOG_LOC
1153 1217
1154 #undef SDVLOG 1218 #undef SDVLOG
1155 1219
1156 #undef SLOG 1220 #undef SLOG
1157 1221
1158 #undef ENUM_CASE 1222 #undef ENUM_CASE
1159 1223
1160 } // browser_sync 1224 } // browser_sync
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698