OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
(...skipping 207 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
218 << thread_name << " with mode " << GetModeString(mode); | 218 << thread_name << " with mode " << GetModeString(mode); |
219 if (!started_) { | 219 if (!started_) { |
220 started_ = true; | 220 started_ = true; |
221 SendInitialSnapshot(); | 221 SendInitialSnapshot(); |
222 } | 222 } |
223 | 223 |
224 DCHECK(!session_context_->account_name().empty()); | 224 DCHECK(!session_context_->account_name().empty()); |
225 DCHECK(syncer_.get()); | 225 DCHECK(syncer_.get()); |
226 Mode old_mode = mode_; | 226 Mode old_mode = mode_; |
227 mode_ = mode; | 227 mode_ = mode; |
228 AdjustPolling(NULL); // Will kick start poll timer if needed. | 228 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. |
229 | 229 |
230 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) { | 230 if (old_mode != mode_ && mode_ == NORMAL_MODE && !nudge_tracker_.IsEmpty()) { |
231 // We just got back to normal mode. Let's try to run the work that was | 231 // We just got back to normal mode. Let's try to run the work that was |
232 // queued up while we were configuring. | 232 // queued up while we were configuring. |
233 DoNudgeSyncSessionJob(NORMAL_PRIORITY); | 233 DoNudgeSyncSessionJob(NORMAL_PRIORITY); |
234 } | 234 } |
235 } | 235 } |
236 | 236 |
237 void SyncSchedulerImpl::SendInitialSnapshot() { | 237 void SyncSchedulerImpl::SendInitialSnapshot() { |
238 DCHECK(CalledOnValidThread()); | 238 DCHECK(CalledOnValidThread()); |
239 scoped_ptr<SyncSession> dummy(new SyncSession( | 239 scoped_ptr<SyncSession> dummy(new SyncSession( |
240 session_context_, this, SyncSourceInfo())); | 240 session_context_, this, SyncSourceInfo())); |
(...skipping 26 matching lines...) Expand all Loading... |
267 const ConfigurationParams& params) { | 267 const ConfigurationParams& params) { |
268 DCHECK(CalledOnValidThread()); | 268 DCHECK(CalledOnValidThread()); |
269 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 269 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
270 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 270 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
271 DCHECK(!params.ready_task.is_null()); | 271 DCHECK(!params.ready_task.is_null()); |
272 CHECK(started_) << "Scheduler must be running to configure."; | 272 CHECK(started_) << "Scheduler must be running to configure."; |
273 SDVLOG(2) << "Reconfiguring syncer."; | 273 SDVLOG(2) << "Reconfiguring syncer."; |
274 | 274 |
275 // Only one configuration is allowed at a time. Verify we're not waiting | 275 // Only one configuration is allowed at a time. Verify we're not waiting |
276 // for a pending configure job. | 276 // for a pending configure job. |
277 DCHECK(!pending_configure_job_); | 277 DCHECK(!pending_configure_params_); |
278 | 278 |
279 ModelSafeRoutingInfo restricted_routes; | 279 ModelSafeRoutingInfo restricted_routes; |
280 BuildModelSafeParams(params.types_to_download, | 280 BuildModelSafeParams(params.types_to_download, |
281 params.routing_info, | 281 params.routing_info, |
282 &restricted_routes); | 282 &restricted_routes); |
283 session_context_->set_routing_info(restricted_routes); | 283 session_context_->set_routing_info(restricted_routes); |
284 | 284 |
285 // Only reconfigure if we have types to download. | 285 // Only reconfigure if we have types to download. |
286 if (!params.types_to_download.Empty()) { | 286 if (!params.types_to_download.Empty()) { |
287 DCHECK(!restricted_routes.empty()); | 287 pending_configure_params_.reset(new ConfigurationParams(params)); |
288 pending_configure_job_.reset(new SyncSessionJob( | |
289 SyncSessionJob::CONFIGURATION, | |
290 TimeTicks::Now(), | |
291 SyncSourceInfo(params.source, | |
292 ModelSafeRoutingInfoToInvalidationMap( | |
293 restricted_routes, | |
294 std::string())), | |
295 params)); | |
296 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); | 288 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); |
297 | 289 |
298 // If we failed, the job would have been saved as the pending configure | 290 // If we failed, the job would have been saved as the pending configure |
299 // job and a wait interval would have been set. | 291 // job and a wait interval would have been set. |
300 if (!succeeded) { | 292 if (!succeeded) { |
301 DCHECK(pending_configure_job_); | 293 DCHECK(pending_configure_params_); |
302 return false; | |
303 } else { | 294 } else { |
304 DCHECK(!pending_configure_job_); | 295 DCHECK(!pending_configure_params_); |
305 } | 296 } |
| 297 return succeeded; |
306 } else { | 298 } else { |
307 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 299 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
308 params.ready_task.Run(); | 300 params.ready_task.Run(); |
309 } | 301 } |
310 | 302 |
311 return true; | 303 return true; |
312 } | 304 } |
313 | 305 |
314 SyncSchedulerImpl::JobProcessDecision | 306 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { |
315 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, | |
316 JobPriority priority) { | |
317 DCHECK(CalledOnValidThread()); | 307 DCHECK(CalledOnValidThread()); |
318 DCHECK(wait_interval_.get()); | 308 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) { |
319 DCHECK_NE(job.purpose(), SyncSessionJob::POLL); | 309 SDVLOG(1) << "Unable to run a job because we're throttled."; |
| 310 return false; |
| 311 } |
320 | 312 |
321 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 313 if (wait_interval_ |
322 << WaitInterval::GetModeString(wait_interval_->mode) | 314 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF |
323 << ((priority == CANARY_PRIORITY) ? " (canary)" : ""); | 315 && priority != CANARY_PRIORITY) { |
| 316 SDVLOG(1) << "Unable to run a job because we're backing off."; |
| 317 return false; |
| 318 } |
324 | 319 |
325 // If we save a job while in a WaitInterval, there is a well-defined moment | 320 if (session_context_->connection_manager()->HasInvalidAuthToken()) { |
326 // in time in the future when it makes sense for that SAVE-worthy job to try | 321 SDVLOG(1) << "Unable to run a job because we have no valid auth token."; |
327 // running again -- the end of the WaitInterval. | 322 return false; |
328 DCHECK(job.purpose() == SyncSessionJob::NUDGE || | 323 } |
329 job.purpose() == SyncSessionJob::CONFIGURATION); | |
330 | 324 |
331 // If throttled, there's a clock ticking to unthrottle. We want to get | 325 return true; |
332 // on the same train. | |
333 if (wait_interval_->mode == WaitInterval::THROTTLED) | |
334 return SAVE; | |
335 | |
336 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | |
337 if (job.purpose() == SyncSessionJob::NUDGE) { | |
338 if (mode_ == CONFIGURATION_MODE) | |
339 return SAVE; | |
340 | |
341 if (priority == NORMAL_PRIORITY) | |
342 return DROP; | |
343 else // Either backoff has ended, or we have permission to bypass it. | |
344 return CONTINUE; | |
345 } | |
346 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; | |
347 } | 326 } |
348 | 327 |
349 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 328 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) { |
350 const SyncSessionJob& job, | |
351 JobPriority priority) { | |
352 DCHECK(CalledOnValidThread()); | 329 DCHECK(CalledOnValidThread()); |
353 | 330 |
354 // POLL jobs do not call this function. | 331 if (!CanRunJobNow(priority)) { |
355 DCHECK(job.purpose() == SyncSessionJob::NUDGE || | 332 SDVLOG(1) << "Unable to run a nudge job right now"; |
356 job.purpose() == SyncSessionJob::CONFIGURATION); | 333 return false; |
| 334 } |
357 | 335 |
358 // See if our type is throttled. | 336 // If all types are throttled, do not continue. Today, we don't treat a |
| 337 // per-datatype "unthrottle" event as something that should force a canary |
| 338 // job. For this reason, there's no good time to reschedule this job to run |
| 339 // -- we'll lazily wait for an independent event to trigger a sync. |
359 ModelTypeSet throttled_types = | 340 ModelTypeSet throttled_types = |
360 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 341 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
361 if (job.purpose() == SyncSessionJob::NUDGE && | 342 if (!nudge_tracker_.GetLocallyModifiedTypes().Empty() && |
362 job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) { | 343 throttled_types.HasAll(nudge_tracker_.GetLocallyModifiedTypes())) { |
363 ModelTypeSet requested_types; | 344 // TODO(sync): Throttled types should be pruned from the sources list. |
364 for (ModelTypeInvalidationMap::const_iterator i = | 345 SDVLOG(1) << "Not running a nudge because we're fully datatype throttled."; |
365 job.source_info().types.begin(); i != job.source_info().types.end(); | 346 return false; |
366 ++i) { | |
367 requested_types.Put(i->first); | |
368 } | |
369 | |
370 // If all types are throttled, do not CONTINUE. Today, we don't treat | |
371 // a per-datatype "unthrottle" event as something that should force a | |
372 // canary job. For this reason, there's no good time to reschedule this job | |
373 // to run -- we'll lazily wait for an independent event to trigger a sync. | |
374 // Note that there may already be such an event if we're in a WaitInterval, | |
375 // so we can retry it then. | |
376 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | |
377 return DROP; // TODO(tim): Don't drop. http://crbug.com/177659 | |
378 } | 347 } |
379 | 348 |
380 if (wait_interval_.get()) | |
381 return DecideWhileInWaitInterval(job, priority); | |
382 | |
383 if (mode_ == CONFIGURATION_MODE) { | 349 if (mode_ == CONFIGURATION_MODE) { |
384 if (job.purpose() == SyncSessionJob::NUDGE) | 350 SDVLOG(1) << "Not running nudge because we're in configuration mode."; |
385 return SAVE; // Running requires a mode switch. | 351 return false; |
386 else // Implies job.purpose() == SyncSessionJob::CONFIGURATION. | |
387 return CONTINUE; | |
388 } | 352 } |
389 | 353 |
390 // We are in normal mode. | 354 return true; |
391 DCHECK_EQ(mode_, NORMAL_MODE); | |
392 DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION); | |
393 | |
394 // Note about some subtle scheduling semantics. | |
395 // | |
396 // It's possible at this point that |job| is known to be unnecessary, and | |
397 // dropping it would be perfectly safe and correct. Consider | |
398 // | |
399 // 1) |job| is a NUDGE (for any combination of types) with a | |
400 // |scheduled_start| time that is less than the time that the last | |
401 // successful all-datatype NUDGE completed, and it has a NOTIFICATION | |
402 // GetUpdatesCallerInfo value yet offers no new notification hint. | |
403 // | |
404 // 2) |job| is a NUDGE with a |scheduled_start| time that is less than | |
405 // the time that the last successful matching-datatype NUDGE completed, | |
406 // and payloads (hints) are identical to that last successful NUDGE. | |
407 // | |
408 // We avoid cases 1 and 2 by externally synchronizing NUDGE requests -- | |
409 // scheduling a NUDGE requires command of the sync thread, which is | |
410 // impossible* from outside of SyncScheduler if a NUDGE is taking place. | |
411 // And if you have command of the sync thread when scheduling a NUDGE and a | |
412 // previous NUDGE exists, they will be coalesced and the stale job will be | |
413 // cancelled via the session-equality check in DoSyncSessionJob. | |
414 // | |
415 // * It's not strictly "impossible", but it would be reentrant and hence | |
416 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | |
417 // legal side effect of any of the work being done as part of a sync cycle. | |
418 // See |no_scheduling_allowed_| for details. | |
419 | |
420 // Decision now rests on state of auth tokens. | |
421 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | |
422 return CONTINUE; | |
423 | |
424 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | |
425 // Running the job would require updated auth, so we can't honour | |
426 // job.scheduled_start(). | |
427 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | |
428 } | 355 } |
429 | 356 |
430 void SyncSchedulerImpl::ScheduleNudgeAsync( | 357 void SyncSchedulerImpl::ScheduleNudgeAsync( |
431 const TimeDelta& desired_delay, | 358 const TimeDelta& desired_delay, |
432 NudgeSource source, ModelTypeSet types, | 359 NudgeSource source, ModelTypeSet types, |
433 const tracked_objects::Location& nudge_location) { | 360 const tracked_objects::Location& nudge_location) { |
434 DCHECK(CalledOnValidThread()); | 361 DCHECK(CalledOnValidThread()); |
435 SDVLOG_LOC(nudge_location, 2) | 362 SDVLOG_LOC(nudge_location, 2) |
436 << "Nudge scheduled with delay " | 363 << "Nudge scheduled with delay " |
437 << desired_delay.InMilliseconds() << " ms, " | 364 << desired_delay.InMilliseconds() << " ms, " |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
489 SDVLOG_LOC(nudge_location, 2) | 416 SDVLOG_LOC(nudge_location, 2) |
490 << "In ScheduleNudgeImpl with delay " | 417 << "In ScheduleNudgeImpl with delay " |
491 << delay.InMilliseconds() << " ms, " | 418 << delay.InMilliseconds() << " ms, " |
492 << "source " << GetUpdatesSourceString(source) << ", " | 419 << "source " << GetUpdatesSourceString(source) << ", " |
493 << "payloads " | 420 << "payloads " |
494 << ModelTypeInvalidationMapToString(invalidation_map); | 421 << ModelTypeInvalidationMapToString(invalidation_map); |
495 | 422 |
496 SyncSourceInfo info(source, invalidation_map); | 423 SyncSourceInfo info(source, invalidation_map); |
497 UpdateNudgeTimeRecords(info); | 424 UpdateNudgeTimeRecords(info); |
498 | 425 |
499 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 426 // Coalesce the new nudge information with any existing information. |
500 SyncSessionJob::NUDGE, | 427 nudge_tracker_.CoalesceSources(info); |
501 TimeTicks::Now() + delay, | 428 |
502 info, | 429 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) |
503 ConfigurationParams())); | 430 return; |
504 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); | 431 |
505 SDVLOG(2) << "Should run " | 432 if (!started_) { |
506 << SyncSessionJob::GetPurposeString(job->purpose()) | 433 SDVLOG_LOC(nudge_location, 2) |
507 << " in mode " << GetModeString(mode_) | 434 << "Schedule not started; not running a nudge."; |
508 << ": " << GetDecisionString(decision); | |
509 if (decision == DROP) { | |
510 return; | 435 return; |
511 } | 436 } |
512 | 437 |
513 // Try to coalesce in both SAVE and CONTINUE cases. | 438 TimeTicks incoming_run_time = TimeTicks::Now() + delay; |
514 if (pending_nudge_job_) { | 439 if (!scheduled_nudge_time_.is_null() && |
515 pending_nudge_job_->CoalesceSources(job->source_info()); | 440 (scheduled_nudge_time_ < incoming_run_time)) { |
516 if (decision == CONTINUE) { | 441 // Old job arrives sooner than this one. Don't reschedule it. |
517 // Only update the scheduled_start if we're going to reschedule. | |
518 pending_nudge_job_->set_scheduled_start( | |
519 std::min(job->scheduled_start(), | |
520 pending_nudge_job_->scheduled_start())); | |
521 } | |
522 } else { | |
523 pending_nudge_job_ = job.Pass(); | |
524 } | |
525 | |
526 if (decision == SAVE) { | |
527 return; | 442 return; |
528 } | 443 } |
529 | 444 |
530 TimeDelta run_delay = | 445 // Either there is no existing nudge in flight or the incoming nudge should be |
531 pending_nudge_job_->scheduled_start() - TimeTicks::Now(); | 446 // made to arrive first (preempt) the existing nudge. We reschedule in either |
532 if (run_delay < TimeDelta::FromMilliseconds(0)) | 447 // case. |
533 run_delay = TimeDelta::FromMilliseconds(0); | |
534 SDVLOG_LOC(nudge_location, 2) | 448 SDVLOG_LOC(nudge_location, 2) |
535 << "Scheduling a nudge with " | 449 << "Scheduling a nudge with " |
536 << run_delay.InMilliseconds() << " ms delay"; | 450 << delay.InMilliseconds() << " ms delay"; |
537 | 451 scheduled_nudge_time_ = incoming_run_time; |
538 if (started_) { | 452 pending_wakeup_timer_.Start( |
539 pending_wakeup_timer_.Start( | 453 nudge_location, |
540 nudge_location, | 454 delay, |
541 run_delay, | 455 base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob, |
542 base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob, | 456 weak_ptr_factory_.GetWeakPtr(), |
543 weak_ptr_factory_.GetWeakPtr(), | 457 NORMAL_PRIORITY)); |
544 NORMAL_PRIORITY)); | |
545 } | |
546 } | 458 } |
547 | 459 |
548 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 460 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
549 switch (mode) { | 461 switch (mode) { |
550 ENUM_CASE(CONFIGURATION_MODE); | 462 ENUM_CASE(CONFIGURATION_MODE); |
551 ENUM_CASE(NORMAL_MODE); | 463 ENUM_CASE(NORMAL_MODE); |
552 } | 464 } |
553 return ""; | 465 return ""; |
554 } | 466 } |
555 | 467 |
556 const char* SyncSchedulerImpl::GetDecisionString( | 468 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { |
557 SyncSchedulerImpl::JobProcessDecision mode) { | 469 DCHECK(CalledOnValidThread()); |
558 switch (mode) { | 470 |
559 ENUM_CASE(CONTINUE); | 471 if (!CanRunNudgeJobNow(priority)) |
560 ENUM_CASE(SAVE); | 472 return; |
561 ENUM_CASE(DROP); | 473 |
| 474 DVLOG(2) << "Will run normal mode sync cycle with routing info " |
| 475 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
| 476 SyncSession session(session_context_, this, nudge_tracker_.source_info()); |
| 477 bool premature_exit = !syncer_->SyncShare(&session, SYNCER_BEGIN, SYNCER_END); |
| 478 AdjustPolling(FORCE_RESET); |
| 479 |
| 480 bool success = !premature_exit |
| 481 && !sessions::HasSyncerError( |
| 482 session.status_controller().model_neutral_state()); |
| 483 |
| 484 if (success) { |
| 485 // That cycle took care of any outstanding work we had. |
| 486 SDVLOG(2) << "Nudge succeeded."; |
| 487 nudge_tracker_.Reset(); |
| 488 scheduled_nudge_time_ = base::TimeTicks(); |
| 489 |
| 490 // If we're here, then we successfully reached the server. End all backoff. |
| 491 wait_interval_.reset(); |
| 492 NotifyRetryTime(base::Time()); |
| 493 return; |
| 494 } else { |
| 495 HandleFailure(session.status_controller().model_neutral_state()); |
562 } | 496 } |
563 return ""; | |
564 } | 497 } |
565 | 498 |
566 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job, | 499 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { |
567 JobPriority priority) { | |
568 DCHECK(CalledOnValidThread()); | 500 DCHECK(CalledOnValidThread()); |
| 501 DCHECK_EQ(mode_, CONFIGURATION_MODE); |
569 | 502 |
570 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 503 if (!CanRunJobNow(priority)) { |
571 JobProcessDecision decision = DecideOnJob(*job, priority); | 504 SDVLOG(2) << "Unable to run configure job right now."; |
572 SDVLOG(2) << "Should run " | |
573 << SyncSessionJob::GetPurposeString(job->purpose()) | |
574 << " in mode " << GetModeString(mode_) | |
575 << " with source " << job->source_info().updates_source | |
576 << ": " << GetDecisionString(decision); | |
577 if (decision != CONTINUE) { | |
578 if (decision == SAVE) { | |
579 if (job->purpose() == SyncSessionJob::CONFIGURATION) { | |
580 pending_configure_job_ = job.Pass(); | |
581 } else { | |
582 pending_nudge_job_ = job.Pass(); | |
583 } | |
584 } else { | |
585 DCHECK_EQ(decision, DROP); | |
586 } | |
587 return false; | 505 return false; |
588 } | 506 } |
589 | 507 |
590 DVLOG(2) << "Creating sync session with routes " | 508 SDVLOG(2) << "Will run configure SyncShare with routes " |
591 << ModelSafeRoutingInfoToString(session_context_->routing_info()) | 509 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
592 << "and purpose " << job->purpose(); | 510 SyncSourceInfo source_info(pending_configure_params_->source, |
593 SyncSession session(session_context_, this, job->source_info()); | 511 ModelSafeRoutingInfoToInvalidationMap( |
| 512 session_context_->routing_info(), |
| 513 std::string())); |
| 514 SyncSession session(session_context_, this, source_info); |
594 bool premature_exit = !syncer_->SyncShare(&session, | 515 bool premature_exit = !syncer_->SyncShare(&session, |
595 job->start_step(), | 516 DOWNLOAD_UPDATES, |
596 job->end_step()); | 517 APPLY_UPDATES); |
597 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 518 AdjustPolling(FORCE_RESET); |
598 | 519 |
599 bool success = FinishSyncSessionJob(job.get(), | 520 bool success = !premature_exit |
600 premature_exit, | 521 && !sessions::HasSyncerError( |
601 &session); | 522 session.status_controller().model_neutral_state()); |
602 | 523 |
603 if (IsSyncingCurrentlySilenced()) { | 524 if (success) { |
604 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | 525 SDVLOG(2) << "Configure succeeded."; |
605 // If we're here, it's because |job| was silenced until a server specified | 526 pending_configure_params_->ready_task.Run(); |
606 // time. (Note, it had to be |job|, because DecideOnJob would not permit | 527 pending_configure_params_.reset(); |
607 // any job through while in WaitInterval::THROTTLED). | |
608 if (job->purpose() == SyncSessionJob::NUDGE) | |
609 pending_nudge_job_ = job.Pass(); | |
610 else if (job->purpose() == SyncSessionJob::CONFIGURATION) | |
611 pending_configure_job_ = job.Pass(); | |
612 else | |
613 NOTREACHED(); | |
614 | 528 |
615 RestartWaiting(); | 529 // If we're here, then we successfully reached the server. End all backoff. |
616 return success; | 530 wait_interval_.reset(); |
| 531 NotifyRetryTime(base::Time()); |
| 532 return true; |
| 533 } else { |
| 534 HandleFailure(session.status_controller().model_neutral_state()); |
| 535 return false; |
617 } | 536 } |
618 | |
619 if (!success) | |
620 ScheduleNextSync(job.Pass(), &session); | |
621 | |
622 return success; | |
623 } | 537 } |
624 | 538 |
625 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { | 539 void SyncSchedulerImpl::HandleFailure( |
626 DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority); | 540 const sessions::ModelNeutralState& model_neutral_state) { |
627 } | 541 if (IsSyncingCurrentlySilenced()) { |
628 | 542 SDVLOG(2) << "Was throttled during previous sync cycle."; |
629 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { | 543 RestartWaiting(); |
630 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority); | 544 } else { |
631 } | 545 UpdateExponentialBackoff(model_neutral_state); |
632 | 546 SDVLOG(2) << "Sync cycle failed. Will back off for " |
633 bool SyncSchedulerImpl::ShouldPoll() { | 547 << wait_interval_->length.InMilliseconds() << "ms."; |
634 if (wait_interval_.get()) { | 548 RestartWaiting(); |
635 SDVLOG(2) << "Not running poll in wait interval."; | |
636 return false; | |
637 } | 549 } |
638 | |
639 if (mode_ == CONFIGURATION_MODE) { | |
640 SDVLOG(2) << "Not running poll in configuration mode."; | |
641 return false; | |
642 } | |
643 | |
644 // TODO(rlarocque): Refactor decision-making logic common to all types | |
645 // of jobs into a shared function. | |
646 | |
647 if (session_context_->connection_manager()->HasInvalidAuthToken()) { | |
648 SDVLOG(2) << "Not running poll because auth token is invalid."; | |
649 return false; | |
650 } | |
651 | |
652 return true; | |
653 } | 550 } |
654 | 551 |
655 void SyncSchedulerImpl::DoPollSyncSessionJob() { | 552 void SyncSchedulerImpl::DoPollSyncSessionJob() { |
656 ModelSafeRoutingInfo r; | 553 ModelSafeRoutingInfo r; |
657 ModelTypeInvalidationMap invalidation_map = | 554 ModelTypeInvalidationMap invalidation_map = |
658 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | 555 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
659 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | 556 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
660 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | |
661 TimeTicks::Now(), | |
662 info, | |
663 ConfigurationParams())); | |
664 | |
665 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 557 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
666 | 558 |
667 if (!ShouldPoll()) | 559 if (!CanRunJobNow(NORMAL_PRIORITY)) { |
| 560 SDVLOG(2) << "Unable to run a poll job right now."; |
668 return; | 561 return; |
| 562 } |
669 | 563 |
670 DVLOG(2) << "Polling with routes " | 564 if (mode_ != NORMAL_MODE) { |
| 565 SDVLOG(2) << "Not running poll job in configure mode."; |
| 566 return; |
| 567 } |
| 568 |
| 569 SDVLOG(2) << "Polling with routes " |
671 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 570 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
672 SyncSession session(session_context_, this, job->source_info()); | 571 SyncSession session(session_context_, this, info); |
673 bool premature_exit = !syncer_->SyncShare(&session, | 572 syncer_->SyncShare(&session, SYNCER_BEGIN, SYNCER_END); |
674 job->start_step(), | |
675 job->end_step()); | |
676 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | |
677 | 573 |
678 FinishSyncSessionJob(job.get(), premature_exit, &session); | 574 AdjustPolling(UPDATE_INTERVAL); |
679 | 575 |
680 if (IsSyncingCurrentlySilenced()) { | 576 if (IsSyncingCurrentlySilenced()) { |
681 // Normally we would only call RestartWaiting() if we had a | 577 SDVLOG(2) << "Poll request got us throttled."; |
682 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's | 578 // The OnSilencedUntil() call set up the WaitInterval for us. All we need |
683 // possible that neither is set. We create the wait interval anyway because | 579 // to do is start the timer. |
684 // we need it to make sure we get unthrottled on time. | |
685 RestartWaiting(); | 580 RestartWaiting(); |
686 } | 581 } |
687 } | 582 } |
688 | 583 |
689 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 584 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
690 DCHECK(CalledOnValidThread()); | 585 DCHECK(CalledOnValidThread()); |
691 | 586 |
692 // We are interested in recording time between local nudges for datatypes. | 587 // We are interested in recording time between local nudges for datatypes. |
693 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 588 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
694 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 589 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
695 return; | 590 return; |
696 | 591 |
697 base::TimeTicks now = TimeTicks::Now(); | 592 base::TimeTicks now = TimeTicks::Now(); |
698 // Update timing information for how often datatypes are triggering nudges. | 593 // Update timing information for how often datatypes are triggering nudges. |
699 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); | 594 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); |
700 iter != info.types.end(); | 595 iter != info.types.end(); |
701 ++iter) { | 596 ++iter) { |
702 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 597 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
703 last_local_nudges_by_model_type_[iter->first] = now; | 598 last_local_nudges_by_model_type_[iter->first] = now; |
704 if (previous.is_null()) | 599 if (previous.is_null()) |
705 continue; | 600 continue; |
706 | 601 |
707 #define PER_DATA_TYPE_MACRO(type_str) \ | 602 #define PER_DATA_TYPE_MACRO(type_str) \ |
708 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 603 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
709 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 604 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
710 #undef PER_DATA_TYPE_MACRO | 605 #undef PER_DATA_TYPE_MACRO |
711 } | 606 } |
712 } | 607 } |
713 | 608 |
714 bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, | 609 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { |
715 bool exited_prematurely, | |
716 SyncSession* session) { | |
717 DCHECK(CalledOnValidThread()); | |
718 | |
719 // Let job know that we're through syncing (calling SyncShare) at this point. | |
720 bool succeeded = false; | |
721 { | |
722 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | |
723 succeeded = job->Finish(exited_prematurely, session); | |
724 } | |
725 | |
726 SDVLOG(2) << "Updating the next polling time after SyncMain"; | |
727 | |
728 AdjustPolling(job); | |
729 | |
730 if (succeeded) { | |
731 // No job currently supported by the scheduler could succeed without | |
732 // successfully reaching the server. Therefore, if we make it here, it is | |
733 // appropriate to reset the backoff interval. | |
734 wait_interval_.reset(); | |
735 NotifyRetryTime(base::Time()); | |
736 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | |
737 } | |
738 | |
739 return succeeded; | |
740 } | |
741 | |
742 void SyncSchedulerImpl::ScheduleNextSync( | |
743 scoped_ptr<SyncSessionJob> finished_job, | |
744 SyncSession* session) { | |
745 DCHECK(CalledOnValidThread()); | |
746 DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION | |
747 || finished_job->purpose() == SyncSessionJob::NUDGE); | |
748 | |
749 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | |
750 // if we don't succeed. Some types of errors are not likely to disappear on | |
751 // their own. With the return values now available in the old_job.session, | |
752 // we should be able to detect such errors and only retry when we detect | |
753 // transient errors. | |
754 | |
755 SDVLOG(2) << "SyncShare job failed; will start or update backoff"; | |
756 HandleContinuationError(finished_job.Pass(), session); | |
757 } | |
758 | |
759 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | |
760 DCHECK(CalledOnValidThread()); | 610 DCHECK(CalledOnValidThread()); |
761 | 611 |
762 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 612 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
763 syncer_short_poll_interval_seconds_ : | 613 syncer_short_poll_interval_seconds_ : |
764 syncer_long_poll_interval_seconds_; | 614 syncer_long_poll_interval_seconds_; |
765 bool rate_changed = !poll_timer_.IsRunning() || | 615 bool rate_changed = !poll_timer_.IsRunning() || |
766 poll != poll_timer_.GetCurrentDelay(); | 616 poll != poll_timer_.GetCurrentDelay(); |
767 | 617 |
768 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) | 618 if (type == FORCE_RESET && !rate_changed) |
769 poll_timer_.Reset(); | 619 poll_timer_.Reset(); |
770 | 620 |
771 if (!rate_changed) | 621 if (!rate_changed) |
772 return; | 622 return; |
773 | 623 |
774 // Adjust poll rate. | 624 // Adjust poll rate. |
775 poll_timer_.Stop(); | 625 poll_timer_.Stop(); |
776 poll_timer_.Start(FROM_HERE, poll, this, | 626 poll_timer_.Start(FROM_HERE, poll, this, |
777 &SyncSchedulerImpl::PollTimerCallback); | 627 &SyncSchedulerImpl::PollTimerCallback); |
778 } | 628 } |
779 | 629 |
780 void SyncSchedulerImpl::RestartWaiting() { | 630 void SyncSchedulerImpl::RestartWaiting() { |
781 CHECK(wait_interval_.get()); | 631 CHECK(wait_interval_.get()); |
782 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); | 632 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
| 633 NotifyRetryTime(base::Time::Now() + wait_interval_->length); |
| 634 SDVLOG(2) << "Starting WaitInterval timer of length " |
| 635 << wait_interval_->length.InMilliseconds() << "ms."; |
783 if (wait_interval_->mode == WaitInterval::THROTTLED) { | 636 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
784 pending_wakeup_timer_.Start( | 637 pending_wakeup_timer_.Start( |
785 FROM_HERE, | 638 FROM_HERE, |
786 wait_interval_->length, | 639 wait_interval_->length, |
787 base::Bind(&SyncSchedulerImpl::Unthrottle, | 640 base::Bind(&SyncSchedulerImpl::Unthrottle, |
788 weak_ptr_factory_.GetWeakPtr())); | 641 weak_ptr_factory_.GetWeakPtr())); |
789 } else { | 642 } else { |
790 pending_wakeup_timer_.Start( | 643 pending_wakeup_timer_.Start( |
791 FROM_HERE, | 644 FROM_HERE, |
792 wait_interval_->length, | 645 wait_interval_->length, |
793 base::Bind(&SyncSchedulerImpl::TryCanaryJob, | 646 base::Bind(&SyncSchedulerImpl::TryCanaryJob, |
794 weak_ptr_factory_.GetWeakPtr())); | 647 weak_ptr_factory_.GetWeakPtr())); |
795 } | 648 } |
796 } | 649 } |
797 | 650 |
798 void SyncSchedulerImpl::HandleContinuationError( | 651 void SyncSchedulerImpl::UpdateExponentialBackoff( |
799 scoped_ptr<SyncSessionJob> old_job, | 652 const sessions::ModelNeutralState& model_neutral_state) { |
800 SyncSession* session) { | |
801 DCHECK(CalledOnValidThread()); | 653 DCHECK(CalledOnValidThread()); |
802 | 654 |
803 TimeDelta length = delay_provider_->GetDelay( | 655 TimeDelta length = delay_provider_->GetDelay( |
804 IsBackingOff() ? wait_interval_->length : | 656 IsBackingOff() ? wait_interval_->length : |
805 delay_provider_->GetInitialDelay( | 657 delay_provider_->GetInitialDelay(model_neutral_state)); |
806 session->status_controller().model_neutral_state())); | |
807 | |
808 SDVLOG(2) << "In handle continuation error with " | |
809 << SyncSessionJob::GetPurposeString(old_job->purpose()) | |
810 << " job. The time delta(ms) is " | |
811 << length.InMilliseconds(); | |
812 | |
813 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 658 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
814 length)); | 659 length)); |
815 NotifyRetryTime(base::Time::Now() + length); | |
816 old_job->set_scheduled_start(TimeTicks::Now() + length); | |
817 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | |
818 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | |
819 // Config params should always get set. | |
820 DCHECK(!old_job->config_params().ready_task.is_null()); | |
821 DCHECK(!pending_configure_job_); | |
822 pending_configure_job_ = old_job.Pass(); | |
823 } else { | |
824 // We're not in configure mode so we should not have a configure job. | |
825 DCHECK(!pending_configure_job_); | |
826 DCHECK(!pending_nudge_job_); | |
827 pending_nudge_job_ = old_job.Pass(); | |
828 } | |
829 | |
830 RestartWaiting(); | |
831 } | 660 } |
832 | 661 |
833 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 662 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
834 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 663 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
835 DCHECK(weak_handle_this_.IsInitialized()); | 664 DCHECK(weak_handle_this_.IsInitialized()); |
836 SDVLOG(3) << "Posting StopImpl"; | 665 SDVLOG(3) << "Posting StopImpl"; |
837 weak_handle_this_.Call(FROM_HERE, | 666 weak_handle_this_.Call(FROM_HERE, |
838 &SyncSchedulerImpl::StopImpl, | 667 &SyncSchedulerImpl::StopImpl, |
839 callback); | 668 callback); |
840 } | 669 } |
841 | 670 |
842 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 671 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
843 DCHECK(CalledOnValidThread()); | 672 DCHECK(CalledOnValidThread()); |
844 SDVLOG(2) << "StopImpl called"; | 673 SDVLOG(2) << "StopImpl called"; |
845 | 674 |
846 // Kill any in-flight method calls. | 675 // Kill any in-flight method calls. |
847 weak_ptr_factory_.InvalidateWeakPtrs(); | 676 weak_ptr_factory_.InvalidateWeakPtrs(); |
848 wait_interval_.reset(); | 677 wait_interval_.reset(); |
849 NotifyRetryTime(base::Time()); | 678 NotifyRetryTime(base::Time()); |
850 poll_timer_.Stop(); | 679 poll_timer_.Stop(); |
851 pending_wakeup_timer_.Stop(); | 680 pending_wakeup_timer_.Stop(); |
852 pending_nudge_job_.reset(); | 681 pending_configure_params_.reset(); |
853 pending_configure_job_.reset(); | 682 if (started_) |
854 if (started_) { | |
855 started_ = false; | 683 started_ = false; |
856 } | |
857 if (!callback.is_null()) | 684 if (!callback.is_null()) |
858 callback.Run(); | 685 callback.Run(); |
859 } | 686 } |
860 | 687 |
861 // This is the only place where we invoke DoSyncSessionJob with canary | 688 // This is the only place where we invoke DoSyncSessionJob with canary |
862 // privileges. Everyone else should use NORMAL_PRIORITY. | 689 // privileges. Everyone else should use NORMAL_PRIORITY. |
863 void SyncSchedulerImpl::TryCanaryJob() { | 690 void SyncSchedulerImpl::TryCanaryJob() { |
864 DCHECK(CalledOnValidThread()); | 691 DCHECK(CalledOnValidThread()); |
865 | 692 |
866 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) { | 693 if (mode_ == CONFIGURATION_MODE && pending_configure_params_) { |
867 SDVLOG(2) << "Found pending configure job; will run as canary"; | 694 SDVLOG(2) << "Found pending configure job; will run as canary"; |
868 DoConfigurationSyncSessionJob(CANARY_PRIORITY); | 695 DoConfigurationSyncSessionJob(CANARY_PRIORITY); |
869 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) { | 696 } else if (mode_ == NORMAL_MODE && !nudge_tracker_.IsEmpty()) { |
870 SDVLOG(2) << "Found pending nudge job; will run as canary"; | 697 SDVLOG(2) << "Found pending nudge job; will run as canary"; |
871 DoNudgeSyncSessionJob(CANARY_PRIORITY); | 698 DoNudgeSyncSessionJob(CANARY_PRIORITY); |
872 } else { | 699 } else { |
873 SDVLOG(2) << "Found no work to do; will not run a canary"; | 700 SDVLOG(2) << "Found no work to do; will not run a canary"; |
874 } | 701 } |
875 } | 702 } |
876 | 703 |
877 void SyncSchedulerImpl::PollTimerCallback() { | 704 void SyncSchedulerImpl::PollTimerCallback() { |
878 DCHECK(CalledOnValidThread()); | 705 DCHECK(CalledOnValidThread()); |
879 if (no_scheduling_allowed_) { | 706 if (no_scheduling_allowed_) { |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
995 | 822 |
996 #undef SDVLOG_LOC | 823 #undef SDVLOG_LOC |
997 | 824 |
998 #undef SDVLOG | 825 #undef SDVLOG |
999 | 826 |
1000 #undef SLOG | 827 #undef SLOG |
1001 | 828 |
1002 #undef ENUM_CASE | 829 #undef ENUM_CASE |
1003 | 830 |
1004 } // namespace syncer | 831 } // namespace syncer |
OLD | NEW |