Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1292)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 2130453004: [Sync] Move //sync to //components/sync. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/engine/sync_scheduler_impl.h"
6
7 #include <algorithm>
8 #include <cstring>
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/single_thread_task_runner.h"
17 #include "base/threading/platform_thread.h"
18 #include "base/threading/thread_task_runner_handle.h"
19 #include "sync/engine/backoff_delay_provider.h"
20 #include "sync/engine/syncer.h"
21 #include "sync/protocol/proto_enum_conversions.h"
22 #include "sync/protocol/sync.pb.h"
23 #include "sync/util/data_type_histogram.h"
24 #include "sync/util/logging.h"
25
26 using base::TimeDelta;
27 using base::TimeTicks;
28
29 namespace syncer {
30
31 using sessions::SyncSession;
32 using sessions::SyncSessionSnapshot;
33 using sync_pb::GetUpdatesCallerInfo;
34
35 namespace {
36
37 bool IsConfigRelatedUpdateSourceValue(
38 GetUpdatesCallerInfo::GetUpdatesSource source) {
39 switch (source) {
40 case GetUpdatesCallerInfo::RECONFIGURATION:
41 case GetUpdatesCallerInfo::MIGRATION:
42 case GetUpdatesCallerInfo::NEW_CLIENT:
43 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
44 case GetUpdatesCallerInfo::PROGRAMMATIC:
45 return true;
46 default:
47 return false;
48 }
49 }
50
51 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
52 switch (error.error_type) {
53 case SYNC_SUCCESS:
54 case MIGRATION_DONE:
55 case THROTTLED:
56 case TRANSIENT_ERROR:
57 case PARTIAL_FAILURE:
58 return false;
59 case NOT_MY_BIRTHDAY:
60 case CLIENT_DATA_OBSOLETE:
61 case CLEAR_PENDING:
62 case DISABLED_BY_ADMIN:
63 // If we send terminate sync early then |sync_cycle_ended| notification
64 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
65 // notification wouldnt be sent either. Then the UI layer would be left
66 // waiting forever. So assert we would send something.
67 DCHECK_NE(error.action, UNKNOWN_ACTION);
68 return true;
69 case INVALID_CREDENTIAL:
70 // The notification for this is handled by PostAndProcessHeaders|.
71 // Server does no have to send any action for this.
72 return true;
73 // Make UNKNOWN_ERROR a NOTREACHED. All the other error should be explicitly
74 // handled.
75 case UNKNOWN_ERROR:
76 NOTREACHED();
77 return false;
78 }
79 return false;
80 }
81
82 bool IsActionableError(
83 const SyncProtocolError& error) {
84 return (error.action != UNKNOWN_ACTION);
85 }
86
87 void RunAndReset(base::Closure* task) {
88 DCHECK(task);
89 if (task->is_null())
90 return;
91 task->Run();
92 task->Reset();
93 }
94
95 } // namespace
96
97 ConfigurationParams::ConfigurationParams()
98 : source(GetUpdatesCallerInfo::UNKNOWN) {}
99 ConfigurationParams::ConfigurationParams(
100 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
101 ModelTypeSet types_to_download,
102 const ModelSafeRoutingInfo& routing_info,
103 const base::Closure& ready_task,
104 const base::Closure& retry_task)
105 : source(source),
106 types_to_download(types_to_download),
107 routing_info(routing_info),
108 ready_task(ready_task),
109 retry_task(retry_task) {
110 DCHECK(!ready_task.is_null());
111 }
112 ConfigurationParams::ConfigurationParams(const ConfigurationParams& other) =
113 default;
114 ConfigurationParams::~ConfigurationParams() {}
115
116 ClearParams::ClearParams(const base::Closure& report_success_task)
117 : report_success_task(report_success_task) {
118 DCHECK(!report_success_task.is_null());
119 }
120 ClearParams::ClearParams(const ClearParams& other) = default;
121 ClearParams::~ClearParams() {}
122
123 SyncSchedulerImpl::WaitInterval::WaitInterval()
124 : mode(UNKNOWN) {}
125
126 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
127 : mode(mode), length(length) {}
128
129 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
130
131 #define ENUM_CASE(x) case x: return #x; break;
132
133 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
134 switch (mode) {
135 ENUM_CASE(UNKNOWN);
136 ENUM_CASE(EXPONENTIAL_BACKOFF);
137 ENUM_CASE(THROTTLED);
138 }
139 NOTREACHED();
140 return "";
141 }
142
143 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
144 NudgeSource source) {
145 switch (source) {
146 case NUDGE_SOURCE_NOTIFICATION:
147 return GetUpdatesCallerInfo::NOTIFICATION;
148 case NUDGE_SOURCE_LOCAL:
149 return GetUpdatesCallerInfo::LOCAL;
150 case NUDGE_SOURCE_LOCAL_REFRESH:
151 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
152 case NUDGE_SOURCE_UNKNOWN:
153 return GetUpdatesCallerInfo::UNKNOWN;
154 default:
155 NOTREACHED();
156 return GetUpdatesCallerInfo::UNKNOWN;
157 }
158 }
159
160 // Helper macros to log with the syncer thread name; useful when there
161 // are multiple syncer threads involved.
162
163 #define SLOG(severity) LOG(severity) << name_ << ": "
164
165 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
166
167 #define SDVLOG_LOC(from_here, verbose_level) \
168 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
169
170 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
171 BackoffDelayProvider* delay_provider,
172 sessions::SyncSessionContext* context,
173 Syncer* syncer)
174 : name_(name),
175 started_(false),
176 syncer_short_poll_interval_seconds_(
177 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
178 syncer_long_poll_interval_seconds_(
179 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
180 mode_(CONFIGURATION_MODE),
181 delay_provider_(delay_provider),
182 syncer_(syncer),
183 session_context_(context),
184 next_sync_session_job_priority_(NORMAL_PRIORITY),
185 weak_ptr_factory_(this),
186 weak_ptr_factory_for_weak_handle_(this) {
187 weak_handle_this_ = MakeWeakHandle(
188 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
189 }
190
191 SyncSchedulerImpl::~SyncSchedulerImpl() {
192 DCHECK(CalledOnValidThread());
193 Stop();
194 }
195
196 void SyncSchedulerImpl::OnCredentialsUpdated() {
197 DCHECK(CalledOnValidThread());
198
199 if (HttpResponse::SYNC_AUTH_ERROR ==
200 session_context_->connection_manager()->server_status()) {
201 OnServerConnectionErrorFixed();
202 }
203 }
204
205 void SyncSchedulerImpl::OnConnectionStatusChange() {
206 if (HttpResponse::CONNECTION_UNAVAILABLE ==
207 session_context_->connection_manager()->server_status()) {
208 // Optimistically assume that the connection is fixed and try
209 // connecting.
210 OnServerConnectionErrorFixed();
211 }
212 }
213
214 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
215 // There could be a pending nudge or configuration job in several cases:
216 //
217 // 1. We're in exponential backoff.
218 // 2. We're silenced / throttled.
219 // 3. A nudge was saved previously due to not having a valid auth token.
220 // 4. A nudge was scheduled + saved while in configuration mode.
221 //
222 // In all cases except (2), we want to retry contacting the server. We
223 // call TryCanaryJob to achieve this, and note that nothing -- not even a
224 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
225 // has the authority to do that is the Unthrottle timer.
226 TryCanaryJob();
227 }
228
229 void SyncSchedulerImpl::Start(Mode mode, base::Time last_poll_time) {
230 DCHECK(CalledOnValidThread());
231 std::string thread_name = base::PlatformThread::GetName();
232 if (thread_name.empty())
233 thread_name = "<Main thread>";
234 SDVLOG(2) << "Start called from thread "
235 << thread_name << " with mode " << GetModeString(mode);
236 if (!started_) {
237 started_ = true;
238 SendInitialSnapshot();
239 }
240
241 DCHECK(syncer_.get());
242
243 if (mode == CLEAR_SERVER_DATA_MODE) {
244 DCHECK_EQ(mode_, CONFIGURATION_MODE);
245 }
246 Mode old_mode = mode_;
247 mode_ = mode;
248 // Only adjust the poll reset time if it was valid and in the past.
249 if (!last_poll_time.is_null() && last_poll_time < base::Time::Now()) {
250 // Convert from base::Time to base::TimeTicks. The reason we use Time
251 // for persisting is that TimeTicks can stop making forward progress when
252 // the machine is suspended. This implies that on resume the client might
253 // actually have miss the real poll, unless the client is restarted. Fixing
254 // that would require using an AlarmTimer though, which is only supported
255 // on certain platforms.
256 last_poll_reset_ =
257 base::TimeTicks::Now() - (base::Time::Now() - last_poll_time);
258 }
259
260 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
261 // We just got back to normal mode. Let's try to run the work that was
262 // queued up while we were configuring.
263
264 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
265
266 // Update our current time before checking IsRetryRequired().
267 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
268 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
269 TrySyncSessionJob();
270 }
271 }
272 }
273
274 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
275 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
276 ModelTypeSet enabled_protocol_types =
277 Intersection(ProtocolTypes(), enabled_types);
278 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
279 return Difference(enabled_protocol_types, throttled_types);
280 }
281
282 void SyncSchedulerImpl::SendInitialSnapshot() {
283 DCHECK(CalledOnValidThread());
284 std::unique_ptr<SyncSession> dummy(
285 SyncSession::Build(session_context_, this));
286 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
287 event.snapshot = dummy->TakeSnapshot();
288 FOR_EACH_OBSERVER(SyncEngineEventListener,
289 *session_context_->listeners(),
290 OnSyncCycleEvent(event));
291 }
292
293 namespace {
294
295 // Helper to extract the routing info corresponding to types in
296 // |types_to_download| from |current_routes|.
297 void BuildModelSafeParams(
298 ModelTypeSet types_to_download,
299 const ModelSafeRoutingInfo& current_routes,
300 ModelSafeRoutingInfo* result_routes) {
301 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
302 iter.Inc()) {
303 ModelType type = iter.Get();
304 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
305 DCHECK(route != current_routes.end());
306 ModelSafeGroup group = route->second;
307 (*result_routes)[type] = group;
308 }
309 }
310
311 } // namespace.
312
313 void SyncSchedulerImpl::ScheduleConfiguration(
314 const ConfigurationParams& params) {
315 DCHECK(CalledOnValidThread());
316 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
317 DCHECK_EQ(CONFIGURATION_MODE, mode_);
318 DCHECK(!params.ready_task.is_null());
319 CHECK(started_) << "Scheduler must be running to configure.";
320 SDVLOG(2) << "Reconfiguring syncer.";
321
322 // Only one configuration is allowed at a time. Verify we're not waiting
323 // for a pending configure job.
324 DCHECK(!pending_configure_params_);
325
326 ModelSafeRoutingInfo restricted_routes;
327 BuildModelSafeParams(params.types_to_download,
328 params.routing_info,
329 &restricted_routes);
330 session_context_->SetRoutingInfo(restricted_routes);
331
332 // Only reconfigure if we have types to download.
333 if (!params.types_to_download.Empty()) {
334 pending_configure_params_.reset(new ConfigurationParams(params));
335 TrySyncSessionJob();
336 } else {
337 SDVLOG(2) << "No change in routing info, calling ready task directly.";
338 params.ready_task.Run();
339 }
340 }
341
342 void SyncSchedulerImpl::ScheduleClearServerData(const ClearParams& params) {
343 DCHECK(CalledOnValidThread());
344 DCHECK_EQ(CLEAR_SERVER_DATA_MODE, mode_);
345 DCHECK(!pending_configure_params_);
346 DCHECK(!params.report_success_task.is_null());
347 CHECK(started_) << "Scheduler must be running to clear.";
348 pending_clear_params_.reset(new ClearParams(params));
349 TrySyncSessionJob();
350 }
351
352 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
353 DCHECK(CalledOnValidThread());
354 if (IsCurrentlyThrottled()) {
355 SDVLOG(1) << "Unable to run a job because we're throttled.";
356 return false;
357 }
358
359 if (IsBackingOff() && priority != CANARY_PRIORITY) {
360 SDVLOG(1) << "Unable to run a job because we're backing off.";
361 return false;
362 }
363
364 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
365 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
366 return false;
367 }
368
369 return true;
370 }
371
372 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
373 DCHECK(CalledOnValidThread());
374
375 if (!CanRunJobNow(priority)) {
376 SDVLOG(1) << "Unable to run a nudge job right now";
377 return false;
378 }
379
380 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
381 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
382 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
383 return false;
384 }
385
386 if (mode_ != NORMAL_MODE) {
387 SDVLOG(1) << "Not running nudge because we're not in normal mode.";
388 return false;
389 }
390
391 return true;
392 }
393
394 void SyncSchedulerImpl::ScheduleLocalNudge(
395 ModelTypeSet types,
396 const tracked_objects::Location& nudge_location) {
397 DCHECK(CalledOnValidThread());
398 DCHECK(!types.Empty());
399
400 SDVLOG_LOC(nudge_location, 2)
401 << "Scheduling sync because of local change to "
402 << ModelTypeSetToString(types);
403 UpdateNudgeTimeRecords(types);
404 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
405 ScheduleNudgeImpl(nudge_delay, nudge_location);
406 }
407
408 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
409 ModelTypeSet types,
410 const tracked_objects::Location& nudge_location) {
411 DCHECK(CalledOnValidThread());
412 DCHECK(!types.Empty());
413
414 SDVLOG_LOC(nudge_location, 2)
415 << "Scheduling sync because of local refresh request for "
416 << ModelTypeSetToString(types);
417 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
418 ScheduleNudgeImpl(nudge_delay, nudge_location);
419 }
420
421 void SyncSchedulerImpl::ScheduleInvalidationNudge(
422 syncer::ModelType model_type,
423 std::unique_ptr<InvalidationInterface> invalidation,
424 const tracked_objects::Location& nudge_location) {
425 DCHECK(CalledOnValidThread());
426
427 SDVLOG_LOC(nudge_location, 2)
428 << "Scheduling sync because we received invalidation for "
429 << ModelTypeToString(model_type);
430 base::TimeDelta nudge_delay = nudge_tracker_.RecordRemoteInvalidation(
431 model_type, std::move(invalidation));
432 ScheduleNudgeImpl(nudge_delay, nudge_location);
433 }
434
435 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) {
436 DCHECK(CalledOnValidThread());
437
438 SDVLOG(2) << "Scheduling non-blocking initial sync for "
439 << ModelTypeToString(model_type);
440 nudge_tracker_.RecordInitialSyncRequired(model_type);
441 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
442 }
443
444 // TODO(zea): Consider adding separate throttling/backoff for datatype
445 // refresh requests.
446 void SyncSchedulerImpl::ScheduleNudgeImpl(
447 const TimeDelta& delay,
448 const tracked_objects::Location& nudge_location) {
449 DCHECK(CalledOnValidThread());
450 CHECK(!syncer_->IsSyncing());
451
452 if (!started_) {
453 SDVLOG_LOC(nudge_location, 2)
454 << "Dropping nudge, scheduler is not running.";
455 return;
456 }
457
458 SDVLOG_LOC(nudge_location, 2)
459 << "In ScheduleNudgeImpl with delay "
460 << delay.InMilliseconds() << " ms";
461
462 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
463 return;
464
465 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
466 if (!scheduled_nudge_time_.is_null() &&
467 (scheduled_nudge_time_ < incoming_run_time)) {
468 // Old job arrives sooner than this one. Don't reschedule it.
469 return;
470 }
471
472 // Either there is no existing nudge in flight or the incoming nudge should be
473 // made to arrive first (preempt) the existing nudge. We reschedule in either
474 // case.
475 SDVLOG_LOC(nudge_location, 2)
476 << "Scheduling a nudge with "
477 << delay.InMilliseconds() << " ms delay";
478 scheduled_nudge_time_ = incoming_run_time;
479 pending_wakeup_timer_.Start(
480 nudge_location,
481 delay,
482 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
483 weak_ptr_factory_.GetWeakPtr()));
484 }
485
486 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
487 switch (mode) {
488 ENUM_CASE(CONFIGURATION_MODE);
489 ENUM_CASE(CLEAR_SERVER_DATA_MODE);
490 ENUM_CASE(NORMAL_MODE);
491 }
492 return "";
493 }
494
495 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) {
496 DCHECK(CalledOnValidThread());
497 nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
498 }
499
500 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
501 DCHECK(CalledOnValidThread());
502 DCHECK(CanRunNudgeJobNow(priority));
503
504 DVLOG(2) << "Will run normal mode sync cycle with types "
505 << ModelTypeSetToString(session_context_->GetEnabledTypes());
506 std::unique_ptr<SyncSession> session(
507 SyncSession::Build(session_context_, this));
508 bool success = syncer_->NormalSyncShare(
509 GetEnabledAndUnthrottledTypes(), &nudge_tracker_, session.get());
510
511 if (success) {
512 // That cycle took care of any outstanding work we had.
513 SDVLOG(2) << "Nudge succeeded.";
514 nudge_tracker_.RecordSuccessfulSyncCycle();
515 scheduled_nudge_time_ = base::TimeTicks();
516 HandleSuccess();
517
518 // If this was a canary, we may need to restart the poll timer (the poll
519 // timer may have fired while the scheduler was in an error state, ignoring
520 // the poll).
521 if (!poll_timer_.IsRunning()) {
522 SDVLOG(1) << "Canary succeeded, restarting polling.";
523 AdjustPolling(UPDATE_INTERVAL);
524 }
525 } else {
526 HandleFailure(session->status_controller().model_neutral_state());
527 }
528 }
529
530 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
531 DCHECK(CalledOnValidThread());
532 DCHECK_EQ(mode_, CONFIGURATION_MODE);
533 DCHECK(pending_configure_params_ != NULL);
534
535 if (!CanRunJobNow(priority)) {
536 SDVLOG(2) << "Unable to run configure job right now.";
537 RunAndReset(&pending_configure_params_->retry_task);
538 return;
539 }
540
541 SDVLOG(2) << "Will run configure SyncShare with types "
542 << ModelTypeSetToString(session_context_->GetEnabledTypes());
543 std::unique_ptr<SyncSession> session(
544 SyncSession::Build(session_context_, this));
545 bool success = syncer_->ConfigureSyncShare(
546 pending_configure_params_->types_to_download,
547 pending_configure_params_->source,
548 session.get());
549
550 if (success) {
551 SDVLOG(2) << "Configure succeeded.";
552 pending_configure_params_->ready_task.Run();
553 pending_configure_params_.reset();
554 HandleSuccess();
555 } else {
556 HandleFailure(session->status_controller().model_neutral_state());
557 // Sync cycle might receive response from server that causes scheduler to
558 // stop and draws pending_configure_params_ invalid.
559 if (started_)
560 RunAndReset(&pending_configure_params_->retry_task);
561 }
562 }
563
564 void SyncSchedulerImpl::DoClearServerDataSyncSessionJob(JobPriority priority) {
565 DCHECK(CalledOnValidThread());
566 DCHECK_EQ(mode_, CLEAR_SERVER_DATA_MODE);
567
568 if (!CanRunJobNow(priority)) {
569 SDVLOG(2) << "Unable to run clear server data job right now.";
570 RunAndReset(&pending_configure_params_->retry_task);
571 return;
572 }
573
574 std::unique_ptr<SyncSession> session(
575 SyncSession::Build(session_context_, this));
576 const bool success = syncer_->PostClearServerData(session.get());
577 if (!success) {
578 HandleFailure(session->status_controller().model_neutral_state());
579 return;
580 }
581
582 SDVLOG(2) << "Clear succeeded.";
583 pending_clear_params_->report_success_task.Run();
584 pending_clear_params_.reset();
585 HandleSuccess();
586 }
587
588 void SyncSchedulerImpl::HandleSuccess() {
589 // If we're here, then we successfully reached the server. End all backoff.
590 wait_interval_.reset();
591 NotifyRetryTime(base::Time());
592 }
593
594 void SyncSchedulerImpl::HandleFailure(
595 const sessions::ModelNeutralState& model_neutral_state) {
596 if (IsCurrentlyThrottled()) {
597 SDVLOG(2) << "Was throttled during previous sync cycle.";
598 } else if (!IsBackingOff()) {
599 // Setup our backoff if this is our first such failure.
600 TimeDelta length = delay_provider_->GetDelay(
601 delay_provider_->GetInitialDelay(model_neutral_state));
602 wait_interval_.reset(
603 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
604 SDVLOG(2) << "Sync cycle failed. Will back off for "
605 << wait_interval_->length.InMilliseconds() << "ms.";
606 } else {
607 // Increase our backoff interval and schedule another retry.
608 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
609 wait_interval_.reset(
610 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
611 SDVLOG(2) << "Sync cycle failed. Will back off for "
612 << wait_interval_->length.InMilliseconds() << "ms.";
613 }
614 RestartWaiting();
615 }
616
617 void SyncSchedulerImpl::DoPollSyncSessionJob() {
618 SDVLOG(2) << "Polling with types "
619 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
620 std::unique_ptr<SyncSession> session(
621 SyncSession::Build(session_context_, this));
622 bool success = syncer_->PollSyncShare(
623 GetEnabledAndUnthrottledTypes(),
624 session.get());
625
626 // Only restart the timer if the poll succeeded. Otherwise rely on normal
627 // failure handling to retry with backoff.
628 if (success) {
629 AdjustPolling(FORCE_RESET);
630 HandleSuccess();
631 } else {
632 HandleFailure(session->status_controller().model_neutral_state());
633 }
634 }
635
636 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
637 DCHECK(CalledOnValidThread());
638 base::TimeTicks now = TimeTicks::Now();
639 // Update timing information for how often datatypes are triggering nudges.
640 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
641 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
642 last_local_nudges_by_model_type_[iter.Get()] = now;
643 if (previous.is_null())
644 continue;
645
646 #define PER_DATA_TYPE_MACRO(type_str) \
647 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
648 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
649 #undef PER_DATA_TYPE_MACRO
650 }
651 }
652
653 TimeDelta SyncSchedulerImpl::GetPollInterval() {
654 return (!session_context_->notifications_enabled() ||
655 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
656 syncer_short_poll_interval_seconds_ :
657 syncer_long_poll_interval_seconds_;
658 }
659
660 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
661 DCHECK(CalledOnValidThread());
662 if (!started_)
663 return;
664
665 TimeDelta poll_interval = GetPollInterval();
666 TimeDelta poll_delay = poll_interval;
667 const TimeTicks now = TimeTicks::Now();
668
669 if (type == UPDATE_INTERVAL) {
670 if (!last_poll_reset_.is_null()) {
671 // Override the delay based on the last successful poll time (if it was
672 // set).
673 TimeTicks new_poll_time = poll_interval + last_poll_reset_;
674 poll_delay = new_poll_time - TimeTicks::Now();
675
676 if (poll_delay < TimeDelta()) {
677 // The desired poll time was in the past, so trigger a poll now (the
678 // timer will post the task asynchronously, so re-entrancy isn't an
679 // issue).
680 poll_delay = TimeDelta();
681 }
682 } else {
683 // There was no previous poll. Keep the delay set to the normal interval,
684 // as if we had just completed a poll.
685 DCHECK_EQ(GetPollInterval(), poll_delay);
686 last_poll_reset_ = now;
687 }
688 } else {
689 // Otherwise just restart the timer.
690 DCHECK_EQ(FORCE_RESET, type);
691 DCHECK_EQ(GetPollInterval(), poll_delay);
692 last_poll_reset_ = now;
693 }
694
695 SDVLOG(1) << "Updating polling delay to " << poll_delay.InMinutes()
696 << " minutes.";
697
698 // Adjust poll rate. Start will reset the timer if it was already running.
699 poll_timer_.Start(FROM_HERE, poll_delay, this,
700 &SyncSchedulerImpl::PollTimerCallback);
701 }
702
703 void SyncSchedulerImpl::RestartWaiting() {
704 CHECK(wait_interval_.get());
705 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
706 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
707 SDVLOG(2) << "Starting WaitInterval timer of length "
708 << wait_interval_->length.InMilliseconds() << "ms.";
709 if (wait_interval_->mode == WaitInterval::THROTTLED) {
710 pending_wakeup_timer_.Start(
711 FROM_HERE,
712 wait_interval_->length,
713 base::Bind(&SyncSchedulerImpl::Unthrottle,
714 weak_ptr_factory_.GetWeakPtr()));
715 } else {
716 pending_wakeup_timer_.Start(
717 FROM_HERE,
718 wait_interval_->length,
719 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
720 weak_ptr_factory_.GetWeakPtr()));
721 }
722 }
723
724 void SyncSchedulerImpl::Stop() {
725 DCHECK(CalledOnValidThread());
726 SDVLOG(2) << "Stop called";
727
728 // Kill any in-flight method calls.
729 weak_ptr_factory_.InvalidateWeakPtrs();
730 wait_interval_.reset();
731 NotifyRetryTime(base::Time());
732 poll_timer_.Stop();
733 pending_wakeup_timer_.Stop();
734 pending_configure_params_.reset();
735 pending_clear_params_.reset();
736 if (started_)
737 started_ = false;
738 }
739
740 // This is the only place where we invoke DoSyncSessionJob with canary
741 // privileges. Everyone else should use NORMAL_PRIORITY.
742 void SyncSchedulerImpl::TryCanaryJob() {
743 next_sync_session_job_priority_ = CANARY_PRIORITY;
744 SDVLOG(2) << "Attempting canary job";
745 TrySyncSessionJob();
746 }
747
748 void SyncSchedulerImpl::TrySyncSessionJob() {
749 // Post call to TrySyncSessionJobImpl on current thread. Later request for
750 // access token will be here.
751 base::ThreadTaskRunnerHandle::Get()->PostTask(
752 FROM_HERE, base::Bind(&SyncSchedulerImpl::TrySyncSessionJobImpl,
753 weak_ptr_factory_.GetWeakPtr()));
754 }
755
756 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
757 JobPriority priority = next_sync_session_job_priority_;
758 next_sync_session_job_priority_ = NORMAL_PRIORITY;
759
760 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
761
762 DCHECK(CalledOnValidThread());
763 if (mode_ == CONFIGURATION_MODE) {
764 if (pending_configure_params_) {
765 SDVLOG(2) << "Found pending configure job";
766 DoConfigurationSyncSessionJob(priority);
767 }
768 } else if (mode_ == CLEAR_SERVER_DATA_MODE) {
769 if (pending_clear_params_) {
770 DoClearServerDataSyncSessionJob(priority);
771 }
772 } else if (CanRunNudgeJobNow(priority)) {
773 if (nudge_tracker_.IsSyncRequired()) {
774 SDVLOG(2) << "Found pending nudge job";
775 DoNudgeSyncSessionJob(priority);
776 } else if (((base::TimeTicks::Now() - last_poll_reset_) >=
777 GetPollInterval())) {
778 SDVLOG(2) << "Found pending poll";
779 DoPollSyncSessionJob();
780 }
781 } else {
782 // We must be in an error state. Transitioning out of each of these
783 // error states should trigger a canary job.
784 DCHECK(IsCurrentlyThrottled() || IsBackingOff() ||
785 session_context_->connection_manager()->HasInvalidAuthToken());
786 }
787
788 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
789 // If we succeeded, our wait interval would have been cleared. If it hasn't
790 // been cleared, then we should increase our backoff interval and schedule
791 // another retry.
792 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
793 wait_interval_.reset(
794 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
795 SDVLOG(2) << "Sync cycle failed. Will back off for "
796 << wait_interval_->length.InMilliseconds() << "ms.";
797 RestartWaiting();
798 }
799 }
800
801 void SyncSchedulerImpl::PollTimerCallback() {
802 DCHECK(CalledOnValidThread());
803 CHECK(!syncer_->IsSyncing());
804
805 TrySyncSessionJob();
806 }
807
808 void SyncSchedulerImpl::RetryTimerCallback() {
809 TrySyncSessionJob();
810 }
811
812 void SyncSchedulerImpl::Unthrottle() {
813 DCHECK(CalledOnValidThread());
814 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
815
816 // We're no longer throttled, so clear the wait interval.
817 wait_interval_.reset();
818 NotifyRetryTime(base::Time());
819 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
820
821 // We treat this as a 'canary' in the sense that it was originally scheduled
822 // to run some time ago, failed, and we now want to retry, versus a job that
823 // was just created (e.g via ScheduleNudgeImpl). The main implication is
824 // that we're careful to update routing info (etc) with such potentially
825 // stale canary jobs.
826 TryCanaryJob();
827 }
828
829 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
830 DCHECK(CalledOnValidThread());
831 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
832 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
833
834 if (nudge_tracker_.IsAnyTypeThrottled()) {
835 const base::TimeTicks now = base::TimeTicks::Now();
836 base::TimeDelta time_until_next_unthrottle =
837 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
838 type_unthrottle_timer_.Start(
839 FROM_HERE,
840 time_until_next_unthrottle,
841 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
842 weak_ptr_factory_.GetWeakPtr(),
843 now + time_until_next_unthrottle));
844 }
845
846 // Maybe this is a good time to run a nudge job. Let's try it.
847 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
848 TrySyncSessionJob();
849 }
850
851 void SyncSchedulerImpl::PerformDelayedNudge() {
852 // Circumstances may have changed since we scheduled this delayed nudge.
853 // We must check to see if it's OK to run the job before we do so.
854 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
855 TrySyncSessionJob();
856
857 // We're not responsible for setting up any retries here. The functions that
858 // first put us into a state that prevents successful sync cycles (eg. global
859 // throttling, type throttling, network errors, transient errors) will also
860 // setup the appropriate retry logic (eg. retry after timeout, exponential
861 // backoff, retry when the network changes).
862 }
863
864 void SyncSchedulerImpl::ExponentialBackoffRetry() {
865 TryCanaryJob();
866 }
867
868 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
869 FOR_EACH_OBSERVER(SyncEngineEventListener,
870 *session_context_->listeners(),
871 OnRetryTimeChanged(retry_time));
872 }
873
874 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
875 FOR_EACH_OBSERVER(SyncEngineEventListener,
876 *session_context_->listeners(),
877 OnThrottledTypesChanged(types));
878 }
879
880 bool SyncSchedulerImpl::IsBackingOff() const {
881 DCHECK(CalledOnValidThread());
882 return wait_interval_.get() && wait_interval_->mode ==
883 WaitInterval::EXPONENTIAL_BACKOFF;
884 }
885
886 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
887 DCHECK(CalledOnValidThread());
888 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
889 throttle_duration));
890 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
891 NotifyThrottledTypesChanged(ModelTypeSet::All());
892 }
893
894 void SyncSchedulerImpl::OnTypesThrottled(
895 ModelTypeSet types,
896 const base::TimeDelta& throttle_duration) {
897 base::TimeTicks now = base::TimeTicks::Now();
898
899 SDVLOG(1) << "Throttling " << ModelTypeSetToString(types) << " for "
900 << throttle_duration.InMinutes() << " minutes.";
901
902 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
903 base::TimeDelta time_until_next_unthrottle =
904 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
905 type_unthrottle_timer_.Start(
906 FROM_HERE,
907 time_until_next_unthrottle,
908 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
909 weak_ptr_factory_.GetWeakPtr(),
910 now + time_until_next_unthrottle));
911 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
912 }
913
914 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
915 DCHECK(CalledOnValidThread());
916 return wait_interval_.get() && wait_interval_->mode ==
917 WaitInterval::THROTTLED;
918 }
919
920 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
921 const base::TimeDelta& new_interval) {
922 DCHECK(CalledOnValidThread());
923 if (new_interval == syncer_short_poll_interval_seconds_)
924 return;
925 SDVLOG(1) << "Updating short poll interval to " << new_interval.InMinutes()
926 << " minutes.";
927 syncer_short_poll_interval_seconds_ = new_interval;
928 AdjustPolling(UPDATE_INTERVAL);
929 }
930
931 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
932 const base::TimeDelta& new_interval) {
933 DCHECK(CalledOnValidThread());
934 if (new_interval == syncer_long_poll_interval_seconds_)
935 return;
936 SDVLOG(1) << "Updating long poll interval to " << new_interval.InMinutes()
937 << " minutes.";
938 syncer_long_poll_interval_seconds_ = new_interval;
939 AdjustPolling(UPDATE_INTERVAL);
940 }
941
942 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
943 const std::map<ModelType, base::TimeDelta>& nudge_delays) {
944 DCHECK(CalledOnValidThread());
945 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
946 }
947
948 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
949 if (size > 0)
950 nudge_tracker_.SetHintBufferSize(size);
951 else
952 NOTREACHED() << "Hint buffer size should be > 0.";
953 }
954
955 void SyncSchedulerImpl::OnSyncProtocolError(
956 const SyncProtocolError& sync_protocol_error) {
957 DCHECK(CalledOnValidThread());
958 if (ShouldRequestEarlyExit(sync_protocol_error)) {
959 SDVLOG(2) << "Sync Scheduler requesting early exit.";
960 Stop();
961 }
962 if (IsActionableError(sync_protocol_error)) {
963 SDVLOG(2) << "OnActionableError";
964 FOR_EACH_OBSERVER(SyncEngineEventListener,
965 *session_context_->listeners(),
966 OnActionableError(sync_protocol_error));
967 }
968 }
969
970 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
971 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
972 retry_timer_.Start(FROM_HERE, delay, this,
973 &SyncSchedulerImpl::RetryTimerCallback);
974 }
975
976 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
977 FOR_EACH_OBSERVER(SyncEngineEventListener,
978 *session_context_->listeners(),
979 OnMigrationRequested(types));
980 }
981
982 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
983 DCHECK(CalledOnValidThread());
984 session_context_->set_notifications_enabled(notifications_enabled);
985 if (notifications_enabled)
986 nudge_tracker_.OnInvalidationsEnabled();
987 else
988 nudge_tracker_.OnInvalidationsDisabled();
989 }
990
991 #undef SDVLOG_LOC
992
993 #undef SDVLOG
994
995 #undef SLOG
996
997 #undef ENUM_CASE
998
999 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698