Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(49)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 13422003: sync: Refactor job ownership in SyncScheduler (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Update comment Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 const base::Closure& ready_task) 76 const base::Closure& ready_task)
77 : source(source), 77 : source(source),
78 types_to_download(types_to_download), 78 types_to_download(types_to_download),
79 routing_info(routing_info), 79 routing_info(routing_info),
80 ready_task(ready_task) { 80 ready_task(ready_task) {
81 DCHECK(!ready_task.is_null()); 81 DCHECK(!ready_task.is_null());
82 } 82 }
83 ConfigurationParams::~ConfigurationParams() {} 83 ConfigurationParams::~ConfigurationParams() {}
84 84
85 SyncSchedulerImpl::WaitInterval::WaitInterval() 85 SyncSchedulerImpl::WaitInterval::WaitInterval()
86 : mode(UNKNOWN), 86 : mode(UNKNOWN) {}
87 had_nudge(false),
88 pending_configure_job(NULL) {}
89 87
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 88 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
91 : mode(mode), had_nudge(false), length(length), 89 : mode(mode), length(length) {}
92 pending_configure_job(NULL) {}
93 90
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 91 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
95 92
96 #define ENUM_CASE(x) case x: return #x; break; 93 #define ENUM_CASE(x) case x: return #x; break;
97 94
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 95 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
99 switch (mode) { 96 switch (mode) {
100 ENUM_CASE(UNKNOWN); 97 ENUM_CASE(UNKNOWN);
101 ENUM_CASE(EXPONENTIAL_BACKOFF); 98 ENUM_CASE(EXPONENTIAL_BACKOFF);
102 ENUM_CASE(THROTTLED); 99 ENUM_CASE(THROTTLED);
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
153 150
154 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, 151 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
155 BackoffDelayProvider* delay_provider, 152 BackoffDelayProvider* delay_provider,
156 sessions::SyncSessionContext* context, 153 sessions::SyncSessionContext* context,
157 Syncer* syncer) 154 Syncer* syncer)
158 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 155 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
159 weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 156 weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
160 weak_handle_this_(MakeWeakHandle( 157 weak_handle_this_(MakeWeakHandle(
161 weak_ptr_factory_for_weak_handle_.GetWeakPtr())), 158 weak_ptr_factory_for_weak_handle_.GetWeakPtr())),
162 name_(name), 159 name_(name),
163 sync_loop_(MessageLoop::current()),
164 started_(false), 160 started_(false),
165 syncer_short_poll_interval_seconds_( 161 syncer_short_poll_interval_seconds_(
166 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 162 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
167 syncer_long_poll_interval_seconds_( 163 syncer_long_poll_interval_seconds_(
168 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 164 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
169 sessions_commit_delay_( 165 sessions_commit_delay_(
170 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 166 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
171 mode_(NORMAL_MODE), 167 mode_(NORMAL_MODE),
172 // Start with assuming everything is fine with the connection.
173 // At the end of the sync cycle we would have the correct status.
174 pending_nudge_(NULL),
175 delay_provider_(delay_provider), 168 delay_provider_(delay_provider),
176 syncer_(syncer), 169 syncer_(syncer),
177 session_context_(context), 170 session_context_(context),
178 no_scheduling_allowed_(false) { 171 no_scheduling_allowed_(false) {
179 DCHECK(sync_loop_);
180 } 172 }
181 173
182 SyncSchedulerImpl::~SyncSchedulerImpl() { 174 SyncSchedulerImpl::~SyncSchedulerImpl() {
183 DCHECK_EQ(MessageLoop::current(), sync_loop_); 175 DCHECK(CalledOnValidThread());
184 StopImpl(base::Closure()); 176 StopImpl(base::Closure());
185 } 177 }
186 178
187 void SyncSchedulerImpl::OnCredentialsUpdated() { 179 void SyncSchedulerImpl::OnCredentialsUpdated() {
188 DCHECK_EQ(MessageLoop::current(), sync_loop_); 180 DCHECK(CalledOnValidThread());
189 181
190 if (HttpResponse::SYNC_AUTH_ERROR == 182 if (HttpResponse::SYNC_AUTH_ERROR ==
191 session_context_->connection_manager()->server_status()) { 183 session_context_->connection_manager()->server_status()) {
192 OnServerConnectionErrorFixed(); 184 OnServerConnectionErrorFixed();
193 } 185 }
194 } 186 }
195 187
196 void SyncSchedulerImpl::OnConnectionStatusChange() { 188 void SyncSchedulerImpl::OnConnectionStatusChange() {
197 if (HttpResponse::CONNECTION_UNAVAILABLE == 189 if (HttpResponse::CONNECTION_UNAVAILABLE ==
198 session_context_->connection_manager()->server_status()) { 190 session_context_->connection_manager()->server_status()) {
199 // Optimistically assume that the connection is fixed and try 191 // Optimistically assume that the connection is fixed and try
200 // connecting. 192 // connecting.
201 OnServerConnectionErrorFixed(); 193 OnServerConnectionErrorFixed();
202 } 194 }
203 } 195 }
204 196
205 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 197 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
206 // There could be a pending nudge or configuration job in several cases: 198 // There could be a pending nudge or configuration job in several cases:
207 // 199 //
208 // 1. We're in exponential backoff. 200 // 1. We're in exponential backoff.
209 // 2. We're silenced / throttled. 201 // 2. We're silenced / throttled.
210 // 3. A nudge was saved previously due to not having a valid auth token. 202 // 3. A nudge was saved previously due to not having a valid auth token.
211 // 4. A nudge was scheduled + saved while in configuration mode. 203 // 4. A nudge was scheduled + saved while in configuration mode.
212 // 204 //
213 // In all cases except (2), we want to retry contacting the server. We 205 // In all cases except (2), we want to retry contacting the server. We
214 // call DoCanaryJob to achieve this, and note that nothing -- not even a 206 // call DoCanaryJob to achieve this, and note that nothing -- not even a
215 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that 207 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
216 // has the authority to do that is the Unthrottle timer. 208 // has the authority to do that is the Unthrottle timer.
217 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); 209 TryCanaryJob();
218 if (!pending.get())
219 return;
220 DoCanaryJob(pending.Pass());
221 } 210 }
222 211
223 void SyncSchedulerImpl::Start(Mode mode) { 212 void SyncSchedulerImpl::Start(Mode mode) {
224 DCHECK_EQ(MessageLoop::current(), sync_loop_); 213 DCHECK(CalledOnValidThread());
225 std::string thread_name = MessageLoop::current()->thread_name(); 214 std::string thread_name = MessageLoop::current()->thread_name();
226 if (thread_name.empty()) 215 if (thread_name.empty())
227 thread_name = "<Main thread>"; 216 thread_name = "<Main thread>";
228 SDVLOG(2) << "Start called from thread " 217 SDVLOG(2) << "Start called from thread "
229 << thread_name << " with mode " << GetModeString(mode); 218 << thread_name << " with mode " << GetModeString(mode);
230 if (!started_) { 219 if (!started_) {
231 started_ = true; 220 started_ = true;
232 SendInitialSnapshot(); 221 SendInitialSnapshot();
233 } 222 }
234 223
235 DCHECK(!session_context_->account_name().empty()); 224 DCHECK(!session_context_->account_name().empty());
236 DCHECK(syncer_.get()); 225 DCHECK(syncer_.get());
237 Mode old_mode = mode_; 226 Mode old_mode = mode_;
238 mode_ = mode; 227 mode_ = mode;
239 AdjustPolling(NULL); // Will kick start poll timer if needed. 228 AdjustPolling(NULL); // Will kick start poll timer if needed.
240 229
241 if (old_mode != mode_) { 230 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) {
242 // We just changed our mode. See if there are any pending jobs that we could 231 // We just got back to normal mode. Let's try to run the work that was
243 // execute in the new mode. 232 // queued up while we were configuring.
244 if (mode_ == NORMAL_MODE) { 233 DoNudgeSyncSessionJob(NORMAL_PRIORITY);
245 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
246 // has not yet completed.
247 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
248 }
249
250 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
251 if (pending.get()) {
252 SDVLOG(2) << "Executing pending job. Good luck!";
253 DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY);
254 }
255 } 234 }
256 } 235 }
257 236
258 void SyncSchedulerImpl::SendInitialSnapshot() { 237 void SyncSchedulerImpl::SendInitialSnapshot() {
259 DCHECK_EQ(MessageLoop::current(), sync_loop_); 238 DCHECK(CalledOnValidThread());
260 scoped_ptr<SyncSession> dummy(new SyncSession( 239 scoped_ptr<SyncSession> dummy(new SyncSession(
261 session_context_, this, SyncSourceInfo())); 240 session_context_, this, SyncSourceInfo()));
262 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 241 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
263 event.snapshot = dummy->TakeSnapshot(); 242 event.snapshot = dummy->TakeSnapshot();
264 session_context_->NotifyListeners(event); 243 session_context_->NotifyListeners(event);
265 } 244 }
266 245
267 namespace { 246 namespace {
268 247
269 // Helper to extract the routing info corresponding to types in 248 // Helper to extract the routing info corresponding to types in
270 // |types_to_download| from |current_routes|. 249 // |types_to_download| from |current_routes|.
271 void BuildModelSafeParams( 250 void BuildModelSafeParams(
272 ModelTypeSet types_to_download, 251 ModelTypeSet types_to_download,
273 const ModelSafeRoutingInfo& current_routes, 252 const ModelSafeRoutingInfo& current_routes,
274 ModelSafeRoutingInfo* result_routes) { 253 ModelSafeRoutingInfo* result_routes) {
275 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); 254 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
276 iter.Inc()) { 255 iter.Inc()) {
277 ModelType type = iter.Get(); 256 ModelType type = iter.Get();
278 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); 257 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
279 DCHECK(route != current_routes.end()); 258 DCHECK(route != current_routes.end());
280 ModelSafeGroup group = route->second; 259 ModelSafeGroup group = route->second;
281 (*result_routes)[type] = group; 260 (*result_routes)[type] = group;
282 } 261 }
283 } 262 }
284 263
285 } // namespace. 264 } // namespace.
286 265
287 bool SyncSchedulerImpl::ScheduleConfiguration( 266 bool SyncSchedulerImpl::ScheduleConfiguration(
288 const ConfigurationParams& params) { 267 const ConfigurationParams& params) {
289 DCHECK_EQ(MessageLoop::current(), sync_loop_); 268 DCHECK(CalledOnValidThread());
290 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 269 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
291 DCHECK_EQ(CONFIGURATION_MODE, mode_); 270 DCHECK_EQ(CONFIGURATION_MODE, mode_);
292 DCHECK(!params.ready_task.is_null()); 271 DCHECK(!params.ready_task.is_null());
293 CHECK(started_) << "Scheduler must be running to configure."; 272 CHECK(started_) << "Scheduler must be running to configure.";
294 SDVLOG(2) << "Reconfiguring syncer."; 273 SDVLOG(2) << "Reconfiguring syncer.";
295 274
296 // Only one configuration is allowed at a time. Verify we're not waiting 275 // Only one configuration is allowed at a time. Verify we're not waiting
297 // for a pending configure job. 276 // for a pending configure job.
298 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); 277 DCHECK(!pending_configure_job_);
299 278
300 ModelSafeRoutingInfo restricted_routes; 279 ModelSafeRoutingInfo restricted_routes;
301 BuildModelSafeParams(params.types_to_download, 280 BuildModelSafeParams(params.types_to_download,
302 params.routing_info, 281 params.routing_info,
303 &restricted_routes); 282 &restricted_routes);
304 session_context_->set_routing_info(restricted_routes); 283 session_context_->set_routing_info(restricted_routes);
305 284
306 // Only reconfigure if we have types to download. 285 // Only reconfigure if we have types to download.
307 if (!params.types_to_download.Empty()) { 286 if (!params.types_to_download.Empty()) {
308 DCHECK(!restricted_routes.empty()); 287 DCHECK(!restricted_routes.empty());
309 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 288 pending_configure_job_.reset(new SyncSessionJob(
310 SyncSessionJob::CONFIGURATION, 289 SyncSessionJob::CONFIGURATION,
311 TimeTicks::Now(), 290 TimeTicks::Now(),
312 SyncSourceInfo(params.source, 291 SyncSourceInfo(params.source,
313 ModelSafeRoutingInfoToInvalidationMap( 292 ModelSafeRoutingInfoToInvalidationMap(
314 restricted_routes, 293 restricted_routes,
315 std::string())), 294 std::string())),
316 params)); 295 params));
317 bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); 296 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY);
318 297
319 // If we failed, the job would have been saved as the pending configure 298 // If we failed, the job would have been saved as the pending configure
320 // job and a wait interval would have been set. 299 // job and a wait interval would have been set.
321 if (!succeeded) { 300 if (!succeeded) {
322 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); 301 DCHECK(pending_configure_job_);
323 return false; 302 return false;
303 } else {
304 DCHECK(!pending_configure_job_);
324 } 305 }
325 } else { 306 } else {
326 SDVLOG(2) << "No change in routing info, calling ready task directly."; 307 SDVLOG(2) << "No change in routing info, calling ready task directly.";
327 params.ready_task.Run(); 308 params.ready_task.Run();
328 } 309 }
329 310
330 return true; 311 return true;
331 } 312 }
332 313
333 SyncSchedulerImpl::JobProcessDecision 314 SyncSchedulerImpl::JobProcessDecision
334 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, 315 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job,
335 JobPriority priority) { 316 JobPriority priority) {
336 DCHECK_EQ(MessageLoop::current(), sync_loop_); 317 DCHECK(CalledOnValidThread());
337 DCHECK(wait_interval_.get()); 318 DCHECK(wait_interval_.get());
338 DCHECK_NE(job.purpose(), SyncSessionJob::POLL); 319 DCHECK_NE(job.purpose(), SyncSessionJob::POLL);
339 320
340 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 321 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
341 << WaitInterval::GetModeString(wait_interval_->mode) 322 << WaitInterval::GetModeString(wait_interval_->mode)
342 << (wait_interval_->had_nudge ? " (had nudge)" : "")
343 << ((priority == CANARY_PRIORITY) ? " (canary)" : ""); 323 << ((priority == CANARY_PRIORITY) ? " (canary)" : "");
344 324
345 // If we save a job while in a WaitInterval, there is a well-defined moment 325 // If we save a job while in a WaitInterval, there is a well-defined moment
346 // in time in the future when it makes sense for that SAVE-worthy job to try 326 // in time in the future when it makes sense for that SAVE-worthy job to try
347 // running again -- the end of the WaitInterval. 327 // running again -- the end of the WaitInterval.
348 DCHECK(job.purpose() == SyncSessionJob::NUDGE || 328 DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
349 job.purpose() == SyncSessionJob::CONFIGURATION); 329 job.purpose() == SyncSessionJob::CONFIGURATION);
350 330
351 // If throttled, there's a clock ticking to unthrottle. We want to get 331 // If throttled, there's a clock ticking to unthrottle. We want to get
352 // on the same train. 332 // on the same train.
353 if (wait_interval_->mode == WaitInterval::THROTTLED) 333 if (wait_interval_->mode == WaitInterval::THROTTLED)
354 return SAVE; 334 return SAVE;
355 335
356 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 336 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
357 if (job.purpose() == SyncSessionJob::NUDGE) { 337 if (job.purpose() == SyncSessionJob::NUDGE) {
358 if (mode_ == CONFIGURATION_MODE) 338 if (mode_ == CONFIGURATION_MODE)
359 return SAVE; 339 return SAVE;
360 340
361 // If we already had one nudge then just drop this nudge. We will retry
362 // later when the timer runs out.
363 if (priority == NORMAL_PRIORITY) 341 if (priority == NORMAL_PRIORITY)
364 return wait_interval_->had_nudge ? DROP : CONTINUE; 342 return DROP;
365 else // We are here because timer ran out. So retry. 343 else // Either backoff has ended, or we have permission to bypass it.
366 return CONTINUE; 344 return CONTINUE;
367 } 345 }
368 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; 346 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE;
369 } 347 }
370 348
371 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 349 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
372 const SyncSessionJob& job, 350 const SyncSessionJob& job,
373 JobPriority priority) { 351 JobPriority priority) {
374 DCHECK_EQ(MessageLoop::current(), sync_loop_); 352 DCHECK(CalledOnValidThread());
375 353
376 // POLL jobs do not call this function. 354 // POLL jobs do not call this function.
377 DCHECK(job.purpose() == SyncSessionJob::NUDGE || 355 DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
378 job.purpose() == SyncSessionJob::CONFIGURATION); 356 job.purpose() == SyncSessionJob::CONFIGURATION);
379 357
380 // See if our type is throttled. 358 // See if our type is throttled.
381 ModelTypeSet throttled_types = 359 ModelTypeSet throttled_types =
382 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); 360 session_context_->throttled_data_type_tracker()->GetThrottledTypes();
383 if (job.purpose() == SyncSessionJob::NUDGE && 361 if (job.purpose() == SyncSessionJob::NUDGE &&
384 job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) { 362 job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) {
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
442 // Decision now rests on state of auth tokens. 420 // Decision now rests on state of auth tokens.
443 if (!session_context_->connection_manager()->HasInvalidAuthToken()) 421 if (!session_context_->connection_manager()->HasInvalidAuthToken())
444 return CONTINUE; 422 return CONTINUE;
445 423
446 SDVLOG(2) << "No valid auth token. Using that to decide on job."; 424 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
447 // Running the job would require updated auth, so we can't honour 425 // Running the job would require updated auth, so we can't honour
448 // job.scheduled_start(). 426 // job.scheduled_start().
449 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; 427 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
450 } 428 }
451 429
452 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
453 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
454 if (is_nudge && pending_nudge_) {
455 SDVLOG(2) << "Coalescing a pending nudge";
456 // TODO(tim): This basically means we never use the more-careful coalescing
457 // logic in ScheduleNudgeImpl that takes the min of the two nudge start
458 // times, because we're calling this function first. Pull this out
459 // into a function to coalesce + set start times and reuse.
460 pending_nudge_->CoalesceSources(job->source_info());
461 return;
462 }
463
464 scoped_ptr<SyncSessionJob> job_to_save = job->Clone();
465 if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
466 // This job should be made the new canary.
467 if (is_nudge) {
468 pending_nudge_ = job_to_save.get();
469 } else {
470 SDVLOG(2) << "Saving a configuration job";
471 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
472 DCHECK(!wait_interval_->pending_configure_job);
473 DCHECK_EQ(mode_, CONFIGURATION_MODE);
474 DCHECK(!job->config_params().ready_task.is_null());
475 // The only nudge that could exist is a scheduled canary nudge.
476 DCHECK(!unscheduled_nudge_storage_.get());
477 if (pending_nudge_) {
478 // Pre-empt the nudge canary and abandon the old nudge (owned by task).
479 unscheduled_nudge_storage_ = pending_nudge_->Clone();
480 pending_nudge_ = unscheduled_nudge_storage_.get();
481 }
482 wait_interval_->pending_configure_job = job_to_save.get();
483 }
484 TimeDelta length =
485 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
486 wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
487 TimeDelta::FromSeconds(0) : length;
488 RestartWaiting(job_to_save.Pass());
489 return;
490 }
491
492 // Note that today there are no cases where we SAVE a CONFIGURATION job
493 // when we're not in a WaitInterval. See bug 147736.
494 DCHECK(is_nudge);
495 // There may or may not be a pending_configure_job. Either way this nudge
496 // is unschedulable.
497 pending_nudge_ = job_to_save.get();
498 unscheduled_nudge_storage_ = job_to_save.Pass();
499 }
500
501 // Functor for std::find_if to search by ModelSafeGroup.
502 struct ModelSafeWorkerGroupIs {
503 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
504 bool operator()(ModelSafeWorker* w) {
505 return group == w->GetModelSafeGroup();
506 }
507 ModelSafeGroup group;
508 };
509
510 void SyncSchedulerImpl::ScheduleNudgeAsync( 430 void SyncSchedulerImpl::ScheduleNudgeAsync(
511 const TimeDelta& desired_delay, 431 const TimeDelta& desired_delay,
512 NudgeSource source, ModelTypeSet types, 432 NudgeSource source, ModelTypeSet types,
513 const tracked_objects::Location& nudge_location) { 433 const tracked_objects::Location& nudge_location) {
514 DCHECK_EQ(MessageLoop::current(), sync_loop_); 434 DCHECK(CalledOnValidThread());
515 SDVLOG_LOC(nudge_location, 2) 435 SDVLOG_LOC(nudge_location, 2)
516 << "Nudge scheduled with delay " 436 << "Nudge scheduled with delay "
517 << desired_delay.InMilliseconds() << " ms, " 437 << desired_delay.InMilliseconds() << " ms, "
518 << "source " << GetNudgeSourceString(source) << ", " 438 << "source " << GetNudgeSourceString(source) << ", "
519 << "types " << ModelTypeSetToString(types); 439 << "types " << ModelTypeSetToString(types);
520 440
521 ModelTypeInvalidationMap invalidation_map = 441 ModelTypeInvalidationMap invalidation_map =
522 ModelTypeSetToInvalidationMap(types, std::string()); 442 ModelTypeSetToInvalidationMap(types, std::string());
523 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, 443 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
524 GetUpdatesFromNudgeSource(source), 444 GetUpdatesFromNudgeSource(source),
525 invalidation_map, 445 invalidation_map,
526 nudge_location); 446 nudge_location);
527 } 447 }
528 448
529 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( 449 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
530 const TimeDelta& desired_delay, 450 const TimeDelta& desired_delay,
531 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, 451 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
532 const tracked_objects::Location& nudge_location) { 452 const tracked_objects::Location& nudge_location) {
533 DCHECK_EQ(MessageLoop::current(), sync_loop_); 453 DCHECK(CalledOnValidThread());
534 SDVLOG_LOC(nudge_location, 2) 454 SDVLOG_LOC(nudge_location, 2)
535 << "Nudge scheduled with delay " 455 << "Nudge scheduled with delay "
536 << desired_delay.InMilliseconds() << " ms, " 456 << desired_delay.InMilliseconds() << " ms, "
537 << "source " << GetNudgeSourceString(source) << ", " 457 << "source " << GetNudgeSourceString(source) << ", "
538 << "payloads " 458 << "payloads "
539 << ModelTypeInvalidationMapToString(invalidation_map); 459 << ModelTypeInvalidationMapToString(invalidation_map);
540 460
541 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, 461 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
542 GetUpdatesFromNudgeSource(source), 462 GetUpdatesFromNudgeSource(source),
543 invalidation_map, 463 invalidation_map,
544 nudge_location); 464 nudge_location);
545 } 465 }
546 466
547 467
548 // TODO(zea): Consider adding separate throttling/backoff for datatype 468 // TODO(zea): Consider adding separate throttling/backoff for datatype
549 // refresh requests. 469 // refresh requests.
550 void SyncSchedulerImpl::ScheduleNudgeImpl( 470 void SyncSchedulerImpl::ScheduleNudgeImpl(
551 const TimeDelta& delay, 471 const TimeDelta& delay,
552 GetUpdatesCallerInfo::GetUpdatesSource source, 472 GetUpdatesCallerInfo::GetUpdatesSource source,
553 const ModelTypeInvalidationMap& invalidation_map, 473 const ModelTypeInvalidationMap& invalidation_map,
554 const tracked_objects::Location& nudge_location) { 474 const tracked_objects::Location& nudge_location) {
555 DCHECK_EQ(MessageLoop::current(), sync_loop_); 475 DCHECK(CalledOnValidThread());
556 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; 476 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
557 477
558 if (no_scheduling_allowed_) { 478 if (no_scheduling_allowed_) {
559 NOTREACHED() << "Illegal to schedule job while session in progress."; 479 NOTREACHED() << "Illegal to schedule job while session in progress.";
560 return; 480 return;
561 } 481 }
562 482
563 if (!started_) { 483 if (!started_) {
564 SDVLOG_LOC(nudge_location, 2) 484 SDVLOG_LOC(nudge_location, 2)
565 << "Dropping nudge, scheduler is not running."; 485 << "Dropping nudge, scheduler is not running.";
(...skipping 13 matching lines...) Expand all
579 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( 499 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
580 SyncSessionJob::NUDGE, 500 SyncSessionJob::NUDGE,
581 TimeTicks::Now() + delay, 501 TimeTicks::Now() + delay,
582 info, 502 info,
583 ConfigurationParams())); 503 ConfigurationParams()));
584 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); 504 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY);
585 SDVLOG(2) << "Should run " 505 SDVLOG(2) << "Should run "
586 << SyncSessionJob::GetPurposeString(job->purpose()) 506 << SyncSessionJob::GetPurposeString(job->purpose())
587 << " in mode " << GetModeString(mode_) 507 << " in mode " << GetModeString(mode_)
588 << ": " << GetDecisionString(decision); 508 << ": " << GetDecisionString(decision);
589 if (decision != CONTINUE) { 509 if (decision == DROP) {
590 // End of the line, though we may save the job for later.
591 if (decision == SAVE) {
592 HandleSaveJobDecision(job.Pass());
593 } else {
594 DCHECK_EQ(decision, DROP);
595 }
596 return; 510 return;
597 } 511 }
598 512
599 if (pending_nudge_) { 513 // Try to coalesce in both SAVE and CONTINUE cases.
600 SDVLOG(2) << "Rescheduling pending nudge"; 514 if (pending_nudge_job_) {
601 pending_nudge_->CoalesceSources(job->source_info()); 515 pending_nudge_job_->CoalesceSources(job->source_info());
602 // Choose the start time as the earliest of the 2. Note that this means 516 if (decision == CONTINUE) {
603 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) 517 // Only update the scheduled_start if we're going to reschedule.
604 // but a nudge is already scheduled to go out, we'll send the (tab) commit 518 pending_nudge_job_->set_scheduled_start(
605 // without waiting. 519 std::min(job->scheduled_start(),
606 pending_nudge_->set_scheduled_start( 520 pending_nudge_job_->scheduled_start()));
607 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); 521 }
608 // Abandon the old task by cloning and replacing the session. 522 } else {
609 // It's possible that by "rescheduling" we're actually taking a job that 523 pending_nudge_job_ = job.Pass();
610 // was previously unscheduled and giving it wings, so take care to reset
611 // unscheduled nudge storage.
612 job = pending_nudge_->Clone();
613 pending_nudge_ = NULL;
614 unscheduled_nudge_storage_.reset();
615 // It's also possible we took a canary job, since we allow one nudge
616 // per backoff interval.
617 DCHECK(!wait_interval_ || !wait_interval_->had_nudge);
618 } 524 }
619 525
620 TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now(); 526 if (decision == SAVE) {
527 return;
528 }
529
530 TimeDelta run_delay =
531 pending_nudge_job_->scheduled_start() - TimeTicks::Now();
621 if (run_delay < TimeDelta::FromMilliseconds(0)) 532 if (run_delay < TimeDelta::FromMilliseconds(0))
622 run_delay = TimeDelta::FromMilliseconds(0); 533 run_delay = TimeDelta::FromMilliseconds(0);
623 SDVLOG_LOC(nudge_location, 2) 534 SDVLOG_LOC(nudge_location, 2)
624 << "Scheduling a nudge with " 535 << "Scheduling a nudge with "
625 << run_delay.InMilliseconds() << " ms delay"; 536 << run_delay.InMilliseconds() << " ms delay";
626 537
627 pending_nudge_ = job.get(); 538 if (started_) {
628 PostDelayedTask(nudge_location, "DoSyncSessionJob", 539 pending_wakeup_timer_.Start(
629 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), 540 nudge_location,
630 weak_ptr_factory_.GetWeakPtr(), 541 run_delay,
631 base::Passed(&job), 542 base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob,
632 NORMAL_PRIORITY), 543 weak_ptr_factory_.GetWeakPtr(),
633 run_delay); 544 NORMAL_PRIORITY));
545 }
634 } 546 }
635 547
636 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 548 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
637 switch (mode) { 549 switch (mode) {
638 ENUM_CASE(CONFIGURATION_MODE); 550 ENUM_CASE(CONFIGURATION_MODE);
639 ENUM_CASE(NORMAL_MODE); 551 ENUM_CASE(NORMAL_MODE);
640 } 552 }
641 return ""; 553 return "";
642 } 554 }
643 555
644 const char* SyncSchedulerImpl::GetDecisionString( 556 const char* SyncSchedulerImpl::GetDecisionString(
645 SyncSchedulerImpl::JobProcessDecision mode) { 557 SyncSchedulerImpl::JobProcessDecision mode) {
646 switch (mode) { 558 switch (mode) {
647 ENUM_CASE(CONTINUE); 559 ENUM_CASE(CONTINUE);
648 ENUM_CASE(SAVE); 560 ENUM_CASE(SAVE);
649 ENUM_CASE(DROP); 561 ENUM_CASE(DROP);
650 } 562 }
651 return ""; 563 return "";
652 } 564 }
653 565
654 void SyncSchedulerImpl::PostDelayedTask( 566 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job,
655 const tracked_objects::Location& from_here, 567 JobPriority priority) {
656 const char* name, const base::Closure& task, base::TimeDelta delay) { 568 DCHECK(CalledOnValidThread());
657 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
658 << delay.InMilliseconds() << " ms delay";
659 DCHECK_EQ(MessageLoop::current(), sync_loop_);
660 if (!started_) {
661 SDVLOG(1) << "Not posting task as scheduler is stopped.";
662 return;
663 }
664 // This cancels the previous task, if one existed.
665 pending_wakeup_.Reset(task);
666 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay);
667 }
668
669 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job,
670 JobPriority priority) {
671 DCHECK_EQ(MessageLoop::current(), sync_loop_);
672 if (job->purpose() == SyncSessionJob::NUDGE) {
673 pending_nudge_ = NULL;
674 }
675 569
676 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 570 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
677 JobProcessDecision decision = DecideOnJob(*job, priority); 571 JobProcessDecision decision = DecideOnJob(*job, priority);
678 SDVLOG(2) << "Should run " 572 SDVLOG(2) << "Should run "
679 << SyncSessionJob::GetPurposeString(job->purpose()) 573 << SyncSessionJob::GetPurposeString(job->purpose())
680 << " in mode " << GetModeString(mode_) 574 << " in mode " << GetModeString(mode_)
681 << " with source " << job->source_info().updates_source 575 << " with source " << job->source_info().updates_source
682 << ": " << GetDecisionString(decision); 576 << ": " << GetDecisionString(decision);
683 if (decision != CONTINUE) { 577 if (decision != CONTINUE) {
684 if (decision == SAVE) { 578 if (decision == SAVE) {
685 HandleSaveJobDecision(job.Pass()); 579 if (job->purpose() == SyncSessionJob::CONFIGURATION) {
580 pending_configure_job_ = job.Pass();
581 } else {
582 pending_nudge_job_ = job.Pass();
583 }
686 } else { 584 } else {
687 DCHECK_EQ(decision, DROP); 585 DCHECK_EQ(decision, DROP);
688 } 586 }
689 return false; 587 return false;
690 } 588 }
691 589
692 DVLOG(2) << "Creating sync session with routes " 590 DVLOG(2) << "Creating sync session with routes "
693 << ModelSafeRoutingInfoToString(session_context_->routing_info()) 591 << ModelSafeRoutingInfoToString(session_context_->routing_info())
694 << "and purpose " << job->purpose(); 592 << "and purpose " << job->purpose();
695 SyncSession session(session_context_, this, job->source_info()); 593 SyncSession session(session_context_, this, job->source_info());
696 bool premature_exit = !syncer_->SyncShare(&session, 594 bool premature_exit = !syncer_->SyncShare(&session,
697 job->start_step(), 595 job->start_step(),
698 job->end_step()); 596 job->end_step());
699 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; 597 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
700 598
701 bool success = FinishSyncSessionJob(job.get(), 599 bool success = FinishSyncSessionJob(job.get(),
702 premature_exit, 600 premature_exit,
703 &session); 601 &session);
704 602
705 if (IsSyncingCurrentlySilenced()) { 603 if (IsSyncingCurrentlySilenced()) {
706 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; 604 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
707 // If we're here, it's because |job| was silenced until a server specified 605 // If we're here, it's because |job| was silenced until a server specified
708 // time. (Note, it had to be |job|, because DecideOnJob would not permit 606 // time. (Note, it had to be |job|, because DecideOnJob would not permit
709 // any job through while in WaitInterval::THROTTLED). 607 // any job through while in WaitInterval::THROTTLED).
710 scoped_ptr<SyncSessionJob> clone = job->Clone(); 608 if (job->purpose() == SyncSessionJob::NUDGE)
711 if (clone->purpose() == SyncSessionJob::NUDGE) 609 pending_nudge_job_ = job.Pass();
712 pending_nudge_ = clone.get(); 610 else if (job->purpose() == SyncSessionJob::CONFIGURATION)
713 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) 611 pending_configure_job_ = job.Pass();
714 wait_interval_->pending_configure_job = clone.get();
715 else 612 else
716 NOTREACHED(); 613 NOTREACHED();
717 614
718 RestartWaiting(clone.Pass()); 615 RestartWaiting();
719 return success; 616 return success;
720 } 617 }
721 618
722 if (!success) 619 if (!success)
723 ScheduleNextSync(job.Pass(), &session); 620 ScheduleNextSync(job.Pass(), &session);
724 621
725 return success; 622 return success;
726 } 623 }
727 624
625 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
626 DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority);
627 }
628
629 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
630 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority);
631 }
632
728 bool SyncSchedulerImpl::ShouldPoll() { 633 bool SyncSchedulerImpl::ShouldPoll() {
729 if (wait_interval_.get()) { 634 if (wait_interval_.get()) {
730 SDVLOG(2) << "Not running poll in wait interval."; 635 SDVLOG(2) << "Not running poll in wait interval.";
731 return false; 636 return false;
732 } 637 }
733 638
734 if (mode_ == CONFIGURATION_MODE) { 639 if (mode_ == CONFIGURATION_MODE) {
735 SDVLOG(2) << "Not running poll in configuration mode."; 640 SDVLOG(2) << "Not running poll in configuration mode.";
736 return false; 641 return false;
737 } 642 }
738 643
739 // TODO(rlarocque): Refactor decision-making logic common to all types 644 // TODO(rlarocque): Refactor decision-making logic common to all types
740 // of jobs into a shared function. 645 // of jobs into a shared function.
741 646
742 if (session_context_->connection_manager()->HasInvalidAuthToken()) { 647 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
743 SDVLOG(2) << "Not running poll because auth token is invalid."; 648 SDVLOG(2) << "Not running poll because auth token is invalid.";
744 return false; 649 return false;
745 } 650 }
746 651
747 return true; 652 return true;
748 } 653 }
749 654
750 void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { 655 void SyncSchedulerImpl::DoPollSyncSessionJob() {
751 DCHECK_EQ(job->purpose(), SyncSessionJob::POLL); 656 ModelSafeRoutingInfo r;
657 ModelTypeInvalidationMap invalidation_map =
658 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
659 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
660 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
661 TimeTicks::Now(),
662 info,
663 ConfigurationParams()));
752 664
753 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 665 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
754 666
755 if (!ShouldPoll()) 667 if (!ShouldPoll())
756 return; 668 return;
757 669
758 DVLOG(2) << "Polling with routes " 670 DVLOG(2) << "Polling with routes "
759 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 671 << ModelSafeRoutingInfoToString(session_context_->routing_info());
760 SyncSession session(session_context_, this, job->source_info()); 672 SyncSession session(session_context_, this, job->source_info());
761 bool premature_exit = !syncer_->SyncShare(&session, 673 bool premature_exit = !syncer_->SyncShare(&session,
762 job->start_step(), 674 job->start_step(),
763 job->end_step()); 675 job->end_step());
764 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; 676 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit;
765 677
766 FinishSyncSessionJob(job.get(), premature_exit, &session); 678 FinishSyncSessionJob(job.get(), premature_exit, &session);
767 679
768 if (IsSyncingCurrentlySilenced()) { 680 if (IsSyncingCurrentlySilenced()) {
769 // This will start the countdown to unthrottle. Other kinds of jobs would 681 // Normally we would only call RestartWaiting() if we had a
770 // schedule themselves as the post-unthrottle canary. A poll job is not 682 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's
771 // that urgent, so it does not get to be the canary. We still need to start 683 // possible that neither is set. We create the wait interval anyway because
772 // the timer regardless. Otherwise there could be no one to clear the 684 // we need it to make sure we get unthrottled on time.
773 // WaitInterval when the throttling expires. 685 RestartWaiting();
774 RestartWaiting(scoped_ptr<SyncSessionJob>());
775 } 686 }
776 } 687 }
777 688
778 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 689 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
779 DCHECK_EQ(MessageLoop::current(), sync_loop_); 690 DCHECK(CalledOnValidThread());
780 691
781 // We are interested in recording time between local nudges for datatypes. 692 // We are interested in recording time between local nudges for datatypes.
782 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 693 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
783 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 694 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
784 return; 695 return;
785 696
786 base::TimeTicks now = TimeTicks::Now(); 697 base::TimeTicks now = TimeTicks::Now();
787 // Update timing information for how often datatypes are triggering nudges. 698 // Update timing information for how often datatypes are triggering nudges.
788 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); 699 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin();
789 iter != info.types.end(); 700 iter != info.types.end();
790 ++iter) { 701 ++iter) {
791 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; 702 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first];
792 last_local_nudges_by_model_type_[iter->first] = now; 703 last_local_nudges_by_model_type_[iter->first] = now;
793 if (previous.is_null()) 704 if (previous.is_null())
794 continue; 705 continue;
795 706
796 #define PER_DATA_TYPE_MACRO(type_str) \ 707 #define PER_DATA_TYPE_MACRO(type_str) \
797 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 708 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
798 SYNC_DATA_TYPE_HISTOGRAM(iter->first); 709 SYNC_DATA_TYPE_HISTOGRAM(iter->first);
799 #undef PER_DATA_TYPE_MACRO 710 #undef PER_DATA_TYPE_MACRO
800 } 711 }
801 } 712 }
802 713
803 bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, 714 bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
804 bool exited_prematurely, 715 bool exited_prematurely,
805 SyncSession* session) { 716 SyncSession* session) {
806 DCHECK_EQ(MessageLoop::current(), sync_loop_); 717 DCHECK(CalledOnValidThread());
807 718
808 // Let job know that we're through syncing (calling SyncShare) at this point. 719 // Let job know that we're through syncing (calling SyncShare) at this point.
809 bool succeeded = false; 720 bool succeeded = false;
810 { 721 {
811 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 722 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
812 succeeded = job->Finish(exited_prematurely, session); 723 succeeded = job->Finish(exited_prematurely, session);
813 } 724 }
814 725
815 SDVLOG(2) << "Updating the next polling time after SyncMain"; 726 SDVLOG(2) << "Updating the next polling time after SyncMain";
816 727
817 AdjustPolling(job); 728 AdjustPolling(job);
818 729
819 if (succeeded) { 730 if (succeeded) {
820 // No job currently supported by the scheduler could succeed without 731 // No job currently supported by the scheduler could succeed without
821 // successfully reaching the server. Therefore, if we make it here, it is 732 // successfully reaching the server. Therefore, if we make it here, it is
822 // appropriate to reset the backoff interval. 733 // appropriate to reset the backoff interval.
823 wait_interval_.reset(); 734 wait_interval_.reset();
824 NotifyRetryTime(base::Time()); 735 NotifyRetryTime(base::Time());
825 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; 736 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
826 } 737 }
827 738
828 return succeeded; 739 return succeeded;
829 } 740 }
830 741
831 void SyncSchedulerImpl::ScheduleNextSync( 742 void SyncSchedulerImpl::ScheduleNextSync(
832 scoped_ptr<SyncSessionJob> finished_job, 743 scoped_ptr<SyncSessionJob> finished_job,
833 SyncSession* session) { 744 SyncSession* session) {
834 DCHECK_EQ(MessageLoop::current(), sync_loop_); 745 DCHECK(CalledOnValidThread());
835 DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION 746 DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION
836 || finished_job->purpose() == SyncSessionJob::NUDGE); 747 || finished_job->purpose() == SyncSessionJob::NUDGE);
837 748
838 // TODO(rlarocque): There's no reason why we should blindly backoff and retry 749 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
839 // if we don't succeed. Some types of errors are not likely to disappear on 750 // if we don't succeed. Some types of errors are not likely to disappear on
840 // their own. With the return values now available in the old_job.session, 751 // their own. With the return values now available in the old_job.session,
841 // we should be able to detect such errors and only retry when we detect 752 // we should be able to detect such errors and only retry when we detect
842 // transient errors. 753 // transient errors.
843 754
844 if (IsBackingOff() && wait_interval_->timer.IsRunning() && 755 SDVLOG(2) << "SyncShare job failed; will start or update backoff";
845 mode_ == NORMAL_MODE) { 756 HandleContinuationError(finished_job.Pass(), session);
846 // When in normal mode, we allow up to one nudge per backoff interval. It
847 // appears that this was our nudge for this interval, and it failed.
848 //
849 // Note: This does not prevent us from running canary jobs. For example,
850 // an IP address change might still result in another nudge being executed
851 // during this backoff interval.
852 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
853 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
854 DCHECK(!wait_interval_->had_nudge);
855
856 wait_interval_->had_nudge = true;
857 DCHECK(!pending_nudge_);
858
859 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
860 pending_nudge_ = new_job.get();
861 RestartWaiting(new_job.Pass());
862 } else {
863 // Either this is the first failure or a consecutive failure after our
864 // backoff timer expired. We handle it the same way in either case.
865 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
866 HandleContinuationError(finished_job.Pass(), session);
867 }
868 } 757 }
869 758
870 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { 759 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
871 DCHECK_EQ(MessageLoop::current(), sync_loop_); 760 DCHECK(CalledOnValidThread());
872 761
873 TimeDelta poll = (!session_context_->notifications_enabled()) ? 762 TimeDelta poll = (!session_context_->notifications_enabled()) ?
874 syncer_short_poll_interval_seconds_ : 763 syncer_short_poll_interval_seconds_ :
875 syncer_long_poll_interval_seconds_; 764 syncer_long_poll_interval_seconds_;
876 bool rate_changed = !poll_timer_.IsRunning() || 765 bool rate_changed = !poll_timer_.IsRunning() ||
877 poll != poll_timer_.GetCurrentDelay(); 766 poll != poll_timer_.GetCurrentDelay();
878 767
879 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) 768 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed)
880 poll_timer_.Reset(); 769 poll_timer_.Reset();
881 770
882 if (!rate_changed) 771 if (!rate_changed)
883 return; 772 return;
884 773
885 // Adjust poll rate. 774 // Adjust poll rate.
886 poll_timer_.Stop(); 775 poll_timer_.Stop();
887 poll_timer_.Start(FROM_HERE, poll, this, 776 poll_timer_.Start(FROM_HERE, poll, this,
888 &SyncSchedulerImpl::PollTimerCallback); 777 &SyncSchedulerImpl::PollTimerCallback);
889 } 778 }
890 779
891 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { 780 void SyncSchedulerImpl::RestartWaiting() {
892 CHECK(wait_interval_.get()); 781 CHECK(wait_interval_.get());
893 wait_interval_->timer.Stop();
894 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 782 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
895 if (wait_interval_->mode == WaitInterval::THROTTLED) { 783 if (wait_interval_->mode == WaitInterval::THROTTLED) {
896 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, 784 pending_wakeup_timer_.Start(
897 weak_ptr_factory_.GetWeakPtr(), 785 FROM_HERE,
898 base::Passed(&job))); 786 wait_interval_->length,
899 787 base::Bind(&SyncSchedulerImpl::Unthrottle,
788 weak_ptr_factory_.GetWeakPtr()));
900 } else { 789 } else {
901 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, 790 pending_wakeup_timer_.Start(
902 weak_ptr_factory_.GetWeakPtr(), 791 FROM_HERE,
903 base::Passed(&job))); 792 wait_interval_->length,
793 base::Bind(&SyncSchedulerImpl::TryCanaryJob,
794 weak_ptr_factory_.GetWeakPtr()));
904 } 795 }
905 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
906 pending_wakeup_.callback());
907 } 796 }
908 797
909 void SyncSchedulerImpl::HandleContinuationError( 798 void SyncSchedulerImpl::HandleContinuationError(
910 scoped_ptr<SyncSessionJob> old_job, 799 scoped_ptr<SyncSessionJob> old_job,
911 SyncSession* session) { 800 SyncSession* session) {
912 DCHECK_EQ(MessageLoop::current(), sync_loop_); 801 DCHECK(CalledOnValidThread());
913 802
914 TimeDelta length = delay_provider_->GetDelay( 803 TimeDelta length = delay_provider_->GetDelay(
915 IsBackingOff() ? wait_interval_->length : 804 IsBackingOff() ? wait_interval_->length :
916 delay_provider_->GetInitialDelay( 805 delay_provider_->GetInitialDelay(
917 session->status_controller().model_neutral_state())); 806 session->status_controller().model_neutral_state()));
918 807
919 SDVLOG(2) << "In handle continuation error with " 808 SDVLOG(2) << "In handle continuation error with "
920 << SyncSessionJob::GetPurposeString(old_job->purpose()) 809 << SyncSessionJob::GetPurposeString(old_job->purpose())
921 << " job. The time delta(ms) is " 810 << " job. The time delta(ms) is "
922 << length.InMilliseconds(); 811 << length.InMilliseconds();
923 812
924 // This will reset the had_nudge variable as well.
925 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 813 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
926 length)); 814 length));
927 NotifyRetryTime(base::Time::Now() + length); 815 NotifyRetryTime(base::Time::Now() + length);
928 scoped_ptr<SyncSessionJob> new_job(old_job->Clone()); 816 old_job->set_scheduled_start(TimeTicks::Now() + length);
929 new_job->set_scheduled_start(TimeTicks::Now() + length);
930 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { 817 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
931 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; 818 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
932 // Config params should always get set. 819 // Config params should always get set.
933 DCHECK(!old_job->config_params().ready_task.is_null()); 820 DCHECK(!old_job->config_params().ready_task.is_null());
934 wait_interval_->pending_configure_job = new_job.get(); 821 DCHECK(!pending_configure_job_);
822 pending_configure_job_ = old_job.Pass();
935 } else { 823 } else {
936 // We are not in configuration mode. So wait_interval's pending job 824 // We're not in configure mode so we should not have a configure job.
937 // should be null. 825 DCHECK(!pending_configure_job_);
938 DCHECK(wait_interval_->pending_configure_job == NULL); 826 DCHECK(!pending_nudge_job_);
939 DCHECK(!pending_nudge_); 827 pending_nudge_job_ = old_job.Pass();
940 pending_nudge_ = new_job.get();
941 } 828 }
942 829
943 RestartWaiting(new_job.Pass()); 830 RestartWaiting();
944 } 831 }
945 832
946 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 833 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
947 syncer_->RequestEarlyExit(); // Safe to call from any thread. 834 syncer_->RequestEarlyExit(); // Safe to call from any thread.
948 DCHECK(weak_handle_this_.IsInitialized()); 835 DCHECK(weak_handle_this_.IsInitialized());
949 SDVLOG(3) << "Posting StopImpl"; 836 SDVLOG(3) << "Posting StopImpl";
950 weak_handle_this_.Call(FROM_HERE, 837 weak_handle_this_.Call(FROM_HERE,
951 &SyncSchedulerImpl::StopImpl, 838 &SyncSchedulerImpl::StopImpl,
952 callback); 839 callback);
953 } 840 }
954 841
955 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 842 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
956 DCHECK_EQ(MessageLoop::current(), sync_loop_); 843 DCHECK(CalledOnValidThread());
957 SDVLOG(2) << "StopImpl called"; 844 SDVLOG(2) << "StopImpl called";
958 845
959 // Kill any in-flight method calls. 846 // Kill any in-flight method calls.
960 weak_ptr_factory_.InvalidateWeakPtrs(); 847 weak_ptr_factory_.InvalidateWeakPtrs();
961 wait_interval_.reset(); 848 wait_interval_.reset();
962 NotifyRetryTime(base::Time()); 849 NotifyRetryTime(base::Time());
963 poll_timer_.Stop(); 850 poll_timer_.Stop();
964 pending_nudge_ = NULL; 851 pending_wakeup_timer_.Stop();
965 unscheduled_nudge_storage_.reset(); 852 pending_nudge_job_.reset();
966 pending_wakeup_.Cancel(); 853 pending_configure_job_.reset();
967 if (started_) { 854 if (started_) {
968 started_ = false; 855 started_ = false;
969 } 856 }
970 if (!callback.is_null()) 857 if (!callback.is_null())
971 callback.Run(); 858 callback.Run();
972 } 859 }
973 860
974 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { 861 // This is the only place where we invoke DoSyncSessionJob with canary
975 DCHECK_EQ(MessageLoop::current(), sync_loop_); 862 // privileges. Everyone else should use NORMAL_PRIORITY.
976 SDVLOG(2) << "Do canary job"; 863 void SyncSchedulerImpl::TryCanaryJob() {
864 DCHECK(CalledOnValidThread());
977 865
978 // This is the only place where we invoke DoSyncSessionJob with canary 866 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) {
979 // privileges. Everyone else should use NORMAL_PRIORITY. 867 SDVLOG(2) << "Found pending configure job; will run as canary";
980 DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); 868 DoConfigurationSyncSessionJob(CANARY_PRIORITY);
981 } 869 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) {
982 870 SDVLOG(2) << "Found pending nudge job; will run as canary";
983 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { 871 DoNudgeSyncSessionJob(CANARY_PRIORITY);
984 DCHECK_EQ(MessageLoop::current(), sync_loop_); 872 } else {
985 // If we find a scheduled pending_ job, abandon the old one and return a 873 SDVLOG(2) << "Found no work to do; will not run a canary";
986 // a clone. If unscheduled, just hand over ownership.
987 scoped_ptr<SyncSessionJob> candidate;
988 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
989 && wait_interval_->pending_configure_job) {
990 SDVLOG(2) << "Found pending configure job";
991 candidate =
992 wait_interval_->pending_configure_job->Clone().Pass();
993 wait_interval_->pending_configure_job = candidate.get();
994 } else if (mode_ == NORMAL_MODE && pending_nudge_) {
995 SDVLOG(2) << "Found pending nudge job";
996 candidate = pending_nudge_->Clone();
997 pending_nudge_ = candidate.get();
998 unscheduled_nudge_storage_.reset();
999 } 874 }
1000 // If we took a job and there's a wait interval, we took the pending canary.
1001 if (candidate && wait_interval_)
1002 wait_interval_->timer.Stop();
1003 return candidate.Pass();
1004 } 875 }
1005 876
1006 void SyncSchedulerImpl::PollTimerCallback() { 877 void SyncSchedulerImpl::PollTimerCallback() {
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); 878 DCHECK(CalledOnValidThread());
1008 ModelSafeRoutingInfo r;
1009 ModelTypeInvalidationMap invalidation_map =
1010 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
1011 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
1012 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
1013 TimeTicks::Now(),
1014 info,
1015 ConfigurationParams()));
1016 if (no_scheduling_allowed_) { 879 if (no_scheduling_allowed_) {
1017 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in 880 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
1018 // functions that are called only on the sync thread. This function is also 881 // functions that are called only on the sync thread. This function is also
1019 // called only on the sync thread, and only when it is posted by an expiring 882 // called only on the sync thread, and only when it is posted by an expiring
1020 // timer. If we find that no_scheduling_allowed_ is set here, then 883 // timer. If we find that no_scheduling_allowed_ is set here, then
1021 // something is very wrong. Maybe someone mistakenly called us directly, or 884 // something is very wrong. Maybe someone mistakenly called us directly, or
1022 // mishandled the book-keeping for no_scheduling_allowed_. 885 // mishandled the book-keeping for no_scheduling_allowed_.
1023 NOTREACHED() << "Illegal to schedule job while session in progress."; 886 NOTREACHED() << "Illegal to schedule job while session in progress.";
1024 return; 887 return;
1025 } 888 }
1026 889
1027 DoPollSyncSessionJob(job.Pass()); 890 DoPollSyncSessionJob();
1028 } 891 }
1029 892
1030 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { 893 void SyncSchedulerImpl::Unthrottle() {
1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); 894 DCHECK(CalledOnValidThread());
1032 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 895 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1033 DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() ||
1034 wait_interval_->pending_configure_job == to_be_canary.get());
1035 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
1036 << "canary.";
1037 896
1038 // We're no longer throttled, so clear the wait interval. 897 // We're no longer throttled, so clear the wait interval.
1039 wait_interval_.reset(); 898 wait_interval_.reset();
1040 NotifyRetryTime(base::Time()); 899 NotifyRetryTime(base::Time());
1041 900
1042 // We treat this as a 'canary' in the sense that it was originally scheduled 901 // We treat this as a 'canary' in the sense that it was originally scheduled
1043 // to run some time ago, failed, and we now want to retry, versus a job that 902 // to run some time ago, failed, and we now want to retry, versus a job that
1044 // was just created (e.g via ScheduleNudgeImpl). The main implication is 903 // was just created (e.g via ScheduleNudgeImpl). The main implication is
1045 // that we're careful to update routing info (etc) with such potentially 904 // that we're careful to update routing info (etc) with such potentially
1046 // stale canary jobs. 905 // stale canary jobs.
1047 if (to_be_canary.get()) { 906 TryCanaryJob();
1048 DoCanaryJob(to_be_canary.Pass());
1049 } else {
1050 DCHECK(!unscheduled_nudge_storage_.get());
1051 }
1052 } 907 }
1053 908
1054 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 909 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
1055 DCHECK_EQ(MessageLoop::current(), sync_loop_); 910 DCHECK(CalledOnValidThread());
1056 session_context_->NotifyListeners(SyncEngineEvent(cause)); 911 session_context_->NotifyListeners(SyncEngineEvent(cause));
1057 } 912 }
1058 913
1059 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { 914 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
1060 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); 915 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED);
1061 event.retry_time = retry_time; 916 event.retry_time = retry_time;
1062 session_context_->NotifyListeners(event); 917 session_context_->NotifyListeners(event);
1063 } 918 }
1064 919
1065 bool SyncSchedulerImpl::IsBackingOff() const { 920 bool SyncSchedulerImpl::IsBackingOff() const {
1066 DCHECK_EQ(MessageLoop::current(), sync_loop_); 921 DCHECK(CalledOnValidThread());
1067 return wait_interval_.get() && wait_interval_->mode == 922 return wait_interval_.get() && wait_interval_->mode ==
1068 WaitInterval::EXPONENTIAL_BACKOFF; 923 WaitInterval::EXPONENTIAL_BACKOFF;
1069 } 924 }
1070 925
1071 void SyncSchedulerImpl::OnSilencedUntil( 926 void SyncSchedulerImpl::OnSilencedUntil(
1072 const base::TimeTicks& silenced_until) { 927 const base::TimeTicks& silenced_until) {
1073 DCHECK_EQ(MessageLoop::current(), sync_loop_); 928 DCHECK(CalledOnValidThread());
1074 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 929 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
1075 silenced_until - TimeTicks::Now())); 930 silenced_until - TimeTicks::Now()));
1076 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 931 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
1077 } 932 }
1078 933
1079 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { 934 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
1080 DCHECK_EQ(MessageLoop::current(), sync_loop_); 935 DCHECK(CalledOnValidThread());
1081 return wait_interval_.get() && wait_interval_->mode == 936 return wait_interval_.get() && wait_interval_->mode ==
1082 WaitInterval::THROTTLED; 937 WaitInterval::THROTTLED;
1083 } 938 }
1084 939
1085 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 940 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
1086 const base::TimeDelta& new_interval) { 941 const base::TimeDelta& new_interval) {
1087 DCHECK_EQ(MessageLoop::current(), sync_loop_); 942 DCHECK(CalledOnValidThread());
1088 syncer_short_poll_interval_seconds_ = new_interval; 943 syncer_short_poll_interval_seconds_ = new_interval;
1089 } 944 }
1090 945
1091 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( 946 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
1092 const base::TimeDelta& new_interval) { 947 const base::TimeDelta& new_interval) {
1093 DCHECK_EQ(MessageLoop::current(), sync_loop_); 948 DCHECK(CalledOnValidThread());
1094 syncer_long_poll_interval_seconds_ = new_interval; 949 syncer_long_poll_interval_seconds_ = new_interval;
1095 } 950 }
1096 951
1097 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( 952 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
1098 const base::TimeDelta& new_delay) { 953 const base::TimeDelta& new_delay) {
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); 954 DCHECK(CalledOnValidThread());
1100 sessions_commit_delay_ = new_delay; 955 sessions_commit_delay_ = new_delay;
1101 } 956 }
1102 957
1103 void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { 958 void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
1104 DCHECK_EQ(MessageLoop::current(), sync_loop_); 959 DCHECK(CalledOnValidThread());
1105 SDVLOG(2) << "OnShouldStopSyncingPermanently"; 960 SDVLOG(2) << "OnShouldStopSyncingPermanently";
1106 syncer_->RequestEarlyExit(); // Thread-safe. 961 syncer_->RequestEarlyExit(); // Thread-safe.
1107 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); 962 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
1108 } 963 }
1109 964
1110 void SyncSchedulerImpl::OnActionableError( 965 void SyncSchedulerImpl::OnActionableError(
1111 const sessions::SyncSessionSnapshot& snap) { 966 const sessions::SyncSessionSnapshot& snap) {
1112 DCHECK_EQ(MessageLoop::current(), sync_loop_); 967 DCHECK(CalledOnValidThread());
1113 SDVLOG(2) << "OnActionableError"; 968 SDVLOG(2) << "OnActionableError";
1114 SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); 969 SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
1115 event.snapshot = snap; 970 event.snapshot = snap;
1116 session_context_->NotifyListeners(event); 971 session_context_->NotifyListeners(event);
1117 } 972 }
1118 973
1119 void SyncSchedulerImpl::OnSyncProtocolError( 974 void SyncSchedulerImpl::OnSyncProtocolError(
1120 const sessions::SyncSessionSnapshot& snapshot) { 975 const sessions::SyncSessionSnapshot& snapshot) {
1121 DCHECK_EQ(MessageLoop::current(), sync_loop_); 976 DCHECK(CalledOnValidThread());
1122 if (ShouldRequestEarlyExit( 977 if (ShouldRequestEarlyExit(
1123 snapshot.model_neutral_state().sync_protocol_error)) { 978 snapshot.model_neutral_state().sync_protocol_error)) {
1124 SDVLOG(2) << "Sync Scheduler requesting early exit."; 979 SDVLOG(2) << "Sync Scheduler requesting early exit.";
1125 syncer_->RequestEarlyExit(); // Thread-safe. 980 syncer_->RequestEarlyExit(); // Thread-safe.
1126 } 981 }
1127 if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error)) 982 if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error))
1128 OnActionableError(snapshot); 983 OnActionableError(snapshot);
1129 } 984 }
1130 985
1131 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { 986 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
1132 DCHECK_EQ(MessageLoop::current(), sync_loop_); 987 DCHECK(CalledOnValidThread());
1133 session_context_->set_notifications_enabled(notifications_enabled); 988 session_context_->set_notifications_enabled(notifications_enabled);
1134 } 989 }
1135 990
1136 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { 991 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
1137 DCHECK_EQ(MessageLoop::current(), sync_loop_); 992 DCHECK(CalledOnValidThread());
1138 return sessions_commit_delay_; 993 return sessions_commit_delay_;
1139 } 994 }
1140 995
1141 #undef SDVLOG_LOC 996 #undef SDVLOG_LOC
1142 997
1143 #undef SDVLOG 998 #undef SDVLOG
1144 999
1145 #undef SLOG 1000 #undef SLOG
1146 1001
1147 #undef ENUM_CASE 1002 #undef ENUM_CASE
1148 1003
1149 } // namespace syncer 1004 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/engine/sync_scheduler_impl.h ('k') | sync/engine/sync_scheduler_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698