Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Unified Diff: sync/engine/sync_scheduler_impl.cc

Issue 13422003: sync: Refactor job ownership in SyncScheduler (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Update comment Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sync/engine/sync_scheduler_impl.cc
diff --git a/sync/engine/sync_scheduler_impl.cc b/sync/engine/sync_scheduler_impl.cc
index 7fd9ccda52bcc93e70568725ec07c9ba4b4e314b..b03b3c85bb16613012941a5627f6a7425922c3f7 100644
--- a/sync/engine/sync_scheduler_impl.cc
+++ b/sync/engine/sync_scheduler_impl.cc
@@ -83,13 +83,10 @@ ConfigurationParams::ConfigurationParams(
ConfigurationParams::~ConfigurationParams() {}
SyncSchedulerImpl::WaitInterval::WaitInterval()
- : mode(UNKNOWN),
- had_nudge(false),
- pending_configure_job(NULL) {}
+ : mode(UNKNOWN) {}
SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
- : mode(mode), had_nudge(false), length(length),
- pending_configure_job(NULL) {}
+ : mode(mode), length(length) {}
SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
@@ -160,7 +157,6 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
weak_handle_this_(MakeWeakHandle(
weak_ptr_factory_for_weak_handle_.GetWeakPtr())),
name_(name),
- sync_loop_(MessageLoop::current()),
started_(false),
syncer_short_poll_interval_seconds_(
TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
@@ -169,23 +165,19 @@ SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
sessions_commit_delay_(
TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
mode_(NORMAL_MODE),
- // Start with assuming everything is fine with the connection.
- // At the end of the sync cycle we would have the correct status.
- pending_nudge_(NULL),
delay_provider_(delay_provider),
syncer_(syncer),
session_context_(context),
no_scheduling_allowed_(false) {
- DCHECK(sync_loop_);
}
SyncSchedulerImpl::~SyncSchedulerImpl() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
StopImpl(base::Closure());
}
void SyncSchedulerImpl::OnCredentialsUpdated() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
if (HttpResponse::SYNC_AUTH_ERROR ==
session_context_->connection_manager()->server_status()) {
@@ -214,14 +206,11 @@ void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
// call DoCanaryJob to achieve this, and note that nothing -- not even a
// canary job -- can bypass a THROTTLED WaitInterval. The only thing that
// has the authority to do that is the Unthrottle timer.
- scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
- if (!pending.get())
- return;
- DoCanaryJob(pending.Pass());
+ TryCanaryJob();
}
void SyncSchedulerImpl::Start(Mode mode) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
std::string thread_name = MessageLoop::current()->thread_name();
if (thread_name.empty())
thread_name = "<Main thread>";
@@ -238,25 +227,15 @@ void SyncSchedulerImpl::Start(Mode mode) {
mode_ = mode;
AdjustPolling(NULL); // Will kick start poll timer if needed.
- if (old_mode != mode_) {
- // We just changed our mode. See if there are any pending jobs that we could
- // execute in the new mode.
- if (mode_ == NORMAL_MODE) {
- // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
- // has not yet completed.
- DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
- }
-
- scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
- if (pending.get()) {
- SDVLOG(2) << "Executing pending job. Good luck!";
- DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY);
- }
+ if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) {
+ // We just got back to normal mode. Let's try to run the work that was
+ // queued up while we were configuring.
+ DoNudgeSyncSessionJob(NORMAL_PRIORITY);
}
}
void SyncSchedulerImpl::SendInitialSnapshot() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
scoped_ptr<SyncSession> dummy(new SyncSession(
session_context_, this, SyncSourceInfo()));
SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
@@ -286,7 +265,7 @@ void BuildModelSafeParams(
bool SyncSchedulerImpl::ScheduleConfiguration(
const ConfigurationParams& params) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
DCHECK_EQ(CONFIGURATION_MODE, mode_);
DCHECK(!params.ready_task.is_null());
@@ -295,7 +274,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
// Only one configuration is allowed at a time. Verify we're not waiting
// for a pending configure job.
- DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
+ DCHECK(!pending_configure_job_);
ModelSafeRoutingInfo restricted_routes;
BuildModelSafeParams(params.types_to_download,
@@ -306,7 +285,7 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
// Only reconfigure if we have types to download.
if (!params.types_to_download.Empty()) {
DCHECK(!restricted_routes.empty());
- scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
+ pending_configure_job_.reset(new SyncSessionJob(
SyncSessionJob::CONFIGURATION,
TimeTicks::Now(),
SyncSourceInfo(params.source,
@@ -314,13 +293,15 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
restricted_routes,
std::string())),
params));
- bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY);
+ bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY);
// If we failed, the job would have been saved as the pending configure
// job and a wait interval would have been set.
if (!succeeded) {
- DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job);
+ DCHECK(pending_configure_job_);
return false;
+ } else {
+ DCHECK(!pending_configure_job_);
}
} else {
SDVLOG(2) << "No change in routing info, calling ready task directly.";
@@ -333,13 +314,12 @@ bool SyncSchedulerImpl::ScheduleConfiguration(
SyncSchedulerImpl::JobProcessDecision
SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
JobPriority priority) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
DCHECK(wait_interval_.get());
DCHECK_NE(job.purpose(), SyncSessionJob::POLL);
SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
<< WaitInterval::GetModeString(wait_interval_->mode)
- << (wait_interval_->had_nudge ? " (had nudge)" : "")
<< ((priority == CANARY_PRIORITY) ? " (canary)" : "");
// If we save a job while in a WaitInterval, there is a well-defined moment
@@ -358,11 +338,9 @@ SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
if (mode_ == CONFIGURATION_MODE)
return SAVE;
- // If we already had one nudge then just drop this nudge. We will retry
- // later when the timer runs out.
if (priority == NORMAL_PRIORITY)
- return wait_interval_->had_nudge ? DROP : CONTINUE;
- else // We are here because timer ran out. So retry.
+ return DROP;
+ else // Either backoff has ended, or we have permission to bypass it.
return CONTINUE;
}
return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE;
@@ -371,7 +349,7 @@ SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
const SyncSessionJob& job,
JobPriority priority) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
// POLL jobs do not call this function.
DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
@@ -449,69 +427,11 @@ SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
}
-void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
- const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
- if (is_nudge && pending_nudge_) {
- SDVLOG(2) << "Coalescing a pending nudge";
- // TODO(tim): This basically means we never use the more-careful coalescing
- // logic in ScheduleNudgeImpl that takes the min of the two nudge start
- // times, because we're calling this function first. Pull this out
- // into a function to coalesce + set start times and reuse.
- pending_nudge_->CoalesceSources(job->source_info());
- return;
- }
-
- scoped_ptr<SyncSessionJob> job_to_save = job->Clone();
- if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
- // This job should be made the new canary.
- if (is_nudge) {
- pending_nudge_ = job_to_save.get();
- } else {
- SDVLOG(2) << "Saving a configuration job";
- DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
- DCHECK(!wait_interval_->pending_configure_job);
- DCHECK_EQ(mode_, CONFIGURATION_MODE);
- DCHECK(!job->config_params().ready_task.is_null());
- // The only nudge that could exist is a scheduled canary nudge.
- DCHECK(!unscheduled_nudge_storage_.get());
- if (pending_nudge_) {
- // Pre-empt the nudge canary and abandon the old nudge (owned by task).
- unscheduled_nudge_storage_ = pending_nudge_->Clone();
- pending_nudge_ = unscheduled_nudge_storage_.get();
- }
- wait_interval_->pending_configure_job = job_to_save.get();
- }
- TimeDelta length =
- wait_interval_->timer.desired_run_time() - TimeTicks::Now();
- wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
- TimeDelta::FromSeconds(0) : length;
- RestartWaiting(job_to_save.Pass());
- return;
- }
-
- // Note that today there are no cases where we SAVE a CONFIGURATION job
- // when we're not in a WaitInterval. See bug 147736.
- DCHECK(is_nudge);
- // There may or may not be a pending_configure_job. Either way this nudge
- // is unschedulable.
- pending_nudge_ = job_to_save.get();
- unscheduled_nudge_storage_ = job_to_save.Pass();
-}
-
-// Functor for std::find_if to search by ModelSafeGroup.
-struct ModelSafeWorkerGroupIs {
- explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
- bool operator()(ModelSafeWorker* w) {
- return group == w->GetModelSafeGroup();
- }
- ModelSafeGroup group;
-};
-
void SyncSchedulerImpl::ScheduleNudgeAsync(
const TimeDelta& desired_delay,
NudgeSource source, ModelTypeSet types,
const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
SDVLOG_LOC(nudge_location, 2)
<< "Nudge scheduled with delay "
<< desired_delay.InMilliseconds() << " ms, "
@@ -530,7 +450,7 @@ void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
const TimeDelta& desired_delay,
NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
SDVLOG_LOC(nudge_location, 2)
<< "Nudge scheduled with delay "
<< desired_delay.InMilliseconds() << " ms, "
@@ -552,7 +472,7 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
GetUpdatesCallerInfo::GetUpdatesSource source,
const ModelTypeInvalidationMap& invalidation_map,
const tracked_objects::Location& nudge_location) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
if (no_scheduling_allowed_) {
@@ -586,51 +506,43 @@ void SyncSchedulerImpl::ScheduleNudgeImpl(
<< SyncSessionJob::GetPurposeString(job->purpose())
<< " in mode " << GetModeString(mode_)
<< ": " << GetDecisionString(decision);
- if (decision != CONTINUE) {
- // End of the line, though we may save the job for later.
- if (decision == SAVE) {
- HandleSaveJobDecision(job.Pass());
- } else {
- DCHECK_EQ(decision, DROP);
- }
+ if (decision == DROP) {
return;
}
- if (pending_nudge_) {
- SDVLOG(2) << "Rescheduling pending nudge";
- pending_nudge_->CoalesceSources(job->source_info());
- // Choose the start time as the earliest of the 2. Note that this means
- // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
- // but a nudge is already scheduled to go out, we'll send the (tab) commit
- // without waiting.
- pending_nudge_->set_scheduled_start(
- std::min(job->scheduled_start(), pending_nudge_->scheduled_start()));
- // Abandon the old task by cloning and replacing the session.
- // It's possible that by "rescheduling" we're actually taking a job that
- // was previously unscheduled and giving it wings, so take care to reset
- // unscheduled nudge storage.
- job = pending_nudge_->Clone();
- pending_nudge_ = NULL;
- unscheduled_nudge_storage_.reset();
- // It's also possible we took a canary job, since we allow one nudge
- // per backoff interval.
- DCHECK(!wait_interval_ || !wait_interval_->had_nudge);
+ // Try to coalesce in both SAVE and CONTINUE cases.
+ if (pending_nudge_job_) {
+ pending_nudge_job_->CoalesceSources(job->source_info());
+ if (decision == CONTINUE) {
+ // Only update the scheduled_start if we're going to reschedule.
+ pending_nudge_job_->set_scheduled_start(
+ std::min(job->scheduled_start(),
+ pending_nudge_job_->scheduled_start()));
+ }
+ } else {
+ pending_nudge_job_ = job.Pass();
+ }
+
+ if (decision == SAVE) {
+ return;
}
- TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now();
+ TimeDelta run_delay =
+ pending_nudge_job_->scheduled_start() - TimeTicks::Now();
if (run_delay < TimeDelta::FromMilliseconds(0))
run_delay = TimeDelta::FromMilliseconds(0);
SDVLOG_LOC(nudge_location, 2)
<< "Scheduling a nudge with "
<< run_delay.InMilliseconds() << " ms delay";
- pending_nudge_ = job.get();
- PostDelayedTask(nudge_location, "DoSyncSessionJob",
- base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob),
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(&job),
- NORMAL_PRIORITY),
- run_delay);
+ if (started_) {
+ pending_wakeup_timer_.Start(
+ nudge_location,
+ run_delay,
+ base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob,
+ weak_ptr_factory_.GetWeakPtr(),
+ NORMAL_PRIORITY));
+ }
}
const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
@@ -651,27 +563,9 @@ const char* SyncSchedulerImpl::GetDecisionString(
return "";
}
-void SyncSchedulerImpl::PostDelayedTask(
- const tracked_objects::Location& from_here,
- const char* name, const base::Closure& task, base::TimeDelta delay) {
- SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
- << delay.InMilliseconds() << " ms delay";
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (!started_) {
- SDVLOG(1) << "Not posting task as scheduler is stopped.";
- return;
- }
- // This cancels the previous task, if one existed.
- pending_wakeup_.Reset(task);
- sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay);
-}
-
-bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
- JobPriority priority) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- if (job->purpose() == SyncSessionJob::NUDGE) {
- pending_nudge_ = NULL;
- }
+bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job,
+ JobPriority priority) {
+ DCHECK(CalledOnValidThread());
base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
JobProcessDecision decision = DecideOnJob(*job, priority);
@@ -682,7 +576,11 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
<< ": " << GetDecisionString(decision);
if (decision != CONTINUE) {
if (decision == SAVE) {
- HandleSaveJobDecision(job.Pass());
+ if (job->purpose() == SyncSessionJob::CONFIGURATION) {
+ pending_configure_job_ = job.Pass();
+ } else {
+ pending_nudge_job_ = job.Pass();
+ }
} else {
DCHECK_EQ(decision, DROP);
}
@@ -707,15 +605,14 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
// If we're here, it's because |job| was silenced until a server specified
// time. (Note, it had to be |job|, because DecideOnJob would not permit
// any job through while in WaitInterval::THROTTLED).
- scoped_ptr<SyncSessionJob> clone = job->Clone();
- if (clone->purpose() == SyncSessionJob::NUDGE)
- pending_nudge_ = clone.get();
- else if (clone->purpose() == SyncSessionJob::CONFIGURATION)
- wait_interval_->pending_configure_job = clone.get();
+ if (job->purpose() == SyncSessionJob::NUDGE)
+ pending_nudge_job_ = job.Pass();
+ else if (job->purpose() == SyncSessionJob::CONFIGURATION)
+ pending_configure_job_ = job.Pass();
else
NOTREACHED();
- RestartWaiting(clone.Pass());
+ RestartWaiting();
return success;
}
@@ -725,6 +622,14 @@ bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
return success;
}
+void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
+ DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority);
+}
+
+bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
+ return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority);
+}
+
bool SyncSchedulerImpl::ShouldPoll() {
if (wait_interval_.get()) {
SDVLOG(2) << "Not running poll in wait interval.";
@@ -747,8 +652,15 @@ bool SyncSchedulerImpl::ShouldPoll() {
return true;
}
-void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
- DCHECK_EQ(job->purpose(), SyncSessionJob::POLL);
+void SyncSchedulerImpl::DoPollSyncSessionJob() {
+ ModelSafeRoutingInfo r;
+ ModelTypeInvalidationMap invalidation_map =
+ ModelSafeRoutingInfoToInvalidationMap(r, std::string());
+ SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
+ scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
+ TimeTicks::Now(),
+ info,
+ ConfigurationParams()));
base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
@@ -766,17 +678,16 @@ void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
FinishSyncSessionJob(job.get(), premature_exit, &session);
if (IsSyncingCurrentlySilenced()) {
- // This will start the countdown to unthrottle. Other kinds of jobs would
- // schedule themselves as the post-unthrottle canary. A poll job is not
- // that urgent, so it does not get to be the canary. We still need to start
- // the timer regardless. Otherwise there could be no one to clear the
- // WaitInterval when the throttling expires.
- RestartWaiting(scoped_ptr<SyncSessionJob>());
+ // Normally we would only call RestartWaiting() if we had a
+ // pending_nudge_job_ or pending_configure_job_ set. In this case, it's
+ // possible that neither is set. We create the wait interval anyway because
+ // we need it to make sure we get unthrottled on time.
+ RestartWaiting();
}
}
void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
// We are interested in recording time between local nudges for datatypes.
// TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
@@ -803,7 +714,7 @@ void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
bool exited_prematurely,
SyncSession* session) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
// Let job know that we're through syncing (calling SyncShare) at this point.
bool succeeded = false;
@@ -831,7 +742,7 @@ bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
void SyncSchedulerImpl::ScheduleNextSync(
scoped_ptr<SyncSessionJob> finished_job,
SyncSession* session) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION
|| finished_job->purpose() == SyncSessionJob::NUDGE);
@@ -841,34 +752,12 @@ void SyncSchedulerImpl::ScheduleNextSync(
// we should be able to detect such errors and only retry when we detect
// transient errors.
- if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
- mode_ == NORMAL_MODE) {
- // When in normal mode, we allow up to one nudge per backoff interval. It
- // appears that this was our nudge for this interval, and it failed.
- //
- // Note: This does not prevent us from running canary jobs. For example,
- // an IP address change might still result in another nudge being executed
- // during this backoff interval.
- SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
- DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
- DCHECK(!wait_interval_->had_nudge);
-
- wait_interval_->had_nudge = true;
- DCHECK(!pending_nudge_);
-
- scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
- pending_nudge_ = new_job.get();
- RestartWaiting(new_job.Pass());
- } else {
- // Either this is the first failure or a consecutive failure after our
- // backoff timer expired. We handle it the same way in either case.
- SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
- HandleContinuationError(finished_job.Pass(), session);
- }
+ SDVLOG(2) << "SyncShare job failed; will start or update backoff";
+ HandleContinuationError(finished_job.Pass(), session);
}
void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
TimeDelta poll = (!session_context_->notifications_enabled()) ?
syncer_short_poll_interval_seconds_ :
@@ -888,28 +777,28 @@ void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
&SyncSchedulerImpl::PollTimerCallback);
}
-void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
+void SyncSchedulerImpl::RestartWaiting() {
CHECK(wait_interval_.get());
- wait_interval_->timer.Stop();
DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
if (wait_interval_->mode == WaitInterval::THROTTLED) {
- pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle,
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(&job)));
-
+ pending_wakeup_timer_.Start(
+ FROM_HERE,
+ wait_interval_->length,
+ base::Bind(&SyncSchedulerImpl::Unthrottle,
+ weak_ptr_factory_.GetWeakPtr()));
} else {
- pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob,
- weak_ptr_factory_.GetWeakPtr(),
- base::Passed(&job)));
+ pending_wakeup_timer_.Start(
+ FROM_HERE,
+ wait_interval_->length,
+ base::Bind(&SyncSchedulerImpl::TryCanaryJob,
+ weak_ptr_factory_.GetWeakPtr()));
}
- wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
- pending_wakeup_.callback());
}
void SyncSchedulerImpl::HandleContinuationError(
scoped_ptr<SyncSessionJob> old_job,
SyncSession* session) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
TimeDelta length = delay_provider_->GetDelay(
IsBackingOff() ? wait_interval_->length :
@@ -921,26 +810,24 @@ void SyncSchedulerImpl::HandleContinuationError(
<< " job. The time delta(ms) is "
<< length.InMilliseconds();
- // This will reset the had_nudge variable as well.
wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
length));
NotifyRetryTime(base::Time::Now() + length);
- scoped_ptr<SyncSessionJob> new_job(old_job->Clone());
- new_job->set_scheduled_start(TimeTicks::Now() + length);
+ old_job->set_scheduled_start(TimeTicks::Now() + length);
if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
// Config params should always get set.
DCHECK(!old_job->config_params().ready_task.is_null());
- wait_interval_->pending_configure_job = new_job.get();
+ DCHECK(!pending_configure_job_);
+ pending_configure_job_ = old_job.Pass();
} else {
- // We are not in configuration mode. So wait_interval's pending job
- // should be null.
- DCHECK(wait_interval_->pending_configure_job == NULL);
- DCHECK(!pending_nudge_);
- pending_nudge_ = new_job.get();
+ // We're not in configure mode so we should not have a configure job.
+ DCHECK(!pending_configure_job_);
+ DCHECK(!pending_nudge_job_);
+ pending_nudge_job_ = old_job.Pass();
}
- RestartWaiting(new_job.Pass());
+ RestartWaiting();
}
void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
@@ -953,7 +840,7 @@ void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
}
void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
SDVLOG(2) << "StopImpl called";
// Kill any in-flight method calls.
@@ -961,9 +848,9 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
wait_interval_.reset();
NotifyRetryTime(base::Time());
poll_timer_.Stop();
- pending_nudge_ = NULL;
- unscheduled_nudge_storage_.reset();
- pending_wakeup_.Cancel();
+ pending_wakeup_timer_.Stop();
+ pending_nudge_job_.reset();
+ pending_configure_job_.reset();
if (started_) {
started_ = false;
}
@@ -971,48 +858,24 @@ void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
callback.Run();
}
-void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- SDVLOG(2) << "Do canary job";
-
- // This is the only place where we invoke DoSyncSessionJob with canary
- // privileges. Everyone else should use NORMAL_PRIORITY.
- DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY);
-}
+// This is the only place where we invoke DoSyncSessionJob with canary
+// privileges. Everyone else should use NORMAL_PRIORITY.
+void SyncSchedulerImpl::TryCanaryJob() {
+ DCHECK(CalledOnValidThread());
-scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- // If we find a scheduled pending_ job, abandon the old one and return a
- // a clone. If unscheduled, just hand over ownership.
- scoped_ptr<SyncSessionJob> candidate;
- if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
- && wait_interval_->pending_configure_job) {
- SDVLOG(2) << "Found pending configure job";
- candidate =
- wait_interval_->pending_configure_job->Clone().Pass();
- wait_interval_->pending_configure_job = candidate.get();
- } else if (mode_ == NORMAL_MODE && pending_nudge_) {
- SDVLOG(2) << "Found pending nudge job";
- candidate = pending_nudge_->Clone();
- pending_nudge_ = candidate.get();
- unscheduled_nudge_storage_.reset();
+ if (mode_ == CONFIGURATION_MODE && pending_configure_job_) {
+ SDVLOG(2) << "Found pending configure job; will run as canary";
+ DoConfigurationSyncSessionJob(CANARY_PRIORITY);
+ } else if (mode_ == NORMAL_MODE && pending_nudge_job_) {
+ SDVLOG(2) << "Found pending nudge job; will run as canary";
+ DoNudgeSyncSessionJob(CANARY_PRIORITY);
+ } else {
+ SDVLOG(2) << "Found no work to do; will not run a canary";
}
- // If we took a job and there's a wait interval, we took the pending canary.
- if (candidate && wait_interval_)
- wait_interval_->timer.Stop();
- return candidate.Pass();
}
void SyncSchedulerImpl::PollTimerCallback() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
- ModelSafeRoutingInfo r;
- ModelTypeInvalidationMap invalidation_map =
- ModelSafeRoutingInfoToInvalidationMap(r, std::string());
- SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
- scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
- TimeTicks::Now(),
- info,
- ConfigurationParams()));
+ DCHECK(CalledOnValidThread());
if (no_scheduling_allowed_) {
// The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
// functions that are called only on the sync thread. This function is also
@@ -1024,16 +887,12 @@ void SyncSchedulerImpl::PollTimerCallback() {
return;
}
- DoPollSyncSessionJob(job.Pass());
+ DoPollSyncSessionJob();
}
-void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+void SyncSchedulerImpl::Unthrottle() {
+ DCHECK(CalledOnValidThread());
DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
- DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() ||
- wait_interval_->pending_configure_job == to_be_canary.get());
- SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
- << "canary.";
// We're no longer throttled, so clear the wait interval.
wait_interval_.reset();
@@ -1044,15 +903,11 @@ void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
// was just created (e.g via ScheduleNudgeImpl). The main implication is
// that we're careful to update routing info (etc) with such potentially
// stale canary jobs.
- if (to_be_canary.get()) {
- DoCanaryJob(to_be_canary.Pass());
- } else {
- DCHECK(!unscheduled_nudge_storage_.get());
- }
+ TryCanaryJob();
}
void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
session_context_->NotifyListeners(SyncEngineEvent(cause));
}
@@ -1063,45 +918,45 @@ void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
}
bool SyncSchedulerImpl::IsBackingOff() const {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
return wait_interval_.get() && wait_interval_->mode ==
WaitInterval::EXPONENTIAL_BACKOFF;
}
void SyncSchedulerImpl::OnSilencedUntil(
const base::TimeTicks& silenced_until) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
silenced_until - TimeTicks::Now()));
NotifyRetryTime(base::Time::Now() + wait_interval_->length);
}
bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
return wait_interval_.get() && wait_interval_->mode ==
WaitInterval::THROTTLED;
}
void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
const base::TimeDelta& new_interval) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
syncer_short_poll_interval_seconds_ = new_interval;
}
void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
const base::TimeDelta& new_interval) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
syncer_long_poll_interval_seconds_ = new_interval;
}
void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
const base::TimeDelta& new_delay) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
sessions_commit_delay_ = new_delay;
}
void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
SDVLOG(2) << "OnShouldStopSyncingPermanently";
syncer_->RequestEarlyExit(); // Thread-safe.
Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
@@ -1109,7 +964,7 @@ void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
void SyncSchedulerImpl::OnActionableError(
const sessions::SyncSessionSnapshot& snap) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
SDVLOG(2) << "OnActionableError";
SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
event.snapshot = snap;
@@ -1118,7 +973,7 @@ void SyncSchedulerImpl::OnActionableError(
void SyncSchedulerImpl::OnSyncProtocolError(
const sessions::SyncSessionSnapshot& snapshot) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
if (ShouldRequestEarlyExit(
snapshot.model_neutral_state().sync_protocol_error)) {
SDVLOG(2) << "Sync Scheduler requesting early exit.";
@@ -1129,12 +984,12 @@ void SyncSchedulerImpl::OnSyncProtocolError(
}
void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
session_context_->set_notifications_enabled(notifications_enabled);
}
base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
- DCHECK_EQ(MessageLoop::current(), sync_loop_);
+ DCHECK(CalledOnValidThread());
return sessions_commit_delay_;
}
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698