OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 const base::Closure& ready_task) | 76 const base::Closure& ready_task) |
77 : source(source), | 77 : source(source), |
78 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
79 routing_info(routing_info), | 79 routing_info(routing_info), |
80 ready_task(ready_task) { | 80 ready_task(ready_task) { |
81 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
82 } | 82 } |
83 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
84 | 84 |
85 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
86 : mode(UNKNOWN), | 86 : mode(UNKNOWN) {} |
87 had_nudge(false), | |
88 pending_configure_job(NULL) {} | |
89 | 87 |
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | 88 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
91 : mode(mode), had_nudge(false), length(length), | 89 : mode(mode), length(length) {} |
92 pending_configure_job(NULL) {} | |
93 | 90 |
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 91 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
95 | 92 |
96 #define ENUM_CASE(x) case x: return #x; break; | 93 #define ENUM_CASE(x) case x: return #x; break; |
97 | 94 |
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 95 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
99 switch (mode) { | 96 switch (mode) { |
100 ENUM_CASE(UNKNOWN); | 97 ENUM_CASE(UNKNOWN); |
101 ENUM_CASE(EXPONENTIAL_BACKOFF); | 98 ENUM_CASE(EXPONENTIAL_BACKOFF); |
102 ENUM_CASE(THROTTLED); | 99 ENUM_CASE(THROTTLED); |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
153 | 150 |
154 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, | 151 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, |
155 BackoffDelayProvider* delay_provider, | 152 BackoffDelayProvider* delay_provider, |
156 sessions::SyncSessionContext* context, | 153 sessions::SyncSessionContext* context, |
157 Syncer* syncer) | 154 Syncer* syncer) |
158 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 155 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
159 weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 156 weak_ptr_factory_for_weak_handle_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
160 weak_handle_this_(MakeWeakHandle( | 157 weak_handle_this_(MakeWeakHandle( |
161 weak_ptr_factory_for_weak_handle_.GetWeakPtr())), | 158 weak_ptr_factory_for_weak_handle_.GetWeakPtr())), |
162 name_(name), | 159 name_(name), |
163 sync_loop_(MessageLoop::current()), | |
164 started_(false), | 160 started_(false), |
165 syncer_short_poll_interval_seconds_( | 161 syncer_short_poll_interval_seconds_( |
166 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 162 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
167 syncer_long_poll_interval_seconds_( | 163 syncer_long_poll_interval_seconds_( |
168 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 164 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
169 sessions_commit_delay_( | 165 sessions_commit_delay_( |
170 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 166 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
171 mode_(NORMAL_MODE), | 167 mode_(NORMAL_MODE), |
172 // Start with assuming everything is fine with the connection. | |
173 // At the end of the sync cycle we would have the correct status. | |
174 pending_nudge_(NULL), | |
175 delay_provider_(delay_provider), | 168 delay_provider_(delay_provider), |
176 syncer_(syncer), | 169 syncer_(syncer), |
177 session_context_(context), | 170 session_context_(context), |
178 no_scheduling_allowed_(false) { | 171 no_scheduling_allowed_(false) { |
179 DCHECK(sync_loop_); | |
180 } | 172 } |
181 | 173 |
182 SyncSchedulerImpl::~SyncSchedulerImpl() { | 174 SyncSchedulerImpl::~SyncSchedulerImpl() { |
183 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 175 DCHECK(CalledOnValidThread()); |
184 StopImpl(base::Closure()); | 176 StopImpl(base::Closure()); |
185 } | 177 } |
186 | 178 |
187 void SyncSchedulerImpl::OnCredentialsUpdated() { | 179 void SyncSchedulerImpl::OnCredentialsUpdated() { |
188 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 180 DCHECK(CalledOnValidThread()); |
189 | 181 |
190 if (HttpResponse::SYNC_AUTH_ERROR == | 182 if (HttpResponse::SYNC_AUTH_ERROR == |
191 session_context_->connection_manager()->server_status()) { | 183 session_context_->connection_manager()->server_status()) { |
192 OnServerConnectionErrorFixed(); | 184 OnServerConnectionErrorFixed(); |
193 } | 185 } |
194 } | 186 } |
195 | 187 |
196 void SyncSchedulerImpl::OnConnectionStatusChange() { | 188 void SyncSchedulerImpl::OnConnectionStatusChange() { |
197 if (HttpResponse::CONNECTION_UNAVAILABLE == | 189 if (HttpResponse::CONNECTION_UNAVAILABLE == |
198 session_context_->connection_manager()->server_status()) { | 190 session_context_->connection_manager()->server_status()) { |
199 // Optimistically assume that the connection is fixed and try | 191 // Optimistically assume that the connection is fixed and try |
200 // connecting. | 192 // connecting. |
201 OnServerConnectionErrorFixed(); | 193 OnServerConnectionErrorFixed(); |
202 } | 194 } |
203 } | 195 } |
204 | 196 |
205 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 197 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
206 // There could be a pending nudge or configuration job in several cases: | 198 // There could be a pending nudge or configuration job in several cases: |
207 // | 199 // |
208 // 1. We're in exponential backoff. | 200 // 1. We're in exponential backoff. |
209 // 2. We're silenced / throttled. | 201 // 2. We're silenced / throttled. |
210 // 3. A nudge was saved previously due to not having a valid auth token. | 202 // 3. A nudge was saved previously due to not having a valid auth token. |
211 // 4. A nudge was scheduled + saved while in configuration mode. | 203 // 4. A nudge was scheduled + saved while in configuration mode. |
212 // | 204 // |
213 // In all cases except (2), we want to retry contacting the server. We | 205 // In all cases except (2), we want to retry contacting the server. We |
214 // call DoCanaryJob to achieve this, and note that nothing -- not even a | 206 // call DoCanaryJob to achieve this, and note that nothing -- not even a |
215 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that | 207 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that |
216 // has the authority to do that is the Unthrottle timer. | 208 // has the authority to do that is the Unthrottle timer. |
217 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | 209 TryCanaryJob(); |
218 if (!pending.get()) | |
219 return; | |
220 DoCanaryJob(pending.Pass()); | |
221 } | 210 } |
222 | 211 |
223 void SyncSchedulerImpl::Start(Mode mode) { | 212 void SyncSchedulerImpl::Start(Mode mode) { |
224 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 213 DCHECK(CalledOnValidThread()); |
225 std::string thread_name = MessageLoop::current()->thread_name(); | 214 std::string thread_name = MessageLoop::current()->thread_name(); |
226 if (thread_name.empty()) | 215 if (thread_name.empty()) |
227 thread_name = "<Main thread>"; | 216 thread_name = "<Main thread>"; |
228 SDVLOG(2) << "Start called from thread " | 217 SDVLOG(2) << "Start called from thread " |
229 << thread_name << " with mode " << GetModeString(mode); | 218 << thread_name << " with mode " << GetModeString(mode); |
230 if (!started_) { | 219 if (!started_) { |
231 started_ = true; | 220 started_ = true; |
232 SendInitialSnapshot(); | 221 SendInitialSnapshot(); |
233 } | 222 } |
234 | 223 |
235 DCHECK(!session_context_->account_name().empty()); | 224 DCHECK(!session_context_->account_name().empty()); |
236 DCHECK(syncer_.get()); | 225 DCHECK(syncer_.get()); |
237 Mode old_mode = mode_; | 226 Mode old_mode = mode_; |
238 mode_ = mode; | 227 mode_ = mode; |
239 AdjustPolling(NULL); // Will kick start poll timer if needed. | 228 AdjustPolling(NULL); // Will kick start poll timer if needed. |
240 | 229 |
241 if (old_mode != mode_) { | 230 if (old_mode != mode_ && mode_ == NORMAL_MODE && pending_nudge_job_) { |
242 // We just changed our mode. See if there are any pending jobs that we could | 231 // We just got back to normal mode. Let's try to run the work that was |
243 // execute in the new mode. | 232 // queued up while we were configuring. |
244 if (mode_ == NORMAL_MODE) { | 233 DoNudgeSyncSessionJob(NORMAL_PRIORITY); |
245 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | |
246 // has not yet completed. | |
247 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | |
248 } | |
249 | |
250 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); | |
251 if (pending.get()) { | |
252 SDVLOG(2) << "Executing pending job. Good luck!"; | |
253 DoSyncSessionJob(pending.Pass(), NORMAL_PRIORITY); | |
254 } | |
255 } | 234 } |
256 } | 235 } |
257 | 236 |
258 void SyncSchedulerImpl::SendInitialSnapshot() { | 237 void SyncSchedulerImpl::SendInitialSnapshot() { |
259 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 238 DCHECK(CalledOnValidThread()); |
260 scoped_ptr<SyncSession> dummy(new SyncSession( | 239 scoped_ptr<SyncSession> dummy(new SyncSession( |
261 session_context_, this, SyncSourceInfo())); | 240 session_context_, this, SyncSourceInfo())); |
262 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 241 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
263 event.snapshot = dummy->TakeSnapshot(); | 242 event.snapshot = dummy->TakeSnapshot(); |
264 session_context_->NotifyListeners(event); | 243 session_context_->NotifyListeners(event); |
265 } | 244 } |
266 | 245 |
267 namespace { | 246 namespace { |
268 | 247 |
269 // Helper to extract the routing info corresponding to types in | 248 // Helper to extract the routing info corresponding to types in |
270 // |types_to_download| from |current_routes|. | 249 // |types_to_download| from |current_routes|. |
271 void BuildModelSafeParams( | 250 void BuildModelSafeParams( |
272 ModelTypeSet types_to_download, | 251 ModelTypeSet types_to_download, |
273 const ModelSafeRoutingInfo& current_routes, | 252 const ModelSafeRoutingInfo& current_routes, |
274 ModelSafeRoutingInfo* result_routes) { | 253 ModelSafeRoutingInfo* result_routes) { |
275 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); | 254 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); |
276 iter.Inc()) { | 255 iter.Inc()) { |
277 ModelType type = iter.Get(); | 256 ModelType type = iter.Get(); |
278 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); | 257 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); |
279 DCHECK(route != current_routes.end()); | 258 DCHECK(route != current_routes.end()); |
280 ModelSafeGroup group = route->second; | 259 ModelSafeGroup group = route->second; |
281 (*result_routes)[type] = group; | 260 (*result_routes)[type] = group; |
282 } | 261 } |
283 } | 262 } |
284 | 263 |
285 } // namespace. | 264 } // namespace. |
286 | 265 |
287 bool SyncSchedulerImpl::ScheduleConfiguration( | 266 bool SyncSchedulerImpl::ScheduleConfiguration( |
288 const ConfigurationParams& params) { | 267 const ConfigurationParams& params) { |
289 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 268 DCHECK(CalledOnValidThread()); |
290 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 269 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
291 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 270 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
292 DCHECK(!params.ready_task.is_null()); | 271 DCHECK(!params.ready_task.is_null()); |
293 CHECK(started_) << "Scheduler must be running to configure."; | 272 CHECK(started_) << "Scheduler must be running to configure."; |
294 SDVLOG(2) << "Reconfiguring syncer."; | 273 SDVLOG(2) << "Reconfiguring syncer."; |
295 | 274 |
296 // Only one configuration is allowed at a time. Verify we're not waiting | 275 // Only one configuration is allowed at a time. Verify we're not waiting |
297 // for a pending configure job. | 276 // for a pending configure job. |
298 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); | 277 DCHECK(!pending_configure_job_); |
299 | 278 |
300 ModelSafeRoutingInfo restricted_routes; | 279 ModelSafeRoutingInfo restricted_routes; |
301 BuildModelSafeParams(params.types_to_download, | 280 BuildModelSafeParams(params.types_to_download, |
302 params.routing_info, | 281 params.routing_info, |
303 &restricted_routes); | 282 &restricted_routes); |
304 session_context_->set_routing_info(restricted_routes); | 283 session_context_->set_routing_info(restricted_routes); |
305 | 284 |
306 // Only reconfigure if we have types to download. | 285 // Only reconfigure if we have types to download. |
307 if (!params.types_to_download.Empty()) { | 286 if (!params.types_to_download.Empty()) { |
308 DCHECK(!restricted_routes.empty()); | 287 DCHECK(!restricted_routes.empty()); |
309 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 288 pending_configure_job_.reset(new SyncSessionJob( |
310 SyncSessionJob::CONFIGURATION, | 289 SyncSessionJob::CONFIGURATION, |
311 TimeTicks::Now(), | 290 TimeTicks::Now(), |
312 SyncSourceInfo(params.source, | 291 SyncSourceInfo(params.source, |
313 ModelSafeRoutingInfoToInvalidationMap( | 292 ModelSafeRoutingInfoToInvalidationMap( |
314 restricted_routes, | 293 restricted_routes, |
315 std::string())), | 294 std::string())), |
316 params)); | 295 params)); |
317 bool succeeded = DoSyncSessionJob(job.Pass(), NORMAL_PRIORITY); | 296 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); |
318 | 297 |
319 // If we failed, the job would have been saved as the pending configure | 298 // If we failed, the job would have been saved as the pending configure |
320 // job and a wait interval would have been set. | 299 // job and a wait interval would have been set. |
321 if (!succeeded) { | 300 if (!succeeded) { |
322 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); | 301 DCHECK(pending_configure_job_); |
323 return false; | 302 return false; |
| 303 } else { |
| 304 DCHECK(!pending_configure_job_); |
324 } | 305 } |
325 } else { | 306 } else { |
326 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 307 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
327 params.ready_task.Run(); | 308 params.ready_task.Run(); |
328 } | 309 } |
329 | 310 |
330 return true; | 311 return true; |
331 } | 312 } |
332 | 313 |
333 SyncSchedulerImpl::JobProcessDecision | 314 SyncSchedulerImpl::JobProcessDecision |
334 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, | 315 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job, |
335 JobPriority priority) { | 316 JobPriority priority) { |
336 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 317 DCHECK(CalledOnValidThread()); |
337 DCHECK(wait_interval_.get()); | 318 DCHECK(wait_interval_.get()); |
338 DCHECK_NE(job.purpose(), SyncSessionJob::POLL); | 319 DCHECK_NE(job.purpose(), SyncSessionJob::POLL); |
339 | 320 |
340 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 321 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
341 << WaitInterval::GetModeString(wait_interval_->mode) | 322 << WaitInterval::GetModeString(wait_interval_->mode) |
342 << (wait_interval_->had_nudge ? " (had nudge)" : "") | |
343 << ((priority == CANARY_PRIORITY) ? " (canary)" : ""); | 323 << ((priority == CANARY_PRIORITY) ? " (canary)" : ""); |
344 | 324 |
345 // If we save a job while in a WaitInterval, there is a well-defined moment | 325 // If we save a job while in a WaitInterval, there is a well-defined moment |
346 // in time in the future when it makes sense for that SAVE-worthy job to try | 326 // in time in the future when it makes sense for that SAVE-worthy job to try |
347 // running again -- the end of the WaitInterval. | 327 // running again -- the end of the WaitInterval. |
348 DCHECK(job.purpose() == SyncSessionJob::NUDGE || | 328 DCHECK(job.purpose() == SyncSessionJob::NUDGE || |
349 job.purpose() == SyncSessionJob::CONFIGURATION); | 329 job.purpose() == SyncSessionJob::CONFIGURATION); |
350 | 330 |
351 // If throttled, there's a clock ticking to unthrottle. We want to get | 331 // If throttled, there's a clock ticking to unthrottle. We want to get |
352 // on the same train. | 332 // on the same train. |
353 if (wait_interval_->mode == WaitInterval::THROTTLED) | 333 if (wait_interval_->mode == WaitInterval::THROTTLED) |
354 return SAVE; | 334 return SAVE; |
355 | 335 |
356 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 336 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
357 if (job.purpose() == SyncSessionJob::NUDGE) { | 337 if (job.purpose() == SyncSessionJob::NUDGE) { |
358 if (mode_ == CONFIGURATION_MODE) | 338 if (mode_ == CONFIGURATION_MODE) |
359 return SAVE; | 339 return SAVE; |
360 | 340 |
361 // If we already had one nudge then just drop this nudge. We will retry | |
362 // later when the timer runs out. | |
363 if (priority == NORMAL_PRIORITY) | 341 if (priority == NORMAL_PRIORITY) |
364 return wait_interval_->had_nudge ? DROP : CONTINUE; | 342 return DROP; |
365 else // We are here because timer ran out. So retry. | 343 else // Either backoff has ended, or we have permission to bypass it. |
366 return CONTINUE; | 344 return CONTINUE; |
367 } | 345 } |
368 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; | 346 return (priority == CANARY_PRIORITY) ? CONTINUE : SAVE; |
369 } | 347 } |
370 | 348 |
371 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 349 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
372 const SyncSessionJob& job, | 350 const SyncSessionJob& job, |
373 JobPriority priority) { | 351 JobPriority priority) { |
374 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 352 DCHECK(CalledOnValidThread()); |
375 | 353 |
376 // POLL jobs do not call this function. | 354 // POLL jobs do not call this function. |
377 DCHECK(job.purpose() == SyncSessionJob::NUDGE || | 355 DCHECK(job.purpose() == SyncSessionJob::NUDGE || |
378 job.purpose() == SyncSessionJob::CONFIGURATION); | 356 job.purpose() == SyncSessionJob::CONFIGURATION); |
379 | 357 |
380 // See if our type is throttled. | 358 // See if our type is throttled. |
381 ModelTypeSet throttled_types = | 359 ModelTypeSet throttled_types = |
382 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 360 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
383 if (job.purpose() == SyncSessionJob::NUDGE && | 361 if (job.purpose() == SyncSessionJob::NUDGE && |
384 job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) { | 362 job.source_info().updates_source == GetUpdatesCallerInfo::LOCAL) { |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
442 // Decision now rests on state of auth tokens. | 420 // Decision now rests on state of auth tokens. |
443 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 421 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
444 return CONTINUE; | 422 return CONTINUE; |
445 | 423 |
446 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 424 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
447 // Running the job would require updated auth, so we can't honour | 425 // Running the job would require updated auth, so we can't honour |
448 // job.scheduled_start(). | 426 // job.scheduled_start(). |
449 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; | 427 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; |
450 } | 428 } |
451 | 429 |
452 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { | |
453 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; | |
454 if (is_nudge && pending_nudge_) { | |
455 SDVLOG(2) << "Coalescing a pending nudge"; | |
456 // TODO(tim): This basically means we never use the more-careful coalescing | |
457 // logic in ScheduleNudgeImpl that takes the min of the two nudge start | |
458 // times, because we're calling this function first. Pull this out | |
459 // into a function to coalesce + set start times and reuse. | |
460 pending_nudge_->CoalesceSources(job->source_info()); | |
461 return; | |
462 } | |
463 | |
464 scoped_ptr<SyncSessionJob> job_to_save = job->Clone(); | |
465 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { | |
466 // This job should be made the new canary. | |
467 if (is_nudge) { | |
468 pending_nudge_ = job_to_save.get(); | |
469 } else { | |
470 SDVLOG(2) << "Saving a configuration job"; | |
471 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); | |
472 DCHECK(!wait_interval_->pending_configure_job); | |
473 DCHECK_EQ(mode_, CONFIGURATION_MODE); | |
474 DCHECK(!job->config_params().ready_task.is_null()); | |
475 // The only nudge that could exist is a scheduled canary nudge. | |
476 DCHECK(!unscheduled_nudge_storage_.get()); | |
477 if (pending_nudge_) { | |
478 // Pre-empt the nudge canary and abandon the old nudge (owned by task). | |
479 unscheduled_nudge_storage_ = pending_nudge_->Clone(); | |
480 pending_nudge_ = unscheduled_nudge_storage_.get(); | |
481 } | |
482 wait_interval_->pending_configure_job = job_to_save.get(); | |
483 } | |
484 TimeDelta length = | |
485 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); | |
486 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? | |
487 TimeDelta::FromSeconds(0) : length; | |
488 RestartWaiting(job_to_save.Pass()); | |
489 return; | |
490 } | |
491 | |
492 // Note that today there are no cases where we SAVE a CONFIGURATION job | |
493 // when we're not in a WaitInterval. See bug 147736. | |
494 DCHECK(is_nudge); | |
495 // There may or may not be a pending_configure_job. Either way this nudge | |
496 // is unschedulable. | |
497 pending_nudge_ = job_to_save.get(); | |
498 unscheduled_nudge_storage_ = job_to_save.Pass(); | |
499 } | |
500 | |
501 // Functor for std::find_if to search by ModelSafeGroup. | |
502 struct ModelSafeWorkerGroupIs { | |
503 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | |
504 bool operator()(ModelSafeWorker* w) { | |
505 return group == w->GetModelSafeGroup(); | |
506 } | |
507 ModelSafeGroup group; | |
508 }; | |
509 | |
510 void SyncSchedulerImpl::ScheduleNudgeAsync( | 430 void SyncSchedulerImpl::ScheduleNudgeAsync( |
511 const TimeDelta& desired_delay, | 431 const TimeDelta& desired_delay, |
512 NudgeSource source, ModelTypeSet types, | 432 NudgeSource source, ModelTypeSet types, |
513 const tracked_objects::Location& nudge_location) { | 433 const tracked_objects::Location& nudge_location) { |
514 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 434 DCHECK(CalledOnValidThread()); |
515 SDVLOG_LOC(nudge_location, 2) | 435 SDVLOG_LOC(nudge_location, 2) |
516 << "Nudge scheduled with delay " | 436 << "Nudge scheduled with delay " |
517 << desired_delay.InMilliseconds() << " ms, " | 437 << desired_delay.InMilliseconds() << " ms, " |
518 << "source " << GetNudgeSourceString(source) << ", " | 438 << "source " << GetNudgeSourceString(source) << ", " |
519 << "types " << ModelTypeSetToString(types); | 439 << "types " << ModelTypeSetToString(types); |
520 | 440 |
521 ModelTypeInvalidationMap invalidation_map = | 441 ModelTypeInvalidationMap invalidation_map = |
522 ModelTypeSetToInvalidationMap(types, std::string()); | 442 ModelTypeSetToInvalidationMap(types, std::string()); |
523 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, | 443 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
524 GetUpdatesFromNudgeSource(source), | 444 GetUpdatesFromNudgeSource(source), |
525 invalidation_map, | 445 invalidation_map, |
526 nudge_location); | 446 nudge_location); |
527 } | 447 } |
528 | 448 |
529 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 449 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
530 const TimeDelta& desired_delay, | 450 const TimeDelta& desired_delay, |
531 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, | 451 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, |
532 const tracked_objects::Location& nudge_location) { | 452 const tracked_objects::Location& nudge_location) { |
533 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 453 DCHECK(CalledOnValidThread()); |
534 SDVLOG_LOC(nudge_location, 2) | 454 SDVLOG_LOC(nudge_location, 2) |
535 << "Nudge scheduled with delay " | 455 << "Nudge scheduled with delay " |
536 << desired_delay.InMilliseconds() << " ms, " | 456 << desired_delay.InMilliseconds() << " ms, " |
537 << "source " << GetNudgeSourceString(source) << ", " | 457 << "source " << GetNudgeSourceString(source) << ", " |
538 << "payloads " | 458 << "payloads " |
539 << ModelTypeInvalidationMapToString(invalidation_map); | 459 << ModelTypeInvalidationMapToString(invalidation_map); |
540 | 460 |
541 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, | 461 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
542 GetUpdatesFromNudgeSource(source), | 462 GetUpdatesFromNudgeSource(source), |
543 invalidation_map, | 463 invalidation_map, |
544 nudge_location); | 464 nudge_location); |
545 } | 465 } |
546 | 466 |
547 | 467 |
548 // TODO(zea): Consider adding separate throttling/backoff for datatype | 468 // TODO(zea): Consider adding separate throttling/backoff for datatype |
549 // refresh requests. | 469 // refresh requests. |
550 void SyncSchedulerImpl::ScheduleNudgeImpl( | 470 void SyncSchedulerImpl::ScheduleNudgeImpl( |
551 const TimeDelta& delay, | 471 const TimeDelta& delay, |
552 GetUpdatesCallerInfo::GetUpdatesSource source, | 472 GetUpdatesCallerInfo::GetUpdatesSource source, |
553 const ModelTypeInvalidationMap& invalidation_map, | 473 const ModelTypeInvalidationMap& invalidation_map, |
554 const tracked_objects::Location& nudge_location) { | 474 const tracked_objects::Location& nudge_location) { |
555 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 475 DCHECK(CalledOnValidThread()); |
556 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; | 476 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; |
557 | 477 |
558 if (no_scheduling_allowed_) { | 478 if (no_scheduling_allowed_) { |
559 NOTREACHED() << "Illegal to schedule job while session in progress."; | 479 NOTREACHED() << "Illegal to schedule job while session in progress."; |
560 return; | 480 return; |
561 } | 481 } |
562 | 482 |
563 if (!started_) { | 483 if (!started_) { |
564 SDVLOG_LOC(nudge_location, 2) | 484 SDVLOG_LOC(nudge_location, 2) |
565 << "Dropping nudge, scheduler is not running."; | 485 << "Dropping nudge, scheduler is not running."; |
(...skipping 13 matching lines...) Expand all Loading... |
579 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( | 499 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
580 SyncSessionJob::NUDGE, | 500 SyncSessionJob::NUDGE, |
581 TimeTicks::Now() + delay, | 501 TimeTicks::Now() + delay, |
582 info, | 502 info, |
583 ConfigurationParams())); | 503 ConfigurationParams())); |
584 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); | 504 JobProcessDecision decision = DecideOnJob(*job, NORMAL_PRIORITY); |
585 SDVLOG(2) << "Should run " | 505 SDVLOG(2) << "Should run " |
586 << SyncSessionJob::GetPurposeString(job->purpose()) | 506 << SyncSessionJob::GetPurposeString(job->purpose()) |
587 << " in mode " << GetModeString(mode_) | 507 << " in mode " << GetModeString(mode_) |
588 << ": " << GetDecisionString(decision); | 508 << ": " << GetDecisionString(decision); |
589 if (decision != CONTINUE) { | 509 if (decision == DROP) { |
590 // End of the line, though we may save the job for later. | |
591 if (decision == SAVE) { | |
592 HandleSaveJobDecision(job.Pass()); | |
593 } else { | |
594 DCHECK_EQ(decision, DROP); | |
595 } | |
596 return; | 510 return; |
597 } | 511 } |
598 | 512 |
599 if (pending_nudge_) { | 513 // Try to coalesce in both SAVE and CONTINUE cases. |
600 SDVLOG(2) << "Rescheduling pending nudge"; | 514 if (pending_nudge_job_) { |
601 pending_nudge_->CoalesceSources(job->source_info()); | 515 pending_nudge_job_->CoalesceSources(job->source_info()); |
602 // Choose the start time as the earliest of the 2. Note that this means | 516 if (decision == CONTINUE) { |
603 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) | 517 // Only update the scheduled_start if we're going to reschedule. |
604 // but a nudge is already scheduled to go out, we'll send the (tab) commit | 518 pending_nudge_job_->set_scheduled_start( |
605 // without waiting. | 519 std::min(job->scheduled_start(), |
606 pending_nudge_->set_scheduled_start( | 520 pending_nudge_job_->scheduled_start())); |
607 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); | 521 } |
608 // Abandon the old task by cloning and replacing the session. | 522 } else { |
609 // It's possible that by "rescheduling" we're actually taking a job that | 523 pending_nudge_job_ = job.Pass(); |
610 // was previously unscheduled and giving it wings, so take care to reset | |
611 // unscheduled nudge storage. | |
612 job = pending_nudge_->Clone(); | |
613 pending_nudge_ = NULL; | |
614 unscheduled_nudge_storage_.reset(); | |
615 // It's also possible we took a canary job, since we allow one nudge | |
616 // per backoff interval. | |
617 DCHECK(!wait_interval_ || !wait_interval_->had_nudge); | |
618 } | 524 } |
619 | 525 |
620 TimeDelta run_delay = job->scheduled_start() - TimeTicks::Now(); | 526 if (decision == SAVE) { |
| 527 return; |
| 528 } |
| 529 |
| 530 TimeDelta run_delay = |
| 531 pending_nudge_job_->scheduled_start() - TimeTicks::Now(); |
621 if (run_delay < TimeDelta::FromMilliseconds(0)) | 532 if (run_delay < TimeDelta::FromMilliseconds(0)) |
622 run_delay = TimeDelta::FromMilliseconds(0); | 533 run_delay = TimeDelta::FromMilliseconds(0); |
623 SDVLOG_LOC(nudge_location, 2) | 534 SDVLOG_LOC(nudge_location, 2) |
624 << "Scheduling a nudge with " | 535 << "Scheduling a nudge with " |
625 << run_delay.InMilliseconds() << " ms delay"; | 536 << run_delay.InMilliseconds() << " ms delay"; |
626 | 537 |
627 pending_nudge_ = job.get(); | 538 if (started_) { |
628 PostDelayedTask(nudge_location, "DoSyncSessionJob", | 539 pending_wakeup_timer_.Start( |
629 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), | 540 nudge_location, |
630 weak_ptr_factory_.GetWeakPtr(), | 541 run_delay, |
631 base::Passed(&job), | 542 base::Bind(&SyncSchedulerImpl::DoNudgeSyncSessionJob, |
632 NORMAL_PRIORITY), | 543 weak_ptr_factory_.GetWeakPtr(), |
633 run_delay); | 544 NORMAL_PRIORITY)); |
| 545 } |
634 } | 546 } |
635 | 547 |
636 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 548 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
637 switch (mode) { | 549 switch (mode) { |
638 ENUM_CASE(CONFIGURATION_MODE); | 550 ENUM_CASE(CONFIGURATION_MODE); |
639 ENUM_CASE(NORMAL_MODE); | 551 ENUM_CASE(NORMAL_MODE); |
640 } | 552 } |
641 return ""; | 553 return ""; |
642 } | 554 } |
643 | 555 |
644 const char* SyncSchedulerImpl::GetDecisionString( | 556 const char* SyncSchedulerImpl::GetDecisionString( |
645 SyncSchedulerImpl::JobProcessDecision mode) { | 557 SyncSchedulerImpl::JobProcessDecision mode) { |
646 switch (mode) { | 558 switch (mode) { |
647 ENUM_CASE(CONTINUE); | 559 ENUM_CASE(CONTINUE); |
648 ENUM_CASE(SAVE); | 560 ENUM_CASE(SAVE); |
649 ENUM_CASE(DROP); | 561 ENUM_CASE(DROP); |
650 } | 562 } |
651 return ""; | 563 return ""; |
652 } | 564 } |
653 | 565 |
654 void SyncSchedulerImpl::PostDelayedTask( | 566 bool SyncSchedulerImpl::DoSyncSessionJobImpl(scoped_ptr<SyncSessionJob> job, |
655 const tracked_objects::Location& from_here, | 567 JobPriority priority) { |
656 const char* name, const base::Closure& task, base::TimeDelta delay) { | 568 DCHECK(CalledOnValidThread()); |
657 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | |
658 << delay.InMilliseconds() << " ms delay"; | |
659 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
660 if (!started_) { | |
661 SDVLOG(1) << "Not posting task as scheduler is stopped."; | |
662 return; | |
663 } | |
664 // This cancels the previous task, if one existed. | |
665 pending_wakeup_.Reset(task); | |
666 sync_loop_->PostDelayedTask(from_here, pending_wakeup_.callback(), delay); | |
667 } | |
668 | |
669 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job, | |
670 JobPriority priority) { | |
671 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
672 if (job->purpose() == SyncSessionJob::NUDGE) { | |
673 pending_nudge_ = NULL; | |
674 } | |
675 | 569 |
676 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 570 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
677 JobProcessDecision decision = DecideOnJob(*job, priority); | 571 JobProcessDecision decision = DecideOnJob(*job, priority); |
678 SDVLOG(2) << "Should run " | 572 SDVLOG(2) << "Should run " |
679 << SyncSessionJob::GetPurposeString(job->purpose()) | 573 << SyncSessionJob::GetPurposeString(job->purpose()) |
680 << " in mode " << GetModeString(mode_) | 574 << " in mode " << GetModeString(mode_) |
681 << " with source " << job->source_info().updates_source | 575 << " with source " << job->source_info().updates_source |
682 << ": " << GetDecisionString(decision); | 576 << ": " << GetDecisionString(decision); |
683 if (decision != CONTINUE) { | 577 if (decision != CONTINUE) { |
684 if (decision == SAVE) { | 578 if (decision == SAVE) { |
685 HandleSaveJobDecision(job.Pass()); | 579 if (job->purpose() == SyncSessionJob::CONFIGURATION) { |
| 580 pending_configure_job_ = job.Pass(); |
| 581 } else { |
| 582 pending_nudge_job_ = job.Pass(); |
| 583 } |
686 } else { | 584 } else { |
687 DCHECK_EQ(decision, DROP); | 585 DCHECK_EQ(decision, DROP); |
688 } | 586 } |
689 return false; | 587 return false; |
690 } | 588 } |
691 | 589 |
692 DVLOG(2) << "Creating sync session with routes " | 590 DVLOG(2) << "Creating sync session with routes " |
693 << ModelSafeRoutingInfoToString(session_context_->routing_info()) | 591 << ModelSafeRoutingInfoToString(session_context_->routing_info()) |
694 << "and purpose " << job->purpose(); | 592 << "and purpose " << job->purpose(); |
695 SyncSession session(session_context_, this, job->source_info()); | 593 SyncSession session(session_context_, this, job->source_info()); |
696 bool premature_exit = !syncer_->SyncShare(&session, | 594 bool premature_exit = !syncer_->SyncShare(&session, |
697 job->start_step(), | 595 job->start_step(), |
698 job->end_step()); | 596 job->end_step()); |
699 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 597 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; |
700 | 598 |
701 bool success = FinishSyncSessionJob(job.get(), | 599 bool success = FinishSyncSessionJob(job.get(), |
702 premature_exit, | 600 premature_exit, |
703 &session); | 601 &session); |
704 | 602 |
705 if (IsSyncingCurrentlySilenced()) { | 603 if (IsSyncingCurrentlySilenced()) { |
706 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; | 604 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
707 // If we're here, it's because |job| was silenced until a server specified | 605 // If we're here, it's because |job| was silenced until a server specified |
708 // time. (Note, it had to be |job|, because DecideOnJob would not permit | 606 // time. (Note, it had to be |job|, because DecideOnJob would not permit |
709 // any job through while in WaitInterval::THROTTLED). | 607 // any job through while in WaitInterval::THROTTLED). |
710 scoped_ptr<SyncSessionJob> clone = job->Clone(); | 608 if (job->purpose() == SyncSessionJob::NUDGE) |
711 if (clone->purpose() == SyncSessionJob::NUDGE) | 609 pending_nudge_job_ = job.Pass(); |
712 pending_nudge_ = clone.get(); | 610 else if (job->purpose() == SyncSessionJob::CONFIGURATION) |
713 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) | 611 pending_configure_job_ = job.Pass(); |
714 wait_interval_->pending_configure_job = clone.get(); | |
715 else | 612 else |
716 NOTREACHED(); | 613 NOTREACHED(); |
717 | 614 |
718 RestartWaiting(clone.Pass()); | 615 RestartWaiting(); |
719 return success; | 616 return success; |
720 } | 617 } |
721 | 618 |
722 if (!success) | 619 if (!success) |
723 ScheduleNextSync(job.Pass(), &session); | 620 ScheduleNextSync(job.Pass(), &session); |
724 | 621 |
725 return success; | 622 return success; |
726 } | 623 } |
727 | 624 |
| 625 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { |
| 626 DoSyncSessionJobImpl(pending_nudge_job_.Pass(), priority); |
| 627 } |
| 628 |
| 629 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { |
| 630 return DoSyncSessionJobImpl(pending_configure_job_.Pass(), priority); |
| 631 } |
| 632 |
728 bool SyncSchedulerImpl::ShouldPoll() { | 633 bool SyncSchedulerImpl::ShouldPoll() { |
729 if (wait_interval_.get()) { | 634 if (wait_interval_.get()) { |
730 SDVLOG(2) << "Not running poll in wait interval."; | 635 SDVLOG(2) << "Not running poll in wait interval."; |
731 return false; | 636 return false; |
732 } | 637 } |
733 | 638 |
734 if (mode_ == CONFIGURATION_MODE) { | 639 if (mode_ == CONFIGURATION_MODE) { |
735 SDVLOG(2) << "Not running poll in configuration mode."; | 640 SDVLOG(2) << "Not running poll in configuration mode."; |
736 return false; | 641 return false; |
737 } | 642 } |
738 | 643 |
739 // TODO(rlarocque): Refactor decision-making logic common to all types | 644 // TODO(rlarocque): Refactor decision-making logic common to all types |
740 // of jobs into a shared function. | 645 // of jobs into a shared function. |
741 | 646 |
742 if (session_context_->connection_manager()->HasInvalidAuthToken()) { | 647 if (session_context_->connection_manager()->HasInvalidAuthToken()) { |
743 SDVLOG(2) << "Not running poll because auth token is invalid."; | 648 SDVLOG(2) << "Not running poll because auth token is invalid."; |
744 return false; | 649 return false; |
745 } | 650 } |
746 | 651 |
747 return true; | 652 return true; |
748 } | 653 } |
749 | 654 |
750 void SyncSchedulerImpl::DoPollSyncSessionJob(scoped_ptr<SyncSessionJob> job) { | 655 void SyncSchedulerImpl::DoPollSyncSessionJob() { |
751 DCHECK_EQ(job->purpose(), SyncSessionJob::POLL); | 656 ModelSafeRoutingInfo r; |
| 657 ModelTypeInvalidationMap invalidation_map = |
| 658 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
| 659 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
| 660 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
| 661 TimeTicks::Now(), |
| 662 info, |
| 663 ConfigurationParams())); |
752 | 664 |
753 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 665 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
754 | 666 |
755 if (!ShouldPoll()) | 667 if (!ShouldPoll()) |
756 return; | 668 return; |
757 | 669 |
758 DVLOG(2) << "Polling with routes " | 670 DVLOG(2) << "Polling with routes " |
759 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 671 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
760 SyncSession session(session_context_, this, job->source_info()); | 672 SyncSession session(session_context_, this, job->source_info()); |
761 bool premature_exit = !syncer_->SyncShare(&session, | 673 bool premature_exit = !syncer_->SyncShare(&session, |
762 job->start_step(), | 674 job->start_step(), |
763 job->end_step()); | 675 job->end_step()); |
764 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; | 676 SDVLOG(2) << "Done SyncShare, returned: " << premature_exit; |
765 | 677 |
766 FinishSyncSessionJob(job.get(), premature_exit, &session); | 678 FinishSyncSessionJob(job.get(), premature_exit, &session); |
767 | 679 |
768 if (IsSyncingCurrentlySilenced()) { | 680 if (IsSyncingCurrentlySilenced()) { |
769 // This will start the countdown to unthrottle. Other kinds of jobs would | 681 // Normally we would only call RestartWaiting() if we had a |
770 // schedule themselves as the post-unthrottle canary. A poll job is not | 682 // pending_nudge_job_ or pending_configure_job_ set. In this case, it's |
771 // that urgent, so it does not get to be the canary. We still need to start | 683 // possible that neither is set. We create the wait interval anyway because |
772 // the timer regardless. Otherwise there could be no one to clear the | 684 // we need it to make sure we get unthrottled on time. |
773 // WaitInterval when the throttling expires. | 685 RestartWaiting(); |
774 RestartWaiting(scoped_ptr<SyncSessionJob>()); | |
775 } | 686 } |
776 } | 687 } |
777 | 688 |
778 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 689 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
779 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 690 DCHECK(CalledOnValidThread()); |
780 | 691 |
781 // We are interested in recording time between local nudges for datatypes. | 692 // We are interested in recording time between local nudges for datatypes. |
782 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 693 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
783 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 694 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
784 return; | 695 return; |
785 | 696 |
786 base::TimeTicks now = TimeTicks::Now(); | 697 base::TimeTicks now = TimeTicks::Now(); |
787 // Update timing information for how often datatypes are triggering nudges. | 698 // Update timing information for how often datatypes are triggering nudges. |
788 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); | 699 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); |
789 iter != info.types.end(); | 700 iter != info.types.end(); |
790 ++iter) { | 701 ++iter) { |
791 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 702 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
792 last_local_nudges_by_model_type_[iter->first] = now; | 703 last_local_nudges_by_model_type_[iter->first] = now; |
793 if (previous.is_null()) | 704 if (previous.is_null()) |
794 continue; | 705 continue; |
795 | 706 |
796 #define PER_DATA_TYPE_MACRO(type_str) \ | 707 #define PER_DATA_TYPE_MACRO(type_str) \ |
797 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 708 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
798 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 709 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
799 #undef PER_DATA_TYPE_MACRO | 710 #undef PER_DATA_TYPE_MACRO |
800 } | 711 } |
801 } | 712 } |
802 | 713 |
803 bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, | 714 bool SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job, |
804 bool exited_prematurely, | 715 bool exited_prematurely, |
805 SyncSession* session) { | 716 SyncSession* session) { |
806 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 717 DCHECK(CalledOnValidThread()); |
807 | 718 |
808 // Let job know that we're through syncing (calling SyncShare) at this point. | 719 // Let job know that we're through syncing (calling SyncShare) at this point. |
809 bool succeeded = false; | 720 bool succeeded = false; |
810 { | 721 { |
811 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); | 722 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); |
812 succeeded = job->Finish(exited_prematurely, session); | 723 succeeded = job->Finish(exited_prematurely, session); |
813 } | 724 } |
814 | 725 |
815 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 726 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
816 | 727 |
817 AdjustPolling(job); | 728 AdjustPolling(job); |
818 | 729 |
819 if (succeeded) { | 730 if (succeeded) { |
820 // No job currently supported by the scheduler could succeed without | 731 // No job currently supported by the scheduler could succeed without |
821 // successfully reaching the server. Therefore, if we make it here, it is | 732 // successfully reaching the server. Therefore, if we make it here, it is |
822 // appropriate to reset the backoff interval. | 733 // appropriate to reset the backoff interval. |
823 wait_interval_.reset(); | 734 wait_interval_.reset(); |
824 NotifyRetryTime(base::Time()); | 735 NotifyRetryTime(base::Time()); |
825 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 736 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
826 } | 737 } |
827 | 738 |
828 return succeeded; | 739 return succeeded; |
829 } | 740 } |
830 | 741 |
831 void SyncSchedulerImpl::ScheduleNextSync( | 742 void SyncSchedulerImpl::ScheduleNextSync( |
832 scoped_ptr<SyncSessionJob> finished_job, | 743 scoped_ptr<SyncSessionJob> finished_job, |
833 SyncSession* session) { | 744 SyncSession* session) { |
834 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 745 DCHECK(CalledOnValidThread()); |
835 DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION | 746 DCHECK(finished_job->purpose() == SyncSessionJob::CONFIGURATION |
836 || finished_job->purpose() == SyncSessionJob::NUDGE); | 747 || finished_job->purpose() == SyncSessionJob::NUDGE); |
837 | 748 |
838 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 749 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
839 // if we don't succeed. Some types of errors are not likely to disappear on | 750 // if we don't succeed. Some types of errors are not likely to disappear on |
840 // their own. With the return values now available in the old_job.session, | 751 // their own. With the return values now available in the old_job.session, |
841 // we should be able to detect such errors and only retry when we detect | 752 // we should be able to detect such errors and only retry when we detect |
842 // transient errors. | 753 // transient errors. |
843 | 754 |
844 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 755 SDVLOG(2) << "SyncShare job failed; will start or update backoff"; |
845 mode_ == NORMAL_MODE) { | 756 HandleContinuationError(finished_job.Pass(), session); |
846 // When in normal mode, we allow up to one nudge per backoff interval. It | |
847 // appears that this was our nudge for this interval, and it failed. | |
848 // | |
849 // Note: This does not prevent us from running canary jobs. For example, | |
850 // an IP address change might still result in another nudge being executed | |
851 // during this backoff interval. | |
852 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; | |
853 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); | |
854 DCHECK(!wait_interval_->had_nudge); | |
855 | |
856 wait_interval_->had_nudge = true; | |
857 DCHECK(!pending_nudge_); | |
858 | |
859 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); | |
860 pending_nudge_ = new_job.get(); | |
861 RestartWaiting(new_job.Pass()); | |
862 } else { | |
863 // Either this is the first failure or a consecutive failure after our | |
864 // backoff timer expired. We handle it the same way in either case. | |
865 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | |
866 HandleContinuationError(finished_job.Pass(), session); | |
867 } | |
868 } | 757 } |
869 | 758 |
870 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 759 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
871 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 760 DCHECK(CalledOnValidThread()); |
872 | 761 |
873 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 762 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
874 syncer_short_poll_interval_seconds_ : | 763 syncer_short_poll_interval_seconds_ : |
875 syncer_long_poll_interval_seconds_; | 764 syncer_long_poll_interval_seconds_; |
876 bool rate_changed = !poll_timer_.IsRunning() || | 765 bool rate_changed = !poll_timer_.IsRunning() || |
877 poll != poll_timer_.GetCurrentDelay(); | 766 poll != poll_timer_.GetCurrentDelay(); |
878 | 767 |
879 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) | 768 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
880 poll_timer_.Reset(); | 769 poll_timer_.Reset(); |
881 | 770 |
882 if (!rate_changed) | 771 if (!rate_changed) |
883 return; | 772 return; |
884 | 773 |
885 // Adjust poll rate. | 774 // Adjust poll rate. |
886 poll_timer_.Stop(); | 775 poll_timer_.Stop(); |
887 poll_timer_.Start(FROM_HERE, poll, this, | 776 poll_timer_.Start(FROM_HERE, poll, this, |
888 &SyncSchedulerImpl::PollTimerCallback); | 777 &SyncSchedulerImpl::PollTimerCallback); |
889 } | 778 } |
890 | 779 |
891 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { | 780 void SyncSchedulerImpl::RestartWaiting() { |
892 CHECK(wait_interval_.get()); | 781 CHECK(wait_interval_.get()); |
893 wait_interval_->timer.Stop(); | |
894 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); | 782 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
895 if (wait_interval_->mode == WaitInterval::THROTTLED) { | 783 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
896 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::Unthrottle, | 784 pending_wakeup_timer_.Start( |
897 weak_ptr_factory_.GetWeakPtr(), | 785 FROM_HERE, |
898 base::Passed(&job))); | 786 wait_interval_->length, |
899 | 787 base::Bind(&SyncSchedulerImpl::Unthrottle, |
| 788 weak_ptr_factory_.GetWeakPtr())); |
900 } else { | 789 } else { |
901 pending_wakeup_.Reset(base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 790 pending_wakeup_timer_.Start( |
902 weak_ptr_factory_.GetWeakPtr(), | 791 FROM_HERE, |
903 base::Passed(&job))); | 792 wait_interval_->length, |
| 793 base::Bind(&SyncSchedulerImpl::TryCanaryJob, |
| 794 weak_ptr_factory_.GetWeakPtr())); |
904 } | 795 } |
905 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | |
906 pending_wakeup_.callback()); | |
907 } | 796 } |
908 | 797 |
909 void SyncSchedulerImpl::HandleContinuationError( | 798 void SyncSchedulerImpl::HandleContinuationError( |
910 scoped_ptr<SyncSessionJob> old_job, | 799 scoped_ptr<SyncSessionJob> old_job, |
911 SyncSession* session) { | 800 SyncSession* session) { |
912 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 801 DCHECK(CalledOnValidThread()); |
913 | 802 |
914 TimeDelta length = delay_provider_->GetDelay( | 803 TimeDelta length = delay_provider_->GetDelay( |
915 IsBackingOff() ? wait_interval_->length : | 804 IsBackingOff() ? wait_interval_->length : |
916 delay_provider_->GetInitialDelay( | 805 delay_provider_->GetInitialDelay( |
917 session->status_controller().model_neutral_state())); | 806 session->status_controller().model_neutral_state())); |
918 | 807 |
919 SDVLOG(2) << "In handle continuation error with " | 808 SDVLOG(2) << "In handle continuation error with " |
920 << SyncSessionJob::GetPurposeString(old_job->purpose()) | 809 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
921 << " job. The time delta(ms) is " | 810 << " job. The time delta(ms) is " |
922 << length.InMilliseconds(); | 811 << length.InMilliseconds(); |
923 | 812 |
924 // This will reset the had_nudge variable as well. | |
925 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 813 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
926 length)); | 814 length)); |
927 NotifyRetryTime(base::Time::Now() + length); | 815 NotifyRetryTime(base::Time::Now() + length); |
928 scoped_ptr<SyncSessionJob> new_job(old_job->Clone()); | 816 old_job->set_scheduled_start(TimeTicks::Now() + length); |
929 new_job->set_scheduled_start(TimeTicks::Now() + length); | |
930 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { | 817 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { |
931 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 818 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
932 // Config params should always get set. | 819 // Config params should always get set. |
933 DCHECK(!old_job->config_params().ready_task.is_null()); | 820 DCHECK(!old_job->config_params().ready_task.is_null()); |
934 wait_interval_->pending_configure_job = new_job.get(); | 821 DCHECK(!pending_configure_job_); |
| 822 pending_configure_job_ = old_job.Pass(); |
935 } else { | 823 } else { |
936 // We are not in configuration mode. So wait_interval's pending job | 824 // We're not in configure mode so we should not have a configure job. |
937 // should be null. | 825 DCHECK(!pending_configure_job_); |
938 DCHECK(wait_interval_->pending_configure_job == NULL); | 826 DCHECK(!pending_nudge_job_); |
939 DCHECK(!pending_nudge_); | 827 pending_nudge_job_ = old_job.Pass(); |
940 pending_nudge_ = new_job.get(); | |
941 } | 828 } |
942 | 829 |
943 RestartWaiting(new_job.Pass()); | 830 RestartWaiting(); |
944 } | 831 } |
945 | 832 |
946 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 833 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
947 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 834 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
948 DCHECK(weak_handle_this_.IsInitialized()); | 835 DCHECK(weak_handle_this_.IsInitialized()); |
949 SDVLOG(3) << "Posting StopImpl"; | 836 SDVLOG(3) << "Posting StopImpl"; |
950 weak_handle_this_.Call(FROM_HERE, | 837 weak_handle_this_.Call(FROM_HERE, |
951 &SyncSchedulerImpl::StopImpl, | 838 &SyncSchedulerImpl::StopImpl, |
952 callback); | 839 callback); |
953 } | 840 } |
954 | 841 |
955 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 842 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
956 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 843 DCHECK(CalledOnValidThread()); |
957 SDVLOG(2) << "StopImpl called"; | 844 SDVLOG(2) << "StopImpl called"; |
958 | 845 |
959 // Kill any in-flight method calls. | 846 // Kill any in-flight method calls. |
960 weak_ptr_factory_.InvalidateWeakPtrs(); | 847 weak_ptr_factory_.InvalidateWeakPtrs(); |
961 wait_interval_.reset(); | 848 wait_interval_.reset(); |
962 NotifyRetryTime(base::Time()); | 849 NotifyRetryTime(base::Time()); |
963 poll_timer_.Stop(); | 850 poll_timer_.Stop(); |
964 pending_nudge_ = NULL; | 851 pending_wakeup_timer_.Stop(); |
965 unscheduled_nudge_storage_.reset(); | 852 pending_nudge_job_.reset(); |
966 pending_wakeup_.Cancel(); | 853 pending_configure_job_.reset(); |
967 if (started_) { | 854 if (started_) { |
968 started_ = false; | 855 started_ = false; |
969 } | 856 } |
970 if (!callback.is_null()) | 857 if (!callback.is_null()) |
971 callback.Run(); | 858 callback.Run(); |
972 } | 859 } |
973 | 860 |
974 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { | 861 // This is the only place where we invoke DoSyncSessionJob with canary |
975 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 862 // privileges. Everyone else should use NORMAL_PRIORITY. |
976 SDVLOG(2) << "Do canary job"; | 863 void SyncSchedulerImpl::TryCanaryJob() { |
| 864 DCHECK(CalledOnValidThread()); |
977 | 865 |
978 // This is the only place where we invoke DoSyncSessionJob with canary | 866 if (mode_ == CONFIGURATION_MODE && pending_configure_job_) { |
979 // privileges. Everyone else should use NORMAL_PRIORITY. | 867 SDVLOG(2) << "Found pending configure job; will run as canary"; |
980 DoSyncSessionJob(to_be_canary.Pass(), CANARY_PRIORITY); | 868 DoConfigurationSyncSessionJob(CANARY_PRIORITY); |
981 } | 869 } else if (mode_ == NORMAL_MODE && pending_nudge_job_) { |
982 | 870 SDVLOG(2) << "Found pending nudge job; will run as canary"; |
983 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { | 871 DoNudgeSyncSessionJob(CANARY_PRIORITY); |
984 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 872 } else { |
985 // If we find a scheduled pending_ job, abandon the old one and return a | 873 SDVLOG(2) << "Found no work to do; will not run a canary"; |
986 // a clone. If unscheduled, just hand over ownership. | |
987 scoped_ptr<SyncSessionJob> candidate; | |
988 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | |
989 && wait_interval_->pending_configure_job) { | |
990 SDVLOG(2) << "Found pending configure job"; | |
991 candidate = | |
992 wait_interval_->pending_configure_job->Clone().Pass(); | |
993 wait_interval_->pending_configure_job = candidate.get(); | |
994 } else if (mode_ == NORMAL_MODE && pending_nudge_) { | |
995 SDVLOG(2) << "Found pending nudge job"; | |
996 candidate = pending_nudge_->Clone(); | |
997 pending_nudge_ = candidate.get(); | |
998 unscheduled_nudge_storage_.reset(); | |
999 } | 874 } |
1000 // If we took a job and there's a wait interval, we took the pending canary. | |
1001 if (candidate && wait_interval_) | |
1002 wait_interval_->timer.Stop(); | |
1003 return candidate.Pass(); | |
1004 } | 875 } |
1005 | 876 |
1006 void SyncSchedulerImpl::PollTimerCallback() { | 877 void SyncSchedulerImpl::PollTimerCallback() { |
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 878 DCHECK(CalledOnValidThread()); |
1008 ModelSafeRoutingInfo r; | |
1009 ModelTypeInvalidationMap invalidation_map = | |
1010 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | |
1011 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | |
1012 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, | |
1013 TimeTicks::Now(), | |
1014 info, | |
1015 ConfigurationParams())); | |
1016 if (no_scheduling_allowed_) { | 879 if (no_scheduling_allowed_) { |
1017 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in | 880 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in |
1018 // functions that are called only on the sync thread. This function is also | 881 // functions that are called only on the sync thread. This function is also |
1019 // called only on the sync thread, and only when it is posted by an expiring | 882 // called only on the sync thread, and only when it is posted by an expiring |
1020 // timer. If we find that no_scheduling_allowed_ is set here, then | 883 // timer. If we find that no_scheduling_allowed_ is set here, then |
1021 // something is very wrong. Maybe someone mistakenly called us directly, or | 884 // something is very wrong. Maybe someone mistakenly called us directly, or |
1022 // mishandled the book-keeping for no_scheduling_allowed_. | 885 // mishandled the book-keeping for no_scheduling_allowed_. |
1023 NOTREACHED() << "Illegal to schedule job while session in progress."; | 886 NOTREACHED() << "Illegal to schedule job while session in progress."; |
1024 return; | 887 return; |
1025 } | 888 } |
1026 | 889 |
1027 DoPollSyncSessionJob(job.Pass()); | 890 DoPollSyncSessionJob(); |
1028 } | 891 } |
1029 | 892 |
1030 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { | 893 void SyncSchedulerImpl::Unthrottle() { |
1031 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 894 DCHECK(CalledOnValidThread()); |
1032 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 895 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1033 DCHECK(!to_be_canary.get() || pending_nudge_ == to_be_canary.get() || | |
1034 wait_interval_->pending_configure_job == to_be_canary.get()); | |
1035 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") | |
1036 << "canary."; | |
1037 | 896 |
1038 // We're no longer throttled, so clear the wait interval. | 897 // We're no longer throttled, so clear the wait interval. |
1039 wait_interval_.reset(); | 898 wait_interval_.reset(); |
1040 NotifyRetryTime(base::Time()); | 899 NotifyRetryTime(base::Time()); |
1041 | 900 |
1042 // We treat this as a 'canary' in the sense that it was originally scheduled | 901 // We treat this as a 'canary' in the sense that it was originally scheduled |
1043 // to run some time ago, failed, and we now want to retry, versus a job that | 902 // to run some time ago, failed, and we now want to retry, versus a job that |
1044 // was just created (e.g via ScheduleNudgeImpl). The main implication is | 903 // was just created (e.g via ScheduleNudgeImpl). The main implication is |
1045 // that we're careful to update routing info (etc) with such potentially | 904 // that we're careful to update routing info (etc) with such potentially |
1046 // stale canary jobs. | 905 // stale canary jobs. |
1047 if (to_be_canary.get()) { | 906 TryCanaryJob(); |
1048 DoCanaryJob(to_be_canary.Pass()); | |
1049 } else { | |
1050 DCHECK(!unscheduled_nudge_storage_.get()); | |
1051 } | |
1052 } | 907 } |
1053 | 908 |
1054 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 909 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1055 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 910 DCHECK(CalledOnValidThread()); |
1056 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 911 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1057 } | 912 } |
1058 | 913 |
1059 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { | 914 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { |
1060 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); | 915 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); |
1061 event.retry_time = retry_time; | 916 event.retry_time = retry_time; |
1062 session_context_->NotifyListeners(event); | 917 session_context_->NotifyListeners(event); |
1063 } | 918 } |
1064 | 919 |
1065 bool SyncSchedulerImpl::IsBackingOff() const { | 920 bool SyncSchedulerImpl::IsBackingOff() const { |
1066 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 921 DCHECK(CalledOnValidThread()); |
1067 return wait_interval_.get() && wait_interval_->mode == | 922 return wait_interval_.get() && wait_interval_->mode == |
1068 WaitInterval::EXPONENTIAL_BACKOFF; | 923 WaitInterval::EXPONENTIAL_BACKOFF; |
1069 } | 924 } |
1070 | 925 |
1071 void SyncSchedulerImpl::OnSilencedUntil( | 926 void SyncSchedulerImpl::OnSilencedUntil( |
1072 const base::TimeTicks& silenced_until) { | 927 const base::TimeTicks& silenced_until) { |
1073 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 928 DCHECK(CalledOnValidThread()); |
1074 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 929 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
1075 silenced_until - TimeTicks::Now())); | 930 silenced_until - TimeTicks::Now())); |
1076 NotifyRetryTime(base::Time::Now() + wait_interval_->length); | 931 NotifyRetryTime(base::Time::Now() + wait_interval_->length); |
1077 } | 932 } |
1078 | 933 |
1079 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 934 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
1080 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 935 DCHECK(CalledOnValidThread()); |
1081 return wait_interval_.get() && wait_interval_->mode == | 936 return wait_interval_.get() && wait_interval_->mode == |
1082 WaitInterval::THROTTLED; | 937 WaitInterval::THROTTLED; |
1083 } | 938 } |
1084 | 939 |
1085 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 940 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
1086 const base::TimeDelta& new_interval) { | 941 const base::TimeDelta& new_interval) { |
1087 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 942 DCHECK(CalledOnValidThread()); |
1088 syncer_short_poll_interval_seconds_ = new_interval; | 943 syncer_short_poll_interval_seconds_ = new_interval; |
1089 } | 944 } |
1090 | 945 |
1091 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( | 946 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( |
1092 const base::TimeDelta& new_interval) { | 947 const base::TimeDelta& new_interval) { |
1093 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 948 DCHECK(CalledOnValidThread()); |
1094 syncer_long_poll_interval_seconds_ = new_interval; | 949 syncer_long_poll_interval_seconds_ = new_interval; |
1095 } | 950 } |
1096 | 951 |
1097 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( | 952 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( |
1098 const base::TimeDelta& new_delay) { | 953 const base::TimeDelta& new_delay) { |
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 954 DCHECK(CalledOnValidThread()); |
1100 sessions_commit_delay_ = new_delay; | 955 sessions_commit_delay_ = new_delay; |
1101 } | 956 } |
1102 | 957 |
1103 void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { | 958 void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { |
1104 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 959 DCHECK(CalledOnValidThread()); |
1105 SDVLOG(2) << "OnShouldStopSyncingPermanently"; | 960 SDVLOG(2) << "OnShouldStopSyncingPermanently"; |
1106 syncer_->RequestEarlyExit(); // Thread-safe. | 961 syncer_->RequestEarlyExit(); // Thread-safe. |
1107 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); | 962 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
1108 } | 963 } |
1109 | 964 |
1110 void SyncSchedulerImpl::OnActionableError( | 965 void SyncSchedulerImpl::OnActionableError( |
1111 const sessions::SyncSessionSnapshot& snap) { | 966 const sessions::SyncSessionSnapshot& snap) { |
1112 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 967 DCHECK(CalledOnValidThread()); |
1113 SDVLOG(2) << "OnActionableError"; | 968 SDVLOG(2) << "OnActionableError"; |
1114 SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); | 969 SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); |
1115 event.snapshot = snap; | 970 event.snapshot = snap; |
1116 session_context_->NotifyListeners(event); | 971 session_context_->NotifyListeners(event); |
1117 } | 972 } |
1118 | 973 |
1119 void SyncSchedulerImpl::OnSyncProtocolError( | 974 void SyncSchedulerImpl::OnSyncProtocolError( |
1120 const sessions::SyncSessionSnapshot& snapshot) { | 975 const sessions::SyncSessionSnapshot& snapshot) { |
1121 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 976 DCHECK(CalledOnValidThread()); |
1122 if (ShouldRequestEarlyExit( | 977 if (ShouldRequestEarlyExit( |
1123 snapshot.model_neutral_state().sync_protocol_error)) { | 978 snapshot.model_neutral_state().sync_protocol_error)) { |
1124 SDVLOG(2) << "Sync Scheduler requesting early exit."; | 979 SDVLOG(2) << "Sync Scheduler requesting early exit."; |
1125 syncer_->RequestEarlyExit(); // Thread-safe. | 980 syncer_->RequestEarlyExit(); // Thread-safe. |
1126 } | 981 } |
1127 if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error)) | 982 if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error)) |
1128 OnActionableError(snapshot); | 983 OnActionableError(snapshot); |
1129 } | 984 } |
1130 | 985 |
1131 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { | 986 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { |
1132 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 987 DCHECK(CalledOnValidThread()); |
1133 session_context_->set_notifications_enabled(notifications_enabled); | 988 session_context_->set_notifications_enabled(notifications_enabled); |
1134 } | 989 } |
1135 | 990 |
1136 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { | 991 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { |
1137 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 992 DCHECK(CalledOnValidThread()); |
1138 return sessions_commit_delay_; | 993 return sessions_commit_delay_; |
1139 } | 994 } |
1140 | 995 |
1141 #undef SDVLOG_LOC | 996 #undef SDVLOG_LOC |
1142 | 997 |
1143 #undef SDVLOG | 998 #undef SDVLOG |
1144 | 999 |
1145 #undef SLOG | 1000 #undef SLOG |
1146 | 1001 |
1147 #undef ENUM_CASE | 1002 #undef ENUM_CASE |
1148 | 1003 |
1149 } // namespace syncer | 1004 } // namespace syncer |
OLD | NEW |