OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/engine/sync_scheduler_impl.h" | 5 #include "sync/engine/sync_scheduler_impl.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <cstring> | 8 #include <cstring> |
9 | 9 |
10 #include "base/auto_reset.h" | 10 #include "base/auto_reset.h" |
11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/bind_helpers.h" |
12 #include "base/compiler_specific.h" | 13 #include "base/compiler_specific.h" |
13 #include "base/location.h" | 14 #include "base/location.h" |
14 #include "base/logging.h" | 15 #include "base/logging.h" |
15 #include "base/message_loop.h" | 16 #include "base/message_loop.h" |
16 #include "sync/engine/backoff_delay_provider.h" | 17 #include "sync/engine/backoff_delay_provider.h" |
17 #include "sync/engine/syncer.h" | 18 #include "sync/engine/syncer.h" |
18 #include "sync/engine/throttled_data_type_tracker.h" | 19 #include "sync/engine/throttled_data_type_tracker.h" |
19 #include "sync/protocol/proto_enum_conversions.h" | 20 #include "sync/protocol/proto_enum_conversions.h" |
20 #include "sync/protocol/sync.pb.h" | 21 #include "sync/protocol/sync.pb.h" |
21 #include "sync/util/data_type_histogram.h" | 22 #include "sync/util/data_type_histogram.h" |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 : source(source), | 77 : source(source), |
77 types_to_download(types_to_download), | 78 types_to_download(types_to_download), |
78 routing_info(routing_info), | 79 routing_info(routing_info), |
79 ready_task(ready_task) { | 80 ready_task(ready_task) { |
80 DCHECK(!ready_task.is_null()); | 81 DCHECK(!ready_task.is_null()); |
81 } | 82 } |
82 ConfigurationParams::~ConfigurationParams() {} | 83 ConfigurationParams::~ConfigurationParams() {} |
83 | 84 |
84 SyncSchedulerImpl::WaitInterval::WaitInterval() | 85 SyncSchedulerImpl::WaitInterval::WaitInterval() |
85 : mode(UNKNOWN), | 86 : mode(UNKNOWN), |
86 had_nudge(false) { | 87 had_nudge(false), |
87 } | 88 pending_configure_job(NULL) {} |
| 89 |
| 90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
| 91 : mode(mode), had_nudge(false), length(length), |
| 92 pending_configure_job(NULL) {} |
88 | 93 |
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} | 94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} |
90 | 95 |
91 #define ENUM_CASE(x) case x: return #x; break; | 96 #define ENUM_CASE(x) case x: return #x; break; |
92 | 97 |
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { | 98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { |
94 switch (mode) { | 99 switch (mode) { |
95 ENUM_CASE(UNKNOWN); | 100 ENUM_CASE(UNKNOWN); |
96 ENUM_CASE(EXPONENTIAL_BACKOFF); | 101 ENUM_CASE(EXPONENTIAL_BACKOFF); |
97 ENUM_CASE(THROTTLED); | 102 ENUM_CASE(THROTTLED); |
98 } | 103 } |
99 NOTREACHED(); | 104 NOTREACHED(); |
100 return ""; | 105 return ""; |
101 } | 106 } |
102 | 107 |
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob() | |
104 : purpose(UNKNOWN), | |
105 is_canary_job(false) { | |
106 } | |
107 | |
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {} | |
109 | |
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | |
111 base::TimeTicks start, | |
112 linked_ptr<sessions::SyncSession> session, | |
113 bool is_canary_job, | |
114 const ConfigurationParams& config_params, | |
115 const tracked_objects::Location& from_here) | |
116 : purpose(purpose), | |
117 scheduled_start(start), | |
118 session(session), | |
119 is_canary_job(is_canary_job), | |
120 config_params(config_params), | |
121 from_here(from_here) { | |
122 } | |
123 | |
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString( | |
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) { | |
126 switch (purpose) { | |
127 ENUM_CASE(UNKNOWN); | |
128 ENUM_CASE(POLL); | |
129 ENUM_CASE(NUDGE); | |
130 ENUM_CASE(CONFIGURATION); | |
131 } | |
132 NOTREACHED(); | |
133 return ""; | |
134 } | |
135 | |
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | 108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( |
137 NudgeSource source) { | 109 NudgeSource source) { |
138 switch (source) { | 110 switch (source) { |
139 case NUDGE_SOURCE_NOTIFICATION: | 111 case NUDGE_SOURCE_NOTIFICATION: |
140 return GetUpdatesCallerInfo::NOTIFICATION; | 112 return GetUpdatesCallerInfo::NOTIFICATION; |
141 case NUDGE_SOURCE_LOCAL: | 113 case NUDGE_SOURCE_LOCAL: |
142 return GetUpdatesCallerInfo::LOCAL; | 114 return GetUpdatesCallerInfo::LOCAL; |
143 case NUDGE_SOURCE_CONTINUATION: | 115 case NUDGE_SOURCE_CONTINUATION: |
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | 116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
145 case NUDGE_SOURCE_LOCAL_REFRESH: | 117 case NUDGE_SOURCE_LOCAL_REFRESH: |
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; | 118 return GetUpdatesCallerInfo::DATATYPE_REFRESH; |
147 case NUDGE_SOURCE_UNKNOWN: | 119 case NUDGE_SOURCE_UNKNOWN: |
148 return GetUpdatesCallerInfo::UNKNOWN; | 120 return GetUpdatesCallerInfo::UNKNOWN; |
149 default: | 121 default: |
150 NOTREACHED(); | 122 NOTREACHED(); |
151 return GetUpdatesCallerInfo::UNKNOWN; | 123 return GetUpdatesCallerInfo::UNKNOWN; |
152 } | 124 } |
153 } | 125 } |
154 | 126 |
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | |
156 : mode(mode), had_nudge(false), length(length) { } | |
157 | |
158 // Helper macros to log with the syncer thread name; useful when there | 127 // Helper macros to log with the syncer thread name; useful when there |
159 // are multiple syncer threads involved. | 128 // are multiple syncer threads involved. |
160 | 129 |
161 #define SLOG(severity) LOG(severity) << name_ << ": " | 130 #define SLOG(severity) LOG(severity) << name_ << ": " |
162 | 131 |
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " | 132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " |
164 | 133 |
165 #define SDVLOG_LOC(from_here, verbose_level) \ | 134 #define SDVLOG_LOC(from_here, verbose_level) \ |
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " | 135 DVLOG_LOC(from_here, verbose_level) << name_ << ": " |
167 | 136 |
(...skipping 30 matching lines...) Expand all Loading... |
198 syncer_short_poll_interval_seconds_( | 167 syncer_short_poll_interval_seconds_( |
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
200 syncer_long_poll_interval_seconds_( | 169 syncer_long_poll_interval_seconds_( |
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
202 sessions_commit_delay_( | 171 sessions_commit_delay_( |
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), | 172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), |
204 mode_(NORMAL_MODE), | 173 mode_(NORMAL_MODE), |
205 // Start with assuming everything is fine with the connection. | 174 // Start with assuming everything is fine with the connection. |
206 // At the end of the sync cycle we would have the correct status. | 175 // At the end of the sync cycle we would have the correct status. |
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), | 176 connection_code_(HttpResponse::SERVER_CONNECTION_OK), |
| 177 pending_nudge_(NULL), |
208 delay_provider_(delay_provider), | 178 delay_provider_(delay_provider), |
209 syncer_(syncer), | 179 syncer_(syncer), |
210 session_context_(context), | 180 session_context_(context), |
211 no_scheduling_allowed_(false) { | 181 no_scheduling_allowed_(false) { |
212 DCHECK(sync_loop_); | 182 DCHECK(sync_loop_); |
213 } | 183 } |
214 | 184 |
215 SyncSchedulerImpl::~SyncSchedulerImpl() { | 185 SyncSchedulerImpl::~SyncSchedulerImpl() { |
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 186 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
217 StopImpl(base::Closure()); | 187 StopImpl(base::Closure()); |
(...skipping 16 matching lines...) Expand all Loading... |
234 void SyncSchedulerImpl::OnConnectionStatusChange() { | 204 void SyncSchedulerImpl::OnConnectionStatusChange() { |
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { | 205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { |
236 // Optimistically assume that the connection is fixed and try | 206 // Optimistically assume that the connection is fixed and try |
237 // connecting. | 207 // connecting. |
238 OnServerConnectionErrorFixed(); | 208 OnServerConnectionErrorFixed(); |
239 } | 209 } |
240 } | 210 } |
241 | 211 |
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { | 212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { |
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; | 213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; |
| 214 // There could be a pending nudge or configuration job in several cases: |
| 215 // |
| 216 // 1. We're in exponential backoff. |
| 217 // 2. We're silenced / throttled. |
| 218 // 3. A nudge was saved previously due to not having a valid auth token. |
| 219 // 4. A nudge was scheduled + saved while in configuration mode. |
| 220 // |
| 221 // In all cases except (2), we want to retry contacting the server. We |
| 222 // call DoCanaryJob to achieve this, and note that nothing -- not even a |
| 223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that |
| 224 // has the authority to do that is the Unthrottle timer. |
| 225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
| 226 if (!pending.get()) |
| 227 return; |
| 228 |
244 PostTask(FROM_HERE, "DoCanaryJob", | 229 PostTask(FROM_HERE, "DoCanaryJob", |
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, | 230 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
246 weak_ptr_factory_.GetWeakPtr())); | 231 weak_ptr_factory_.GetWeakPtr(), |
247 | 232 base::Passed(&pending))); |
248 } | 233 } |
249 | 234 |
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( | 235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( |
251 HttpResponse::ServerConnectionCode code) { | 236 HttpResponse::ServerConnectionCode code) { |
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 237 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
253 SDVLOG(2) << "New server connection code: " | 238 SDVLOG(2) << "New server connection code: " |
254 << HttpResponse::GetServerConnectionCodeString(code); | 239 << HttpResponse::GetServerConnectionCodeString(code); |
255 | 240 |
256 connection_code_ = code; | 241 connection_code_ = code; |
257 } | 242 } |
(...skipping 15 matching lines...) Expand all Loading... |
273 Mode old_mode = mode_; | 258 Mode old_mode = mode_; |
274 mode_ = mode; | 259 mode_ = mode; |
275 AdjustPolling(NULL); // Will kick start poll timer if needed. | 260 AdjustPolling(NULL); // Will kick start poll timer if needed. |
276 | 261 |
277 if (old_mode != mode_) { | 262 if (old_mode != mode_) { |
278 // We just changed our mode. See if there are any pending jobs that we could | 263 // We just changed our mode. See if there are any pending jobs that we could |
279 // execute in the new mode. | 264 // execute in the new mode. |
280 if (mode_ == NORMAL_MODE) { | 265 if (mode_ == NORMAL_MODE) { |
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job | 266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job |
282 // has not yet completed. | 267 // has not yet completed. |
283 DCHECK(!wait_interval_.get() || | 268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
284 !wait_interval_->pending_configure_job.get()); | |
285 } | 269 } |
286 | 270 |
287 DoPendingJobIfPossible(false); | 271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode()); |
| 272 if (pending.get()) { |
| 273 // TODO(tim): We should be able to remove this... |
| 274 scoped_ptr<SyncSession> session(CreateSyncSession( |
| 275 pending->session()->source())); |
| 276 // Also the routing info might have been changed since we cached the |
| 277 // pending nudge. Update it by coalescing to the latest. |
| 278 pending->mutable_session()->Coalesce(*session); |
| 279 SDVLOG(2) << "Executing pending job. Good luck!"; |
| 280 DoSyncSessionJob(pending.Pass()); |
| 281 } |
288 } | 282 } |
289 } | 283 } |
290 | 284 |
291 void SyncSchedulerImpl::SendInitialSnapshot() { | 285 void SyncSchedulerImpl::SendInitialSnapshot() { |
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 286 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
294 SyncSourceInfo(), ModelSafeRoutingInfo(), | 288 SyncSourceInfo(), ModelSafeRoutingInfo(), |
295 std::vector<ModelSafeWorker*>())); | 289 std::vector<ModelSafeWorker*>())); |
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
297 event.snapshot = dummy->TakeSnapshot(); | 291 event.snapshot = dummy->TakeSnapshot(); |
(...skipping 23 matching lines...) Expand all Loading... |
321 bool SyncSchedulerImpl::ScheduleConfiguration( | 315 bool SyncSchedulerImpl::ScheduleConfiguration( |
322 const ConfigurationParams& params) { | 316 const ConfigurationParams& params) { |
323 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 317 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | 318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
325 DCHECK_EQ(CONFIGURATION_MODE, mode_); | 319 DCHECK_EQ(CONFIGURATION_MODE, mode_); |
326 DCHECK(!params.ready_task.is_null()); | 320 DCHECK(!params.ready_task.is_null()); |
327 SDVLOG(2) << "Reconfiguring syncer."; | 321 SDVLOG(2) << "Reconfiguring syncer."; |
328 | 322 |
329 // Only one configuration is allowed at a time. Verify we're not waiting | 323 // Only one configuration is allowed at a time. Verify we're not waiting |
330 // for a pending configure job. | 324 // for a pending configure job. |
331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | 325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job); |
332 | 326 |
333 ModelSafeRoutingInfo restricted_routes; | 327 ModelSafeRoutingInfo restricted_routes; |
334 BuildModelSafeParams(params.types_to_download, | 328 BuildModelSafeParams(params.types_to_download, |
335 params.routing_info, | 329 params.routing_info, |
336 &restricted_routes); | 330 &restricted_routes); |
337 session_context_->set_routing_info(params.routing_info); | 331 session_context_->set_routing_info(params.routing_info); |
338 | 332 |
339 // Only reconfigure if we have types to download. | 333 // Only reconfigure if we have types to download. |
340 if (!params.types_to_download.Empty()) { | 334 if (!params.types_to_download.Empty()) { |
341 DCHECK(!restricted_routes.empty()); | 335 DCHECK(!restricted_routes.empty()); |
342 linked_ptr<SyncSession> session(new SyncSession( | 336 scoped_ptr<SyncSession> session(new SyncSession( |
343 session_context_, | 337 session_context_, |
344 this, | 338 this, |
345 SyncSourceInfo(params.source, | 339 SyncSourceInfo(params.source, |
346 ModelSafeRoutingInfoToInvalidationMap( | 340 ModelSafeRoutingInfoToInvalidationMap( |
347 restricted_routes, | 341 restricted_routes, |
348 std::string())), | 342 std::string())), |
349 restricted_routes, | 343 restricted_routes, |
350 session_context_->workers())); | 344 session_context_->workers())); |
351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | 345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
352 TimeTicks::Now(), | 346 SyncSessionJob::CONFIGURATION, |
353 session, | 347 TimeTicks::Now(), |
354 false, | 348 session.Pass(), |
355 params, | 349 params, |
356 FROM_HERE); | 350 FROM_HERE)); |
357 DoSyncSessionJob(job); | 351 bool succeeded = DoSyncSessionJob(job.Pass()); |
358 | 352 |
359 // If we failed, the job would have been saved as the pending configure | 353 // If we failed, the job would have been saved as the pending configure |
360 // job and a wait interval would have been set. | 354 // job and a wait interval would have been set. |
361 if (!session->Succeeded()) { | 355 if (!succeeded) { |
362 DCHECK(wait_interval_.get() && | 356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job); |
363 wait_interval_->pending_configure_job.get()); | |
364 return false; | 357 return false; |
365 } | 358 } |
366 } else { | 359 } else { |
367 SDVLOG(2) << "No change in routing info, calling ready task directly."; | 360 SDVLOG(2) << "No change in routing info, calling ready task directly."; |
368 params.ready_task.Run(); | 361 params.ready_task.Run(); |
369 } | 362 } |
370 | 363 |
371 return true; | 364 return true; |
372 } | 365 } |
373 | 366 |
374 SyncSchedulerImpl::JobProcessDecision | 367 SyncSchedulerImpl::JobProcessDecision |
375 SyncSchedulerImpl::DecideWhileInWaitInterval( | 368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job) { |
376 const SyncSessionJob& job) { | |
377 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 369 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
378 DCHECK(wait_interval_.get()); | 370 DCHECK(wait_interval_.get()); |
379 | 371 |
380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
381 << WaitInterval::GetModeString(wait_interval_->mode) | 373 << WaitInterval::GetModeString(wait_interval_->mode) |
382 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 374 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
383 << (job.is_canary_job ? " (canary)" : ""); | 375 << (job.is_canary() ? " (canary)" : ""); |
384 | 376 |
385 if (job.purpose == SyncSessionJob::POLL) | 377 if (job.purpose() == SyncSessionJob::POLL) |
386 return DROP; | 378 return DROP; |
387 | 379 |
388 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 380 // If we save a job while in a WaitInterval, there is a well-defined moment |
389 job.purpose == SyncSessionJob::CONFIGURATION); | 381 // in time in the future when it makes sense for that SAVE-worthy job to try |
| 382 // running again -- the end of the WaitInterval. |
| 383 DCHECK(job.purpose() == SyncSessionJob::NUDGE || |
| 384 job.purpose() == SyncSessionJob::CONFIGURATION); |
| 385 |
| 386 // If throttled, there's a clock ticking to unthrottle. We want to get |
| 387 // on the same train. |
390 if (wait_interval_->mode == WaitInterval::THROTTLED) | 388 if (wait_interval_->mode == WaitInterval::THROTTLED) |
391 return SAVE; | 389 return SAVE; |
392 | 390 |
393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
394 if (job.purpose == SyncSessionJob::NUDGE) { | 392 if (job.purpose() == SyncSessionJob::NUDGE) { |
395 if (mode_ == CONFIGURATION_MODE) | 393 if (mode_ == CONFIGURATION_MODE) |
396 return SAVE; | 394 return SAVE; |
397 | 395 |
398 // If we already had one nudge then just drop this nudge. We will retry | 396 // If we already had one nudge then just drop this nudge. We will retry |
399 // later when the timer runs out. | 397 // later when the timer runs out. |
400 if (!job.is_canary_job) | 398 if (!job.is_canary()) |
401 return wait_interval_->had_nudge ? DROP : CONTINUE; | 399 return wait_interval_->had_nudge ? DROP : CONTINUE; |
402 else // We are here because timer ran out. So retry. | 400 else // We are here because timer ran out. So retry. |
403 return CONTINUE; | 401 return CONTINUE; |
404 } | 402 } |
405 return job.is_canary_job ? CONTINUE : SAVE; | 403 return job.is_canary() ? CONTINUE : SAVE; |
406 } | 404 } |
407 | 405 |
408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( | 406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( |
409 const SyncSessionJob& job) { | 407 const SyncSessionJob& job) { |
410 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 408 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
411 | 409 |
412 // See if our type is throttled. | 410 // See if our type is throttled. |
413 ModelTypeSet throttled_types = | 411 ModelTypeSet throttled_types = |
414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); | 412 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); |
415 if (job.purpose == SyncSessionJob::NUDGE && | 413 if (job.purpose() == SyncSessionJob::NUDGE && |
416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { | 414 job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) { |
417 ModelTypeSet requested_types; | 415 ModelTypeSet requested_types; |
418 for (ModelTypeInvalidationMap::const_iterator i = | 416 for (ModelTypeInvalidationMap::const_iterator i = |
419 job.session->source().types.begin(); | 417 job.session()->source().types.begin(); |
420 i != job.session->source().types.end(); | 418 i != job.session()->source().types.end(); |
421 ++i) { | 419 ++i) { |
422 requested_types.Put(i->first); | 420 requested_types.Put(i->first); |
423 } | 421 } |
424 | 422 |
| 423 // If all types are throttled, do not CONTINUE. Today, we don't treat |
| 424 // a per-datatype "unthrottle" event as something that should force a |
| 425 // canary job. For this reason, there's no good time to reschedule this job |
| 426 // to run -- we'll lazily wait for an independent event to trigger a sync. |
| 427 // Note that there may already be such an event if we're in a WaitInterval, |
| 428 // so we can retry it then. |
425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) | 429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) |
426 return SAVE; | 430 return SAVE; |
427 } | 431 } |
428 | 432 |
429 if (wait_interval_.get()) | 433 if (wait_interval_.get()) |
430 return DecideWhileInWaitInterval(job); | 434 return DecideWhileInWaitInterval(job); |
431 | 435 |
432 if (mode_ == CONFIGURATION_MODE) { | 436 if (mode_ == CONFIGURATION_MODE) { |
433 if (job.purpose == SyncSessionJob::NUDGE) | 437 if (job.purpose() == SyncSessionJob::NUDGE) |
434 return SAVE; | 438 return SAVE; // Running requires a mode switch. |
435 else if (job.purpose == SyncSessionJob::CONFIGURATION) | 439 else if (job.purpose() == SyncSessionJob::CONFIGURATION) |
436 return CONTINUE; | 440 return CONTINUE; |
437 else | 441 else |
438 return DROP; | 442 return DROP; |
439 } | 443 } |
440 | 444 |
441 // We are in normal mode. | 445 // We are in normal mode. |
442 DCHECK_EQ(mode_, NORMAL_MODE); | 446 DCHECK_EQ(mode_, NORMAL_MODE); |
443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 447 DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION); |
444 | 448 |
445 // Note about some subtle scheduling semantics. | 449 // Note about some subtle scheduling semantics. |
446 // | 450 // |
447 // It's possible at this point that |job| is known to be unnecessary, and | 451 // It's possible at this point that |job| is known to be unnecessary, and |
448 // dropping it would be perfectly safe and correct. Consider | 452 // dropping it would be perfectly safe and correct. Consider |
449 // | 453 // |
450 // 1) |job| is a POLL with a |scheduled_start| time that is less than | 454 // 1) |job| is a POLL with a |scheduled_start| time that is less than |
451 // the time that the last successful all-datatype NUDGE completed. | 455 // the time that the last successful all-datatype NUDGE completed. |
452 // | 456 // |
453 // 2) |job| is a NUDGE (for any combination of types) with a | 457 // 2) |job| is a NUDGE (for any combination of types) with a |
(...skipping 22 matching lines...) Expand all Loading... |
476 // * It's not strictly "impossible", but it would be reentrant and hence | 480 // * It's not strictly "impossible", but it would be reentrant and hence |
477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a | 481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a |
478 // legal side effect of any of the work being done as part of a sync cycle. | 482 // legal side effect of any of the work being done as part of a sync cycle. |
479 // See |no_scheduling_allowed_| for details. | 483 // See |no_scheduling_allowed_| for details. |
480 | 484 |
481 // Decision now rests on state of auth tokens. | 485 // Decision now rests on state of auth tokens. |
482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) | 486 if (!session_context_->connection_manager()->HasInvalidAuthToken()) |
483 return CONTINUE; | 487 return CONTINUE; |
484 | 488 |
485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; | 489 SDVLOG(2) << "No valid auth token. Using that to decide on job."; |
486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 490 // Running the job would require updated auth, so we can't honour |
| 491 // job.scheduled_start(). |
| 492 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP; |
487 } | 493 } |
488 | 494 |
489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 495 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) { |
490 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 496 DCHECK_EQ(DecideOnJob(*job), SAVE); |
491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | 497 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE; |
492 if (pending_nudge_.get() == NULL) { | 498 if (is_nudge && pending_nudge_) { |
493 SDVLOG(2) << "Creating a pending nudge job"; | 499 SDVLOG(2) << "Coalescing a pending nudge"; |
494 SyncSession* s = job.session.get(); | 500 // TODO(tim): This basically means we never use the more-careful coalescing |
495 | 501 // logic in ScheduleNudgeImpl that takes the min of the two nudge start |
496 // Get a fresh session with similar configuration as before (resets | 502 // times, because we're calling this function first. Pull this out |
497 // StatusController). | 503 // into a function to coalesce + set start times and reuse. |
498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | 504 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
499 s->delegate(), s->source(), s->routing_info(), s->workers())); | |
500 | |
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | |
502 make_linked_ptr(session.release()), false, | |
503 ConfigurationParams(), job.from_here); | |
504 pending_nudge_.reset(new SyncSessionJob(new_job)); | |
505 return; | 505 return; |
506 } | 506 } |
507 | 507 |
508 SDVLOG(2) << "Coalescing a pending nudge"; | 508 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon(); |
509 pending_nudge_->session->Coalesce(*(job.session.get())); | 509 if (wait_interval_.get() && !wait_interval_->pending_configure_job) { |
510 pending_nudge_->scheduled_start = job.scheduled_start; | 510 // This job should be made the new canary. |
| 511 if (is_nudge) { |
| 512 pending_nudge_ = job_to_save.get(); |
| 513 } else { |
| 514 SDVLOG(2) << "Saving a configuration job"; |
| 515 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION); |
| 516 DCHECK(!wait_interval_->pending_configure_job); |
| 517 DCHECK_EQ(mode_, CONFIGURATION_MODE); |
| 518 DCHECK(!job->config_params().ready_task.is_null()); |
| 519 // The only nudge that could exist is a scheduled canary nudge. |
| 520 DCHECK(!unscheduled_nudge_storage_.get()); |
| 521 if (pending_nudge_) { |
| 522 // Pre-empt the nudge canary and abandon the old nudge (owned by task). |
| 523 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon(); |
| 524 pending_nudge_ = unscheduled_nudge_storage_.get(); |
| 525 } |
| 526 wait_interval_->pending_configure_job = job_to_save.get(); |
| 527 } |
| 528 TimeDelta length = |
| 529 wait_interval_->timer.desired_run_time() - TimeTicks::Now(); |
| 530 wait_interval_->length = length < TimeDelta::FromSeconds(0) ? |
| 531 TimeDelta::FromSeconds(0) : length; |
| 532 RestartWaiting(job_to_save.Pass()); |
| 533 return; |
| 534 } |
511 | 535 |
512 // Unfortunately the nudge location cannot be modified. So it stores the | 536 // Note that today there are no cases where we SAVE a CONFIGURATION job |
513 // location of the first caller. | 537 // when we're not in a WaitInterval. See bug 147736. |
514 } | 538 DCHECK(is_nudge); |
515 | 539 // There may or may not be a pending_configure_job. Either way this nudge |
516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { | 540 // is unschedulable. |
517 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 541 pending_nudge_ = job_to_save.get(); |
518 DCHECK(started_); | 542 unscheduled_nudge_storage_ = job_to_save.Pass(); |
519 | |
520 JobProcessDecision decision = DecideOnJob(job); | |
521 SDVLOG(2) << "Should run " | |
522 << SyncSessionJob::GetPurposeString(job.purpose) | |
523 << " job in mode " << GetModeString(mode_) | |
524 << ": " << GetDecisionString(decision); | |
525 if (decision != SAVE) | |
526 return decision == CONTINUE; | |
527 | |
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | |
529 SyncSessionJob::CONFIGURATION); | |
530 | |
531 SaveJob(job); | |
532 return false; | |
533 } | |
534 | |
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) { | |
536 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
537 if (job.purpose == SyncSessionJob::NUDGE) { | |
538 SDVLOG(2) << "Saving a nudge job"; | |
539 InitOrCoalescePendingJob(job); | |
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | |
541 SDVLOG(2) << "Saving a configuration job"; | |
542 DCHECK(wait_interval_.get()); | |
543 DCHECK(mode_ == CONFIGURATION_MODE); | |
544 | |
545 // Config params should always get set. | |
546 DCHECK(!job.config_params.ready_task.is_null()); | |
547 SyncSession* old = job.session.get(); | |
548 SyncSession* s(new SyncSession(session_context_, this, old->source(), | |
549 old->routing_info(), old->workers())); | |
550 SyncSessionJob new_job(job.purpose, | |
551 TimeTicks::Now(), | |
552 make_linked_ptr(s), | |
553 false, | |
554 job.config_params, | |
555 job.from_here); | |
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | |
557 } // drop the rest. | |
558 // TODO(sync): Is it okay to drop the rest? It's weird that | |
559 // SaveJob() only does what it says sometimes. (See | |
560 // http://crbug.com/90868.) | |
561 } | 543 } |
562 | 544 |
563 // Functor for std::find_if to search by ModelSafeGroup. | 545 // Functor for std::find_if to search by ModelSafeGroup. |
564 struct ModelSafeWorkerGroupIs { | 546 struct ModelSafeWorkerGroupIs { |
565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 547 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
566 bool operator()(ModelSafeWorker* w) { | 548 bool operator()(ModelSafeWorker* w) { |
567 return group == w->GetModelSafeGroup(); | 549 return group == w->GetModelSafeGroup(); |
568 } | 550 } |
569 ModelSafeGroup group; | 551 ModelSafeGroup group; |
570 }; | 552 }; |
571 | 553 |
572 void SyncSchedulerImpl::ScheduleNudgeAsync( | 554 void SyncSchedulerImpl::ScheduleNudgeAsync( |
573 const TimeDelta& delay, | 555 const TimeDelta& desired_delay, |
574 NudgeSource source, ModelTypeSet types, | 556 NudgeSource source, ModelTypeSet types, |
575 const tracked_objects::Location& nudge_location) { | 557 const tracked_objects::Location& nudge_location) { |
576 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 558 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
577 SDVLOG_LOC(nudge_location, 2) | 559 SDVLOG_LOC(nudge_location, 2) |
578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 560 << "Nudge scheduled with delay " |
| 561 << desired_delay.InMilliseconds() << " ms, " |
579 << "source " << GetNudgeSourceString(source) << ", " | 562 << "source " << GetNudgeSourceString(source) << ", " |
580 << "types " << ModelTypeSetToString(types); | 563 << "types " << ModelTypeSetToString(types); |
581 | 564 |
582 ModelTypeInvalidationMap invalidation_map = | 565 ModelTypeInvalidationMap invalidation_map = |
583 ModelTypeSetToInvalidationMap(types, std::string()); | 566 ModelTypeSetToInvalidationMap(types, std::string()); |
584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 567 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
585 GetUpdatesFromNudgeSource(source), | 568 GetUpdatesFromNudgeSource(source), |
586 invalidation_map, | 569 invalidation_map, |
587 false, | |
588 nudge_location); | 570 nudge_location); |
589 } | 571 } |
590 | 572 |
591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( | 573 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( |
592 const TimeDelta& delay, | 574 const TimeDelta& desired_delay, |
593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, | 575 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, |
594 const tracked_objects::Location& nudge_location) { | 576 const tracked_objects::Location& nudge_location) { |
595 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 577 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
596 SDVLOG_LOC(nudge_location, 2) | 578 SDVLOG_LOC(nudge_location, 2) |
597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 579 << "Nudge scheduled with delay " |
| 580 << desired_delay.InMilliseconds() << " ms, " |
598 << "source " << GetNudgeSourceString(source) << ", " | 581 << "source " << GetNudgeSourceString(source) << ", " |
599 << "payloads " | 582 << "payloads " |
600 << ModelTypeInvalidationMapToString(invalidation_map); | 583 << ModelTypeInvalidationMapToString(invalidation_map); |
601 | 584 |
602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, | 585 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay, |
603 GetUpdatesFromNudgeSource(source), | 586 GetUpdatesFromNudgeSource(source), |
604 invalidation_map, | 587 invalidation_map, |
605 false, | |
606 nudge_location); | 588 nudge_location); |
607 } | 589 } |
608 | 590 |
609 void SyncSchedulerImpl::ScheduleNudgeImpl( | 591 void SyncSchedulerImpl::ScheduleNudgeImpl( |
610 const TimeDelta& delay, | 592 const TimeDelta& delay, |
611 GetUpdatesCallerInfo::GetUpdatesSource source, | 593 GetUpdatesCallerInfo::GetUpdatesSource source, |
612 const ModelTypeInvalidationMap& invalidation_map, | 594 const ModelTypeInvalidationMap& invalidation_map, |
613 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 595 const tracked_objects::Location& nudge_location) { |
614 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 596 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; | 597 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; |
616 | 598 |
617 SDVLOG_LOC(nudge_location, 2) | 599 SDVLOG_LOC(nudge_location, 2) |
618 << "In ScheduleNudgeImpl with delay " | 600 << "In ScheduleNudgeImpl with delay " |
619 << delay.InMilliseconds() << " ms, " | 601 << delay.InMilliseconds() << " ms, " |
620 << "source " << GetUpdatesSourceString(source) << ", " | 602 << "source " << GetUpdatesSourceString(source) << ", " |
621 << "payloads " | 603 << "payloads " |
622 << ModelTypeInvalidationMapToString(invalidation_map) | 604 << ModelTypeInvalidationMapToString(invalidation_map); |
623 << (is_canary_job ? " (canary)" : ""); | |
624 | 605 |
625 SyncSourceInfo info(source, invalidation_map); | 606 SyncSourceInfo info(source, invalidation_map); |
626 UpdateNudgeTimeRecords(info); | 607 UpdateNudgeTimeRecords(info); |
627 | 608 |
628 SyncSession* session(CreateSyncSession(info)); | 609 scoped_ptr<SyncSessionJob> job(new SyncSessionJob( |
629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 610 SyncSessionJob::NUDGE, |
630 make_linked_ptr(session), is_canary_job, | 611 TimeTicks::Now() + delay, |
631 ConfigurationParams(), nudge_location); | 612 CreateSyncSession(info).Pass(), |
| 613 ConfigurationParams(), |
| 614 nudge_location)); |
632 | 615 |
633 session = NULL; | 616 JobProcessDecision decision = DecideOnJob(*job); |
634 if (!ShouldRunJob(job)) | 617 SDVLOG(2) << "Should run " |
| 618 << SyncSessionJob::GetPurposeString(job->purpose()) |
| 619 << " job " << job->session() |
| 620 << " in mode " << GetModeString(mode_) |
| 621 << ": " << GetDecisionString(decision); |
| 622 if (decision != CONTINUE) { |
| 623 // End of the line, though we may save the job for later. |
| 624 if (decision == SAVE) { |
| 625 HandleSaveJobDecision(job.Pass()); |
| 626 } else { |
| 627 DCHECK_EQ(decision, DROP); |
| 628 } |
635 return; | 629 return; |
| 630 } |
636 | 631 |
637 if (pending_nudge_.get()) { | 632 if (pending_nudge_) { |
638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | |
639 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | |
640 return; | |
641 } | |
642 | |
643 SDVLOG(2) << "Coalescing pending nudge"; | |
644 pending_nudge_->session->Coalesce(*(job.session.get())); | |
645 | |
646 SDVLOG(2) << "Rescheduling pending nudge"; | 633 SDVLOG(2) << "Rescheduling pending nudge"; |
647 SyncSession* s = pending_nudge_->session.get(); | 634 pending_nudge_->mutable_session()->Coalesce(*(job->session())); |
648 job.session.reset(new SyncSession(s->context(), s->delegate(), | 635 // Choose the start time as the earliest of the 2. Note that this means |
649 s->source(), s->routing_info(), s->workers())); | 636 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds) |
650 | 637 // but a nudge is already scheduled to go out, we'll send the (tab) commit |
651 // Choose the start time as the earliest of the 2. | 638 // without waiting. |
652 job.scheduled_start = std::min(job.scheduled_start, | 639 pending_nudge_->set_scheduled_start( |
653 pending_nudge_->scheduled_start); | 640 std::min(job->scheduled_start(), pending_nudge_->scheduled_start())); |
654 pending_nudge_.reset(); | 641 // Abandon the old task by cloning and replacing the session. |
| 642 // It's possible that by "rescheduling" we're actually taking a job that |
| 643 // was previously unscheduled and giving it wings, so take care to reset |
| 644 // unscheduled nudge storage. |
| 645 job = pending_nudge_->CloneAndAbandon(); |
| 646 unscheduled_nudge_storage_.reset(); |
| 647 pending_nudge_ = NULL; |
655 } | 648 } |
656 | 649 |
657 // TODO(zea): Consider adding separate throttling/backoff for datatype | 650 // TODO(zea): Consider adding separate throttling/backoff for datatype |
658 // refresh requests. | 651 // refresh requests. |
659 ScheduleSyncSessionJob(job); | 652 ScheduleSyncSessionJob(job.Pass()); |
660 } | 653 } |
661 | 654 |
662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { | 655 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { |
663 switch (mode) { | 656 switch (mode) { |
664 ENUM_CASE(CONFIGURATION_MODE); | 657 ENUM_CASE(CONFIGURATION_MODE); |
665 ENUM_CASE(NORMAL_MODE); | 658 ENUM_CASE(NORMAL_MODE); |
666 } | 659 } |
667 return ""; | 660 return ""; |
668 } | 661 } |
669 | 662 |
670 const char* SyncSchedulerImpl::GetDecisionString( | 663 const char* SyncSchedulerImpl::GetDecisionString( |
671 SyncSchedulerImpl::JobProcessDecision mode) { | 664 SyncSchedulerImpl::JobProcessDecision mode) { |
672 switch (mode) { | 665 switch (mode) { |
673 ENUM_CASE(CONTINUE); | 666 ENUM_CASE(CONTINUE); |
674 ENUM_CASE(SAVE); | 667 ENUM_CASE(SAVE); |
675 ENUM_CASE(DROP); | 668 ENUM_CASE(DROP); |
676 } | 669 } |
677 return ""; | 670 return ""; |
678 } | 671 } |
679 | 672 |
680 // static | |
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose( | |
682 SyncSessionJob::SyncSessionJobPurpose purpose, | |
683 SyncerStep* start, | |
684 SyncerStep* end) { | |
685 switch (purpose) { | |
686 case SyncSessionJob::CONFIGURATION: | |
687 *start = DOWNLOAD_UPDATES; | |
688 *end = APPLY_UPDATES; | |
689 return; | |
690 case SyncSessionJob::NUDGE: | |
691 case SyncSessionJob::POLL: | |
692 *start = SYNCER_BEGIN; | |
693 *end = SYNCER_END; | |
694 return; | |
695 default: | |
696 NOTREACHED(); | |
697 *start = SYNCER_END; | |
698 *end = SYNCER_END; | |
699 return; | |
700 } | |
701 } | |
702 | |
703 void SyncSchedulerImpl::PostTask( | 673 void SyncSchedulerImpl::PostTask( |
704 const tracked_objects::Location& from_here, | 674 const tracked_objects::Location& from_here, |
705 const char* name, const base::Closure& task) { | 675 const char* name, const base::Closure& task) { |
706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; | 676 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; |
707 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 677 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
708 if (!started_) { | 678 if (!started_) { |
709 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 679 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
710 return; | 680 return; |
711 } | 681 } |
712 sync_loop_->PostTask(from_here, task); | 682 sync_loop_->PostTask(from_here, task); |
713 } | 683 } |
714 | 684 |
715 void SyncSchedulerImpl::PostDelayedTask( | 685 void SyncSchedulerImpl::PostDelayedTask( |
716 const tracked_objects::Location& from_here, | 686 const tracked_objects::Location& from_here, |
717 const char* name, const base::Closure& task, base::TimeDelta delay) { | 687 const char* name, const base::Closure& task, base::TimeDelta delay) { |
718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " | 688 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " |
719 << delay.InMilliseconds() << " ms delay"; | 689 << delay.InMilliseconds() << " ms delay"; |
720 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 690 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
721 if (!started_) { | 691 if (!started_) { |
722 SDVLOG(1) << "Not posting task as scheduler is stopped."; | 692 SDVLOG(1) << "Not posting task as scheduler is stopped."; |
723 return; | 693 return; |
724 } | 694 } |
725 sync_loop_->PostDelayedTask(from_here, task, delay); | 695 sync_loop_->PostDelayedTask(from_here, task, delay); |
726 } | 696 } |
727 | 697 |
728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { | 698 void SyncSchedulerImpl::ScheduleSyncSessionJob( |
| 699 scoped_ptr<SyncSessionJob> job) { |
729 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 700 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
730 if (no_scheduling_allowed_) { | 701 if (no_scheduling_allowed_) { |
731 NOTREACHED() << "Illegal to schedule job while session in progress."; | 702 NOTREACHED() << "Illegal to schedule job while session in progress."; |
732 return; | 703 return; |
733 } | 704 } |
734 | 705 |
735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); | 706 TimeDelta delay = job->scheduled_start() - TimeTicks::Now(); |
| 707 tracked_objects::Location loc(job->from_location()); |
736 if (delay < TimeDelta::FromMilliseconds(0)) | 708 if (delay < TimeDelta::FromMilliseconds(0)) |
737 delay = TimeDelta::FromMilliseconds(0); | 709 delay = TimeDelta::FromMilliseconds(0); |
738 SDVLOG_LOC(job.from_here, 2) | 710 SDVLOG_LOC(loc, 2) |
739 << "In ScheduleSyncSessionJob with " | 711 << "In ScheduleSyncSessionJob with " |
740 << SyncSessionJob::GetPurposeString(job.purpose) | 712 << SyncSessionJob::GetPurposeString(job->purpose()) |
741 << " job and " << delay.InMilliseconds() << " ms delay"; | 713 << " job and " << delay.InMilliseconds() << " ms delay"; |
742 | 714 |
743 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 715 DCHECK(job->purpose() == SyncSessionJob::NUDGE || |
744 job.purpose == SyncSessionJob::POLL); | 716 job->purpose() == SyncSessionJob::POLL); |
745 if (job.purpose == SyncSessionJob::NUDGE) { | 717 if (job->purpose() == SyncSessionJob::NUDGE) { |
746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; | 718 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to "; |
747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == | 719 DCHECK(!pending_nudge_ || pending_nudge_->session() == |
748 job.session); | 720 job->session()); |
749 pending_nudge_.reset(new SyncSessionJob(job)); | 721 pending_nudge_ = job.get(); |
750 } | 722 } |
751 PostDelayedTask(job.from_here, "DoSyncSessionJob", | 723 |
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, | 724 PostDelayedTask(loc, "DoSyncSessionJob", |
753 weak_ptr_factory_.GetWeakPtr(), | 725 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob), |
754 job), | 726 weak_ptr_factory_.GetWeakPtr(), |
755 delay); | 727 base::Passed(&job)), |
| 728 delay); |
756 } | 729 } |
757 | 730 |
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { | 731 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) { |
759 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 732 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 733 if (job->purpose() == SyncSessionJob::NUDGE) { |
| 734 if (pending_nudge_ == NULL || |
| 735 pending_nudge_->session() != job->session()) { |
| 736 // |job| is abandoned. |
| 737 SDVLOG(2) << "Dropping a nudge in " |
| 738 << "DoSyncSessionJob because another nudge was scheduled"; |
| 739 return false; |
| 740 } |
| 741 pending_nudge_ = NULL; |
| 742 |
| 743 // Rebase the session with the latest model safe table and use it to purge |
| 744 // and update any disabled or modified entries in the job. |
| 745 job->mutable_session()->RebaseRoutingInfoWithLatest( |
| 746 session_context_->routing_info(), session_context_->workers()); |
| 747 } |
760 | 748 |
761 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 749 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
762 if (!ShouldRunJob(job)) { | 750 JobProcessDecision decision = DecideOnJob(*job); |
763 SLOG(WARNING) | 751 SDVLOG(2) << "Should run " |
764 << "Not executing " | 752 << SyncSessionJob::GetPurposeString(job->purpose()) |
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " | 753 << " job " << job->session() |
766 << GetUpdatesSourceString(job.session->source().updates_source); | 754 << " in mode " << GetModeString(mode_) |
767 return; | 755 << " with source " << job->session()->source().updates_source |
| 756 << ": " << GetDecisionString(decision); |
| 757 if (decision != CONTINUE) { |
| 758 if (decision == SAVE) { |
| 759 HandleSaveJobDecision(job.Pass()); |
| 760 } else { |
| 761 DCHECK_EQ(decision, DROP); |
| 762 } |
| 763 return false; |
768 } | 764 } |
769 | 765 |
770 if (job.purpose == SyncSessionJob::NUDGE) { | |
771 if (pending_nudge_.get() == NULL || | |
772 pending_nudge_->session != job.session) { | |
773 SDVLOG(2) << "Dropping a nudge in " | |
774 << "DoSyncSessionJob because another nudge was scheduled"; | |
775 return; // Another nudge must have been scheduled in in the meantime. | |
776 } | |
777 pending_nudge_.reset(); | |
778 | |
779 // Create the session with the latest model safe table and use it to purge | |
780 // and update any disabled or modified entries in the job. | |
781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | |
782 | |
783 job.session->RebaseRoutingInfoWithLatest(*session); | |
784 } | |
785 SDVLOG(2) << "DoSyncSessionJob with " | 766 SDVLOG(2) << "DoSyncSessionJob with " |
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; | 767 << SyncSessionJob::GetPurposeString(job->purpose()) << " job"; |
787 | |
788 SyncerStep begin(SYNCER_END); | |
789 SyncerStep end(SYNCER_END); | |
790 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | |
791 | 768 |
792 bool has_more_to_sync = true; | 769 bool has_more_to_sync = true; |
793 while (ShouldRunJob(job) && has_more_to_sync) { | 770 bool premature_exit = false; |
| 771 while (DecideOnJob(*job) == CONTINUE && has_more_to_sync) { |
794 SDVLOG(2) << "Calling SyncShare."; | 772 SDVLOG(2) << "Calling SyncShare."; |
795 // Synchronously perform the sync session from this thread. | 773 // Synchronously perform the sync session from this thread. |
796 syncer_->SyncShare(job.session.get(), begin, end); | 774 premature_exit = !syncer_->SyncShare(job->mutable_session(), |
797 has_more_to_sync = job.session->HasMoreToSync(); | 775 job->start_step(), |
| 776 job->end_step()); |
| 777 |
| 778 has_more_to_sync = job->session()->HasMoreToSync(); |
798 if (has_more_to_sync) | 779 if (has_more_to_sync) |
799 job.session->PrepareForAnotherSyncCycle(); | 780 job->mutable_session()->PrepareForAnotherSyncCycle(); |
800 } | 781 } |
801 SDVLOG(2) << "Done SyncShare looping."; | 782 SDVLOG(2) << "Done SyncShare looping."; |
802 | 783 |
803 FinishSyncSessionJob(job); | 784 return FinishSyncSessionJob(job.Pass(), premature_exit); |
804 } | 785 } |
805 | 786 |
806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { | 787 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { |
807 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 788 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
808 | 789 |
809 // We are interested in recording time between local nudges for datatypes. | 790 // We are interested in recording time between local nudges for datatypes. |
810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. | 791 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. |
811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) | 792 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) |
812 return; | 793 return; |
813 | 794 |
814 base::TimeTicks now = TimeTicks::Now(); | 795 base::TimeTicks now = TimeTicks::Now(); |
815 // Update timing information for how often datatypes are triggering nudges. | 796 // Update timing information for how often datatypes are triggering nudges. |
816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); | 797 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); |
817 iter != info.types.end(); | 798 iter != info.types.end(); |
818 ++iter) { | 799 ++iter) { |
819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; | 800 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; |
820 last_local_nudges_by_model_type_[iter->first] = now; | 801 last_local_nudges_by_model_type_[iter->first] = now; |
821 if (previous.is_null()) | 802 if (previous.is_null()) |
822 continue; | 803 continue; |
823 | 804 |
824 #define PER_DATA_TYPE_MACRO(type_str) \ | 805 #define PER_DATA_TYPE_MACRO(type_str) \ |
825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); | 806 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); |
826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); | 807 SYNC_DATA_TYPE_HISTOGRAM(iter->first); |
827 #undef PER_DATA_TYPE_MACRO | 808 #undef PER_DATA_TYPE_MACRO |
828 } | 809 } |
829 } | 810 } |
830 | 811 |
831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { | 812 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job, |
| 813 bool exited_prematurely) { |
832 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 814 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
833 // Now update the status of the connection from SCM. We need this to decide | 815 // Now update the status of the connection from SCM. We need this to decide |
834 // whether we need to save/run future jobs. The notifications from SCM are not | 816 // whether we need to save/run future jobs. The notifications from SCM are |
835 // reliable. | 817 // not reliable. |
836 // | 818 // |
837 // TODO(rlarocque): crbug.com/110954 | 819 // TODO(rlarocque): crbug.com/110954 |
838 // We should get rid of the notifications and it is probably not needed to | 820 // We should get rid of the notifications and it is probably not needed to |
839 // maintain this status variable in 2 places. We should query it directly from | 821 // maintain this status variable in 2 places. We should query it directly |
840 // SCM when needed. | 822 // from SCM when needed. |
841 ServerConnectionManager* scm = session_context_->connection_manager(); | 823 ServerConnectionManager* scm = session_context_->connection_manager(); |
842 UpdateServerConnectionManagerStatus(scm->server_status()); | 824 UpdateServerConnectionManagerStatus(scm->server_status()); |
843 | 825 |
844 if (IsSyncingCurrentlySilenced()) { | 826 // Let job know that we're through syncing (calling SyncShare) at this point. |
845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 827 bool succeeded = false; |
846 // TODO(sync): Investigate whether we need to check job.purpose | 828 { |
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | |
848 SaveJob(job); | |
849 return; // Nothing to do. | |
850 } else if (job.session->Succeeded() && | |
851 !job.config_params.ready_task.is_null()) { | |
852 // If this was a configuration job with a ready task, invoke it now that | |
853 // we finished successfully. | |
854 AutoReset<bool> protector(&no_scheduling_allowed_, true); | 829 AutoReset<bool> protector(&no_scheduling_allowed_, true); |
855 job.config_params.ready_task.Run(); | 830 succeeded = job->Finish(exited_prematurely); |
856 } | 831 } |
857 | 832 |
858 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 833 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
859 ScheduleNextSync(job); | 834 ScheduleNextSync(job.Pass(), succeeded); |
| 835 return succeeded; |
860 } | 836 } |
861 | 837 |
862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { | 838 void SyncSchedulerImpl::ScheduleNextSync( |
| 839 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) { |
863 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 840 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
864 DCHECK(!old_job.session->HasMoreToSync()); | 841 DCHECK(!finished_job->session()->HasMoreToSync()); |
865 | 842 |
866 AdjustPolling(&old_job); | 843 AdjustPolling(finished_job.get()); |
867 | 844 |
868 if (old_job.session->Succeeded()) { | 845 if (succeeded) { |
869 // Only reset backoff if we actually reached the server. | 846 // Only reset backoff if we actually reached the server. |
870 if (old_job.session->SuccessfullyReachedServer()) | 847 // It's possible that we reached the server on one attempt, then had an |
| 848 // error on the next (or didn't perform some of the server-communicating |
| 849 // commands). We want to verify that, for all commands attempted, we |
| 850 // successfully spoke with the server. Therefore, we verify no errors |
| 851 // and at least one SYNCER_OK. |
| 852 if (finished_job->session()->DidReachServer()) |
871 wait_interval_.reset(); | 853 wait_interval_.reset(); |
872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; | 854 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
873 return; | 855 return; |
874 } | 856 } |
875 | 857 |
876 if (old_job.purpose == SyncSessionJob::POLL) { | 858 if (IsSyncingCurrentlySilenced()) { |
| 859 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle."; |
| 860 // If we're here, it's because |job| was silenced until a server specified |
| 861 // time. (Note, it had to be |job|, because DecideOnJob would not permit |
| 862 // any job through while in WaitInterval::THROTTLED). |
| 863 scoped_ptr<SyncSessionJob> clone = finished_job->Clone(); |
| 864 if (clone->purpose() == SyncSessionJob::NUDGE) |
| 865 pending_nudge_ = clone.get(); |
| 866 else if (clone->purpose() == SyncSessionJob::CONFIGURATION) |
| 867 wait_interval_->pending_configure_job = clone.get(); |
| 868 else |
| 869 clone.reset(); // Unthrottling is enough, no need to force a canary. |
| 870 |
| 871 RestartWaiting(clone.Pass()); |
| 872 return; |
| 873 } |
| 874 |
| 875 if (finished_job->purpose() == SyncSessionJob::POLL) { |
877 return; // We don't retry POLL jobs. | 876 return; // We don't retry POLL jobs. |
878 } | 877 } |
879 | 878 |
880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry | 879 // TODO(rlarocque): There's no reason why we should blindly backoff and retry |
881 // if we don't succeed. Some types of errors are not likely to disappear on | 880 // if we don't succeed. Some types of errors are not likely to disappear on |
882 // their own. With the return values now available in the old_job.session, we | 881 // their own. With the return values now available in the old_job.session, |
883 // should be able to detect such errors and only retry when we detect | 882 // we should be able to detect such errors and only retry when we detect |
884 // transient errors. | 883 // transient errors. |
885 | 884 |
886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && | 885 if (IsBackingOff() && wait_interval_->timer.IsRunning() && |
887 mode_ == NORMAL_MODE) { | 886 mode_ == NORMAL_MODE) { |
888 // When in normal mode, we allow up to one nudge per backoff interval. It | 887 // When in normal mode, we allow up to one nudge per backoff interval. It |
889 // appears that this was our nudge for this interval, and it failed. | 888 // appears that this was our nudge for this interval, and it failed. |
890 // | 889 // |
891 // Note: This does not prevent us from running canary jobs. For example, an | 890 // Note: This does not prevent us from running canary jobs. For example, |
892 // IP address change might still result in another nudge being executed | 891 // an IP address change might still result in another nudge being executed |
893 // during this backoff interval. | 892 // during this backoff interval. |
894 SDVLOG(2) << "A nudge during backoff failed"; | 893 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge."; |
895 | 894 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose()); |
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
897 DCHECK(!wait_interval_->had_nudge); | 895 DCHECK(!wait_interval_->had_nudge); |
898 | 896 |
899 wait_interval_->had_nudge = true; | 897 wait_interval_->had_nudge = true; |
900 InitOrCoalescePendingJob(old_job); | 898 DCHECK(!pending_nudge_); |
901 RestartWaiting(); | 899 |
| 900 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone(); |
| 901 pending_nudge_ = new_job.get(); |
| 902 RestartWaiting(new_job.Pass()); |
902 } else { | 903 } else { |
903 // Either this is the first failure or a consecutive failure after our | 904 // Either this is the first failure or a consecutive failure after our |
904 // backoff timer expired. We handle it the same way in either case. | 905 // backoff timer expired. We handle it the same way in either case. |
905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; | 906 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; |
906 HandleContinuationError(old_job); | 907 HandleContinuationError(finished_job.Pass()); |
907 } | 908 } |
908 } | 909 } |
909 | 910 |
910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { | 911 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { |
911 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 912 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
912 | 913 |
913 TimeDelta poll = (!session_context_->notifications_enabled()) ? | 914 TimeDelta poll = (!session_context_->notifications_enabled()) ? |
914 syncer_short_poll_interval_seconds_ : | 915 syncer_short_poll_interval_seconds_ : |
915 syncer_long_poll_interval_seconds_; | 916 syncer_long_poll_interval_seconds_; |
916 bool rate_changed = !poll_timer_.IsRunning() || | 917 bool rate_changed = !poll_timer_.IsRunning() || |
917 poll != poll_timer_.GetCurrentDelay(); | 918 poll != poll_timer_.GetCurrentDelay(); |
918 | 919 |
919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | 920 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed) |
920 poll_timer_.Reset(); | 921 poll_timer_.Reset(); |
921 | 922 |
922 if (!rate_changed) | 923 if (!rate_changed) |
923 return; | 924 return; |
924 | 925 |
925 // Adjust poll rate. | 926 // Adjust poll rate. |
926 poll_timer_.Stop(); | 927 poll_timer_.Stop(); |
927 poll_timer_.Start(FROM_HERE, poll, this, | 928 poll_timer_.Start(FROM_HERE, poll, this, |
928 &SyncSchedulerImpl::PollTimerCallback); | 929 &SyncSchedulerImpl::PollTimerCallback); |
929 } | 930 } |
930 | 931 |
931 void SyncSchedulerImpl::RestartWaiting() { | 932 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) { |
932 CHECK(wait_interval_.get()); | 933 CHECK(wait_interval_.get()); |
933 wait_interval_->timer.Stop(); | 934 wait_interval_->timer.Stop(); |
934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, | 935 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); |
935 this, &SyncSchedulerImpl::DoCanaryJob); | 936 if (wait_interval_->mode == WaitInterval::THROTTLED) { |
| 937 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
| 938 base::Bind(&SyncSchedulerImpl::Unthrottle, |
| 939 weak_ptr_factory_.GetWeakPtr(), |
| 940 base::Passed(&job))); |
| 941 } else { |
| 942 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, |
| 943 base::Bind(&SyncSchedulerImpl::DoCanaryJob, |
| 944 weak_ptr_factory_.GetWeakPtr(), |
| 945 base::Passed(&job))); |
| 946 } |
936 } | 947 } |
937 | 948 |
938 void SyncSchedulerImpl::HandleContinuationError( | 949 void SyncSchedulerImpl::HandleContinuationError( |
939 const SyncSessionJob& old_job) { | 950 scoped_ptr<SyncSessionJob> old_job) { |
940 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 951 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
941 if (DCHECK_IS_ON()) { | 952 if (DCHECK_IS_ON()) { |
942 if (IsBackingOff()) { | 953 if (IsBackingOff()) { |
943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 954 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary()); |
944 } | 955 } |
945 } | 956 } |
946 | 957 |
947 TimeDelta length = delay_provider_->GetDelay( | 958 TimeDelta length = delay_provider_->GetDelay( |
948 IsBackingOff() ? wait_interval_->length : | 959 IsBackingOff() ? wait_interval_->length : |
949 delay_provider_->GetInitialDelay( | 960 delay_provider_->GetInitialDelay( |
950 old_job.session->status_controller().model_neutral_state())); | 961 old_job->session()->status_controller().model_neutral_state())); |
951 | 962 |
952 SDVLOG(2) << "In handle continuation error with " | 963 SDVLOG(2) << "In handle continuation error with " |
953 << SyncSessionJob::GetPurposeString(old_job.purpose) | 964 << SyncSessionJob::GetPurposeString(old_job->purpose()) |
954 << " job. The time delta(ms) is " | 965 << " job. The time delta(ms) is " |
955 << length.InMilliseconds(); | 966 << length.InMilliseconds(); |
956 | 967 |
957 // This will reset the had_nudge variable as well. | 968 // This will reset the had_nudge variable as well. |
958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 969 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
959 length)); | 970 length)); |
960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 971 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE)); |
| 972 new_job->set_scheduled_start(TimeTicks::Now() + length); |
| 973 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) { |
961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | 974 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
962 // Config params should always get set. | 975 // Config params should always get set. |
963 DCHECK(!old_job.config_params.ready_task.is_null()); | 976 DCHECK(!old_job->config_params().ready_task.is_null()); |
964 SyncSession* old = old_job.session.get(); | 977 wait_interval_->pending_configure_job = new_job.get(); |
965 SyncSession* s(new SyncSession(session_context_, this, | |
966 old->source(), old->routing_info(), old->workers())); | |
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | |
968 make_linked_ptr(s), false, old_job.config_params, | |
969 FROM_HERE); | |
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | |
971 } else { | 978 } else { |
972 // We are not in configuration mode. So wait_interval's pending job | 979 // We are not in configuration mode. So wait_interval's pending job |
973 // should be null. | 980 // should be null. |
974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 981 DCHECK(wait_interval_->pending_configure_job == NULL); |
| 982 DCHECK(!pending_nudge_); |
| 983 pending_nudge_ = new_job.get(); |
| 984 } |
975 | 985 |
976 // TODO(lipalani) - handle clear user data. | 986 RestartWaiting(new_job.Pass()); |
977 InitOrCoalescePendingJob(old_job); | |
978 } | |
979 RestartWaiting(); | |
980 } | 987 } |
981 | 988 |
982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { | 989 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { |
983 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 990 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
984 DCHECK(weak_handle_this_.IsInitialized()); | 991 DCHECK(weak_handle_this_.IsInitialized()); |
985 SDVLOG(3) << "Posting StopImpl"; | 992 SDVLOG(3) << "Posting StopImpl"; |
986 weak_handle_this_.Call(FROM_HERE, | 993 weak_handle_this_.Call(FROM_HERE, |
987 &SyncSchedulerImpl::StopImpl, | 994 &SyncSchedulerImpl::StopImpl, |
988 callback); | 995 callback); |
989 } | 996 } |
990 | 997 |
991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { | 998 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { |
992 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 999 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
993 SDVLOG(2) << "StopImpl called"; | 1000 SDVLOG(2) << "StopImpl called"; |
994 | 1001 |
995 // Kill any in-flight method calls. | 1002 // Kill any in-flight method calls. |
996 weak_ptr_factory_.InvalidateWeakPtrs(); | 1003 weak_ptr_factory_.InvalidateWeakPtrs(); |
997 wait_interval_.reset(); | 1004 wait_interval_.reset(); |
998 poll_timer_.Stop(); | 1005 poll_timer_.Stop(); |
999 if (started_) { | 1006 if (started_) { |
1000 started_ = false; | 1007 started_ = false; |
1001 } | 1008 } |
1002 if (!callback.is_null()) | 1009 if (!callback.is_null()) |
1003 callback.Run(); | 1010 callback.Run(); |
1004 } | 1011 } |
1005 | 1012 |
1006 void SyncSchedulerImpl::DoCanaryJob() { | 1013 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) { |
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1014 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1008 SDVLOG(2) << "Do canary job"; | 1015 SDVLOG(2) << "Do canary job"; |
1009 DoPendingJobIfPossible(true); | 1016 |
| 1017 // Only set canary privileges here, when we are about to run the job. This |
| 1018 // avoids confusion in managing canary bits during scheduling, when you |
| 1019 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that |
| 1020 // was scheduled as canary, and send it to an "unscheduled" state. |
| 1021 to_be_canary->GrantCanaryPrivilege(); |
| 1022 |
| 1023 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) { |
| 1024 // TODO(tim): We should be able to remove this... |
| 1025 scoped_ptr<SyncSession> temp = CreateSyncSession( |
| 1026 to_be_canary->session()->source()).Pass(); |
| 1027 // The routing info might have been changed since we cached the |
| 1028 // pending nudge. Update it by coalescing to the latest. |
| 1029 to_be_canary->mutable_session()->Coalesce(*(temp)); |
| 1030 } |
| 1031 DoSyncSessionJob(to_be_canary.Pass()); |
1010 } | 1032 } |
1011 | 1033 |
1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { | 1034 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() { |
1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1035 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1014 SyncSessionJob* job_to_execute = NULL; | 1036 // If we find a scheduled pending_ job, abandon the old one and return a |
| 1037 // a clone. If unscheduled, just hand over ownership. |
| 1038 scoped_ptr<SyncSessionJob> candidate; |
1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 1039 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
1016 && wait_interval_->pending_configure_job.get()) { | 1040 && wait_interval_->pending_configure_job) { |
1017 SDVLOG(2) << "Found pending configure job"; | 1041 SDVLOG(2) << "Found pending configure job"; |
1018 job_to_execute = wait_interval_->pending_configure_job.get(); | 1042 candidate = |
1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 1043 wait_interval_->pending_configure_job->CloneAndAbandon().Pass(); |
| 1044 wait_interval_->pending_configure_job = candidate.get(); |
| 1045 } else if (mode_ == NORMAL_MODE && pending_nudge_) { |
1020 SDVLOG(2) << "Found pending nudge job"; | 1046 SDVLOG(2) << "Found pending nudge job"; |
1021 | 1047 candidate = pending_nudge_->CloneAndAbandon(); |
1022 scoped_ptr<SyncSession> session(CreateSyncSession( | 1048 pending_nudge_ = candidate.get(); |
1023 pending_nudge_->session->source())); | 1049 unscheduled_nudge_storage_.reset(); |
1024 | |
1025 // Also the routing info might have been changed since we cached the | |
1026 // pending nudge. Update it by coalescing to the latest. | |
1027 pending_nudge_->session->Coalesce(*(session.get())); | |
1028 // The pending nudge would be cleared in the DoSyncSessionJob function. | |
1029 job_to_execute = pending_nudge_.get(); | |
1030 } | 1050 } |
1031 | 1051 return candidate.Pass(); |
1032 if (job_to_execute != NULL) { | |
1033 SDVLOG(2) << "Executing pending job"; | |
1034 SyncSessionJob copy = *job_to_execute; | |
1035 copy.is_canary_job = is_canary_job; | |
1036 DoSyncSessionJob(copy); | |
1037 } | |
1038 } | 1052 } |
1039 | 1053 |
1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( | 1054 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession( |
1041 const SyncSourceInfo& source) { | 1055 const SyncSourceInfo& source) { |
1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1056 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1043 DVLOG(2) << "Creating sync session with routes " | 1057 DVLOG(2) << "Creating sync session with routes " |
1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); | 1058 << ModelSafeRoutingInfoToString(session_context_->routing_info()); |
1045 | 1059 |
1046 SyncSourceInfo info(source); | 1060 SyncSourceInfo info(source); |
1047 SyncSession* session(new SyncSession(session_context_, this, info, | 1061 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info, |
1048 session_context_->routing_info(), session_context_->workers())); | 1062 session_context_->routing_info(), session_context_->workers())); |
1049 | |
1050 return session; | |
1051 } | 1063 } |
1052 | 1064 |
1053 void SyncSchedulerImpl::PollTimerCallback() { | 1065 void SyncSchedulerImpl::PollTimerCallback() { |
1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1066 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1055 ModelSafeRoutingInfo r; | 1067 ModelSafeRoutingInfo r; |
1056 ModelTypeInvalidationMap invalidation_map = | 1068 ModelTypeInvalidationMap invalidation_map = |
1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); | 1069 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); |
1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); | 1070 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); |
1059 SyncSession* s = CreateSyncSession(info); | 1071 scoped_ptr<SyncSession> s(CreateSyncSession(info)); |
1060 | 1072 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL, |
1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1073 TimeTicks::Now(), |
1062 make_linked_ptr(s), | 1074 s.Pass(), |
1063 false, | 1075 ConfigurationParams(), |
1064 ConfigurationParams(), | 1076 FROM_HERE)); |
1065 FROM_HERE); | 1077 ScheduleSyncSessionJob(job.Pass()); |
1066 | |
1067 ScheduleSyncSessionJob(job); | |
1068 } | 1078 } |
1069 | 1079 |
1070 void SyncSchedulerImpl::Unthrottle() { | 1080 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) { |
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1081 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1082 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
1073 SDVLOG(2) << "Unthrottled."; | 1083 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ") |
1074 DoCanaryJob(); | 1084 << "canary."; |
| 1085 if (to_be_canary.get()) |
| 1086 DoCanaryJob(to_be_canary.Pass()); |
| 1087 |
| 1088 // TODO(tim): The way DecideOnJob works today, canary privileges aren't |
| 1089 // enough to bypass a THROTTLED wait interval, which would suggest we need |
| 1090 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is |
| 1091 // probably the "right" thing to do). Bug 154216. |
1075 wait_interval_.reset(); | 1092 wait_interval_.reset(); |
1076 } | 1093 } |
1077 | 1094 |
1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { | 1095 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { |
1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1096 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 1097 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
1081 } | 1098 } |
1082 | 1099 |
1083 bool SyncSchedulerImpl::IsBackingOff() const { | 1100 bool SyncSchedulerImpl::IsBackingOff() const { |
1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1101 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1085 return wait_interval_.get() && wait_interval_->mode == | 1102 return wait_interval_.get() && wait_interval_->mode == |
1086 WaitInterval::EXPONENTIAL_BACKOFF; | 1103 WaitInterval::EXPONENTIAL_BACKOFF; |
1087 } | 1104 } |
1088 | 1105 |
1089 void SyncSchedulerImpl::OnSilencedUntil( | 1106 void SyncSchedulerImpl::OnSilencedUntil( |
1090 const base::TimeTicks& silenced_until) { | 1107 const base::TimeTicks& silenced_until) { |
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1108 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | 1109 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, |
1093 silenced_until - TimeTicks::Now())); | 1110 silenced_until - TimeTicks::Now())); |
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this, | |
1095 &SyncSchedulerImpl::Unthrottle); | |
1096 } | 1111 } |
1097 | 1112 |
1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { | 1113 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { |
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1114 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
1100 return wait_interval_.get() && wait_interval_->mode == | 1115 return wait_interval_.get() && wait_interval_->mode == |
1101 WaitInterval::THROTTLED; | 1116 WaitInterval::THROTTLED; |
1102 } | 1117 } |
1103 | 1118 |
1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( | 1119 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( |
1105 const base::TimeDelta& new_interval) { | 1120 const base::TimeDelta& new_interval) { |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1159 | 1174 |
1160 #undef SDVLOG_LOC | 1175 #undef SDVLOG_LOC |
1161 | 1176 |
1162 #undef SDVLOG | 1177 #undef SDVLOG |
1163 | 1178 |
1164 #undef SLOG | 1179 #undef SLOG |
1165 | 1180 |
1166 #undef ENUM_CASE | 1181 #undef ENUM_CASE |
1167 | 1182 |
1168 } // namespace syncer | 1183 } // namespace syncer |
OLD | NEW |