| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/engine/sync_scheduler.h" | 5 #include "sync/engine/sync_scheduler.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <cstring> | 8 #include <cstring> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 58 return false; | 58 return false; |
| 59 } | 59 } |
| 60 } | 60 } |
| 61 | 61 |
| 62 bool IsActionableError( | 62 bool IsActionableError( |
| 63 const syncer::SyncProtocolError& error) { | 63 const syncer::SyncProtocolError& error) { |
| 64 return (error.action != syncer::UNKNOWN_ACTION); | 64 return (error.action != syncer::UNKNOWN_ACTION); |
| 65 } | 65 } |
| 66 } // namespace | 66 } // namespace |
| 67 | 67 |
| 68 ConfigurationParams::ConfigurationParams() | |
| 69 : source(GetUpdatesCallerInfo::UNKNOWN), | |
| 70 keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {} | |
| 71 ConfigurationParams::ConfigurationParams( | |
| 72 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, | |
| 73 const syncer::ModelTypeSet& types_to_download, | |
| 74 const syncer::ModelSafeRoutingInfo& routing_info, | |
| 75 KeystoreKeyStatus keystore_key_status, | |
| 76 const base::Closure& ready_task) | |
| 77 : source(source), | |
| 78 types_to_download(types_to_download), | |
| 79 routing_info(routing_info), | |
| 80 keystore_key_status(keystore_key_status), | |
| 81 ready_task(ready_task) { | |
| 82 DCHECK(!ready_task.is_null()); | |
| 83 } | |
| 84 ConfigurationParams::~ConfigurationParams() {} | |
| 85 | |
| 86 SyncScheduler::DelayProvider::DelayProvider() {} | 68 SyncScheduler::DelayProvider::DelayProvider() {} |
| 87 SyncScheduler::DelayProvider::~DelayProvider() {} | 69 SyncScheduler::DelayProvider::~DelayProvider() {} |
| 88 | 70 |
| 89 SyncScheduler::WaitInterval::WaitInterval() | 71 SyncScheduler::WaitInterval::WaitInterval() |
| 90 : mode(UNKNOWN), | 72 : mode(UNKNOWN), |
| 91 had_nudge(false) { | 73 had_nudge(false) { |
| 92 } | 74 } |
| 93 | 75 |
| 94 SyncScheduler::WaitInterval::~WaitInterval() {} | 76 SyncScheduler::WaitInterval::~WaitInterval() {} |
| 95 | 77 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 107 | 89 |
| 108 SyncScheduler::SyncSessionJob::SyncSessionJob() | 90 SyncScheduler::SyncSessionJob::SyncSessionJob() |
| 109 : purpose(UNKNOWN), | 91 : purpose(UNKNOWN), |
| 110 is_canary_job(false) { | 92 is_canary_job(false) { |
| 111 } | 93 } |
| 112 | 94 |
| 113 SyncScheduler::SyncSessionJob::~SyncSessionJob() {} | 95 SyncScheduler::SyncSessionJob::~SyncSessionJob() {} |
| 114 | 96 |
| 115 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, | 97 SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
| 116 base::TimeTicks start, | 98 base::TimeTicks start, |
| 117 linked_ptr<sessions::SyncSession> session, | 99 linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
| 118 bool is_canary_job, | 100 const tracked_objects::Location& from_here) : purpose(purpose), |
| 119 const ConfigurationParams& config_params, | 101 scheduled_start(start), |
| 120 const tracked_objects::Location& from_here) | 102 session(session), |
| 121 : purpose(purpose), | 103 is_canary_job(is_canary_job), |
| 122 scheduled_start(start), | 104 from_here(from_here) { |
| 123 session(session), | |
| 124 is_canary_job(is_canary_job), | |
| 125 config_params(config_params), | |
| 126 from_here(from_here) { | |
| 127 } | 105 } |
| 128 | 106 |
| 129 const char* SyncScheduler::SyncSessionJob::GetPurposeString( | 107 const char* SyncScheduler::SyncSessionJob::GetPurposeString( |
| 130 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { | 108 SyncScheduler::SyncSessionJob::SyncSessionJobPurpose purpose) { |
| 131 switch (purpose) { | 109 switch (purpose) { |
| 132 ENUM_CASE(UNKNOWN); | 110 ENUM_CASE(UNKNOWN); |
| 133 ENUM_CASE(POLL); | 111 ENUM_CASE(POLL); |
| 134 ENUM_CASE(NUDGE); | 112 ENUM_CASE(NUDGE); |
| 135 ENUM_CASE(CONFIGURATION); | 113 ENUM_CASE(CONFIGURATION); |
| 136 ENUM_CASE(CLEANUP_DISABLED_TYPES); | 114 ENUM_CASE(CLEANUP_DISABLED_TYPES); |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 258 | 236 |
| 259 void SyncScheduler::UpdateServerConnectionManagerStatus( | 237 void SyncScheduler::UpdateServerConnectionManagerStatus( |
| 260 HttpResponse::ServerConnectionCode code) { | 238 HttpResponse::ServerConnectionCode code) { |
| 261 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 239 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 262 SDVLOG(2) << "New server connection code: " | 240 SDVLOG(2) << "New server connection code: " |
| 263 << HttpResponse::GetServerConnectionCodeString(code); | 241 << HttpResponse::GetServerConnectionCodeString(code); |
| 264 | 242 |
| 265 connection_code_ = code; | 243 connection_code_ = code; |
| 266 } | 244 } |
| 267 | 245 |
| 268 void SyncScheduler::Start(Mode mode) { | 246 void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
| 269 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 247 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 270 std::string thread_name = MessageLoop::current()->thread_name(); | 248 std::string thread_name = MessageLoop::current()->thread_name(); |
| 271 if (thread_name.empty()) | 249 if (thread_name.empty()) |
| 272 thread_name = "<Main thread>"; | 250 thread_name = "<Main thread>"; |
| 273 SDVLOG(2) << "Start called from thread " | 251 SDVLOG(2) << "Start called from thread " |
| 274 << thread_name << " with mode " << GetModeString(mode); | 252 << thread_name << " with mode " << GetModeString(mode); |
| 275 if (!started_) { | 253 if (!started_) { |
| 276 started_ = true; | 254 started_ = true; |
| 277 SendInitialSnapshot(); | 255 SendInitialSnapshot(); |
| 278 } | 256 } |
| 279 | 257 |
| 280 DCHECK(!session_context_->account_name().empty()); | 258 DCHECK(!session_context_->account_name().empty()); |
| 281 DCHECK(syncer_.get()); | 259 DCHECK(syncer_.get()); |
| 282 Mode old_mode = mode_; | 260 Mode old_mode = mode_; |
| 283 mode_ = mode; | 261 mode_ = mode; |
| 284 AdjustPolling(NULL); // Will kick start poll timer if needed. | 262 AdjustPolling(NULL); // Will kick start poll timer if needed. |
| 263 if (!callback.is_null()) |
| 264 callback.Run(); |
| 285 | 265 |
| 286 if (old_mode != mode_) { | 266 if (old_mode != mode_) { |
| 287 // We just changed our mode. See if there are any pending jobs that we could | 267 // We just changed our mode. See if there are any pending jobs that we could |
| 288 // execute in the new mode. | 268 // execute in the new mode. |
| 289 DoPendingJobIfPossible(false); | 269 DoPendingJobIfPossible(false); |
| 290 } | 270 } |
| 291 } | 271 } |
| 292 | 272 |
| 293 void SyncScheduler::SendInitialSnapshot() { | 273 void SyncScheduler::SendInitialSnapshot() { |
| 294 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 274 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 295 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, | 275 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, |
| 296 SyncSourceInfo(), ModelSafeRoutingInfo(), | 276 SyncSourceInfo(), ModelSafeRoutingInfo(), |
| 297 std::vector<ModelSafeWorker*>())); | 277 std::vector<ModelSafeWorker*>())); |
| 298 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 278 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
| 299 event.snapshot = dummy->TakeSnapshot(); | 279 event.snapshot = dummy->TakeSnapshot(); |
| 300 session_context_->NotifyListeners(event); | 280 session_context_->NotifyListeners(event); |
| 301 } | 281 } |
| 302 | 282 |
| 303 namespace { | |
| 304 | |
| 305 // Helper to extract the routing info and workers corresponding to types in | |
| 306 // |types| from |current_routes| and |current_workers|. | |
| 307 void BuildModelSafeParams( | |
| 308 const ModelTypeSet& types_to_download, | |
| 309 const ModelSafeRoutingInfo& current_routes, | |
| 310 const std::vector<ModelSafeWorker*>& current_workers, | |
| 311 ModelSafeRoutingInfo* result_routes, | |
| 312 std::vector<ModelSafeWorker*>* result_workers) { | |
| 313 std::set<ModelSafeGroup> active_groups; | |
| 314 active_groups.insert(GROUP_PASSIVE); | |
| 315 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); | |
| 316 iter.Inc()) { | |
| 317 syncer::ModelType type = iter.Get(); | |
| 318 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); | |
| 319 DCHECK(route != current_routes.end()); | |
| 320 ModelSafeGroup group = route->second; | |
| 321 (*result_routes)[type] = group; | |
| 322 active_groups.insert(group); | |
| 323 } | |
| 324 | |
| 325 for(std::vector<ModelSafeWorker*>::const_iterator iter = | |
| 326 current_workers.begin(); iter != current_workers.end(); ++iter) { | |
| 327 if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) | |
| 328 result_workers->push_back(*iter); | |
| 329 } | |
| 330 } | |
| 331 | |
| 332 } // namespace. | |
| 333 | |
| 334 bool SyncScheduler::ScheduleConfiguration(const ConfigurationParams& params) { | |
| 335 DCHECK_EQ(MessageLoop::current(), sync_loop_); | |
| 336 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); | |
| 337 DCHECK_EQ(CONFIGURATION_MODE, mode_); | |
| 338 DCHECK(!params.ready_task.is_null()); | |
| 339 SDVLOG(2) << "Reconfiguring syncer."; | |
| 340 | |
| 341 // Only one configuration is allowed at a time. Verify we're not waiting | |
| 342 // for a pending configure job. | |
| 343 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); | |
| 344 | |
| 345 // TODO(sync): now that ModelChanging commands only use those workers within | |
| 346 // the routing info, we don't really need |restricted_workers|. Remove it. | |
| 347 // crbug.com/133030 | |
| 348 syncer::ModelSafeRoutingInfo restricted_routes; | |
| 349 std::vector<ModelSafeWorker*> restricted_workers; | |
| 350 BuildModelSafeParams(params.types_to_download, | |
| 351 params.routing_info, | |
| 352 session_context_->workers(), | |
| 353 &restricted_routes, | |
| 354 &restricted_workers); | |
| 355 session_context_->set_routing_info(params.routing_info); | |
| 356 | |
| 357 // We rely on this not failing, so don't need to worry about checking for | |
| 358 // success. In addition, this will be removed as part of crbug.com/131433. | |
| 359 SyncSessionJob cleanup_job( | |
| 360 SyncSessionJob::CLEANUP_DISABLED_TYPES, | |
| 361 TimeTicks::Now(), | |
| 362 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), | |
| 363 false, | |
| 364 ConfigurationParams(), | |
| 365 FROM_HERE); | |
| 366 DoSyncSessionJob(cleanup_job); | |
| 367 | |
| 368 if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) { | |
| 369 // TODO(zea): implement in such a way that we can handle failures and the | |
| 370 // subsequent retrys the scheduler might perform. See crbug.com/129665. | |
| 371 NOTIMPLEMENTED(); | |
| 372 } | |
| 373 | |
| 374 // Only reconfigure if we have types to download. | |
| 375 if (!params.types_to_download.Empty()) { | |
| 376 DCHECK(!restricted_routes.empty()); | |
| 377 linked_ptr<SyncSession> session(new SyncSession( | |
| 378 session_context_, | |
| 379 this, | |
| 380 SyncSourceInfo(params.source, | |
| 381 ModelSafeRoutingInfoToPayloadMap( | |
| 382 restricted_routes, | |
| 383 std::string())), | |
| 384 restricted_routes, | |
| 385 restricted_workers)); | |
| 386 SyncSessionJob job(SyncSessionJob::CONFIGURATION, | |
| 387 TimeTicks::Now(), | |
| 388 session, | |
| 389 false, | |
| 390 params, | |
| 391 FROM_HERE); | |
| 392 DoSyncSessionJob(job); | |
| 393 | |
| 394 // If we failed, the job would have been saved as the pending configure | |
| 395 // job and a wait interval would have been set. | |
| 396 if (!session->Succeeded()) { | |
| 397 DCHECK(wait_interval_.get() && | |
| 398 wait_interval_->pending_configure_job.get()); | |
| 399 return false; | |
| 400 } | |
| 401 } else { | |
| 402 SDVLOG(2) << "No change in routing info, calling ready task directly."; | |
| 403 params.ready_task.Run(); | |
| 404 } | |
| 405 | |
| 406 return true; | |
| 407 } | |
| 408 | |
| 409 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( | 283 SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( |
| 410 const SyncSessionJob& job) { | 284 const SyncSessionJob& job) { |
| 411 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 285 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 412 DCHECK(wait_interval_.get()); | 286 DCHECK(wait_interval_.get()); |
| 413 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); | 287 DCHECK_NE(job.purpose, SyncSessionJob::CLEANUP_DISABLED_TYPES); |
| 414 | 288 |
| 415 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " | 289 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " |
| 416 << WaitInterval::GetModeString(wait_interval_->mode) | 290 << WaitInterval::GetModeString(wait_interval_->mode) |
| 417 << (wait_interval_->had_nudge ? " (had nudge)" : "") | 291 << (wait_interval_->had_nudge ? " (had nudge)" : "") |
| 418 << (job.is_canary_job ? " (canary)" : ""); | 292 << (job.is_canary_job ? " (canary)" : ""); |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 495 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 369 void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
| 496 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 370 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 497 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | 371 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
| 498 if (pending_nudge_.get() == NULL) { | 372 if (pending_nudge_.get() == NULL) { |
| 499 SDVLOG(2) << "Creating a pending nudge job"; | 373 SDVLOG(2) << "Creating a pending nudge job"; |
| 500 SyncSession* s = job.session.get(); | 374 SyncSession* s = job.session.get(); |
| 501 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | 375 scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
| 502 s->delegate(), s->source(), s->routing_info(), s->workers())); | 376 s->delegate(), s->source(), s->routing_info(), s->workers())); |
| 503 | 377 |
| 504 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | 378 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
| 505 make_linked_ptr(session.release()), false, | 379 make_linked_ptr(session.release()), false, job.from_here); |
| 506 ConfigurationParams(), job.from_here); | |
| 507 pending_nudge_.reset(new SyncSessionJob(new_job)); | 380 pending_nudge_.reset(new SyncSessionJob(new_job)); |
| 508 | 381 |
| 509 return; | 382 return; |
| 510 } | 383 } |
| 511 | 384 |
| 512 SDVLOG(2) << "Coalescing a pending nudge"; | 385 SDVLOG(2) << "Coalescing a pending nudge"; |
| 513 pending_nudge_->session->Coalesce(*(job.session.get())); | 386 pending_nudge_->session->Coalesce(*(job.session.get())); |
| 514 pending_nudge_->scheduled_start = job.scheduled_start; | 387 pending_nudge_->scheduled_start = job.scheduled_start; |
| 515 | 388 |
| 516 // Unfortunately the nudge location cannot be modified. So it stores the | 389 // Unfortunately the nudge location cannot be modified. So it stores the |
| (...skipping 24 matching lines...) Expand all Loading... |
| 541 // TODO(sync): Should we also check that job.purpose != | 414 // TODO(sync): Should we also check that job.purpose != |
| 542 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) | 415 // CLEANUP_DISABLED_TYPES? (See http://crbug.com/90868.) |
| 543 if (job.purpose == SyncSessionJob::NUDGE) { | 416 if (job.purpose == SyncSessionJob::NUDGE) { |
| 544 SDVLOG(2) << "Saving a nudge job"; | 417 SDVLOG(2) << "Saving a nudge job"; |
| 545 InitOrCoalescePendingJob(job); | 418 InitOrCoalescePendingJob(job); |
| 546 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | 419 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
| 547 SDVLOG(2) << "Saving a configuration job"; | 420 SDVLOG(2) << "Saving a configuration job"; |
| 548 DCHECK(wait_interval_.get()); | 421 DCHECK(wait_interval_.get()); |
| 549 DCHECK(mode_ == CONFIGURATION_MODE); | 422 DCHECK(mode_ == CONFIGURATION_MODE); |
| 550 | 423 |
| 551 // Config params should always get set. | |
| 552 DCHECK(!job.config_params.ready_task.is_null()); | |
| 553 SyncSession* old = job.session.get(); | 424 SyncSession* old = job.session.get(); |
| 554 SyncSession* s(new SyncSession(session_context_, this, old->source(), | 425 SyncSession* s(new SyncSession(session_context_, this, old->source(), |
| 555 old->routing_info(), old->workers())); | 426 old->routing_info(), old->workers())); |
| 556 SyncSessionJob new_job(job.purpose, | 427 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
| 557 TimeTicks::Now(), | 428 make_linked_ptr(s), false, job.from_here); |
| 558 make_linked_ptr(s), | |
| 559 false, | |
| 560 job.config_params, | |
| 561 job.from_here); | |
| 562 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | 429 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
| 563 } // drop the rest. | 430 } // drop the rest. |
| 564 // TODO(sync): Is it okay to drop the rest? It's weird that | 431 // TODO(sync): Is it okay to drop the rest? It's weird that |
| 565 // SaveJob() only does what it says sometimes. (See | 432 // SaveJob() only does what it says sometimes. (See |
| 566 // http://crbug.com/90868.) | 433 // http://crbug.com/90868.) |
| 567 } | 434 } |
| 568 | 435 |
| 569 // Functor for std::find_if to search by ModelSafeGroup. | 436 // Functor for std::find_if to search by ModelSafeGroup. |
| 570 struct ModelSafeWorkerGroupIs { | 437 struct ModelSafeWorkerGroupIs { |
| 571 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} | 438 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} |
| 572 bool operator()(ModelSafeWorker* w) { | 439 bool operator()(ModelSafeWorker* w) { |
| 573 return group == w->GetModelSafeGroup(); | 440 return group == w->GetModelSafeGroup(); |
| 574 } | 441 } |
| 575 ModelSafeGroup group; | 442 ModelSafeGroup group; |
| 576 }; | 443 }; |
| 577 | 444 |
| 445 void SyncScheduler::CleanupDisabledTypes() { |
| 446 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 447 SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
| 448 make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
| 449 false, |
| 450 FROM_HERE); |
| 451 DoSyncSessionJob(job); |
| 452 } |
| 453 |
| 578 void SyncScheduler::ScheduleNudgeAsync( | 454 void SyncScheduler::ScheduleNudgeAsync( |
| 579 const TimeDelta& delay, | 455 const TimeDelta& delay, |
| 580 NudgeSource source, ModelTypeSet types, | 456 NudgeSource source, ModelTypeSet types, |
| 581 const tracked_objects::Location& nudge_location) { | 457 const tracked_objects::Location& nudge_location) { |
| 582 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 458 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 583 SDVLOG_LOC(nudge_location, 2) | 459 SDVLOG_LOC(nudge_location, 2) |
| 584 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " | 460 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " |
| 585 << "source " << GetNudgeSourceString(source) << ", " | 461 << "source " << GetNudgeSourceString(source) << ", " |
| 586 << "types " << ModelTypeSetToString(types); | 462 << "types " << ModelTypeSetToString(types); |
| 587 | 463 |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 625 << "source " << GetUpdatesSourceString(source) << ", " | 501 << "source " << GetUpdatesSourceString(source) << ", " |
| 626 << "payloads " | 502 << "payloads " |
| 627 << syncer::ModelTypePayloadMapToString(types_with_payloads) | 503 << syncer::ModelTypePayloadMapToString(types_with_payloads) |
| 628 << (is_canary_job ? " (canary)" : ""); | 504 << (is_canary_job ? " (canary)" : ""); |
| 629 | 505 |
| 630 SyncSourceInfo info(source, types_with_payloads); | 506 SyncSourceInfo info(source, types_with_payloads); |
| 631 | 507 |
| 632 SyncSession* session(CreateSyncSession(info)); | 508 SyncSession* session(CreateSyncSession(info)); |
| 633 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 509 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
| 634 make_linked_ptr(session), is_canary_job, | 510 make_linked_ptr(session), is_canary_job, |
| 635 ConfigurationParams(), nudge_location); | 511 nudge_location); |
| 636 | 512 |
| 637 session = NULL; | 513 session = NULL; |
| 638 if (!ShouldRunJob(job)) | 514 if (!ShouldRunJob(job)) |
| 639 return; | 515 return; |
| 640 | 516 |
| 641 if (pending_nudge_.get()) { | 517 if (pending_nudge_.get()) { |
| 642 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | 518 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
| 643 SDVLOG(2) << "Dropping the nudge because we are in backoff"; | 519 SDVLOG(2) << "Dropping the nudge because we are in backoff"; |
| 644 return; | 520 return; |
| 645 } | 521 } |
| (...skipping 10 matching lines...) Expand all Loading... |
| 656 job.scheduled_start = std::min(job.scheduled_start, | 532 job.scheduled_start = std::min(job.scheduled_start, |
| 657 pending_nudge_->scheduled_start); | 533 pending_nudge_->scheduled_start); |
| 658 pending_nudge_.reset(); | 534 pending_nudge_.reset(); |
| 659 } | 535 } |
| 660 | 536 |
| 661 // TODO(zea): Consider adding separate throttling/backoff for datatype | 537 // TODO(zea): Consider adding separate throttling/backoff for datatype |
| 662 // refresh requests. | 538 // refresh requests. |
| 663 ScheduleSyncSessionJob(job); | 539 ScheduleSyncSessionJob(job); |
| 664 } | 540 } |
| 665 | 541 |
| 542 // Helper to extract the routing info and workers corresponding to types in |
| 543 // |types| from |current_routes| and |current_workers|. |
| 544 void GetModelSafeParamsForTypes(ModelTypeSet types, |
| 545 const ModelSafeRoutingInfo& current_routes, |
| 546 const std::vector<ModelSafeWorker*>& current_workers, |
| 547 ModelSafeRoutingInfo* result_routes, |
| 548 std::vector<ModelSafeWorker*>* result_workers) { |
| 549 bool passive_group_added = false; |
| 550 |
| 551 typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
| 552 for (ModelTypeSet::Iterator it = types.First(); |
| 553 it.Good(); it.Inc()) { |
| 554 const syncer::ModelType t = it.Get(); |
| 555 ModelSafeRoutingInfo::const_iterator route = current_routes.find(t); |
| 556 DCHECK(route != current_routes.end()); |
| 557 ModelSafeGroup group = route->second; |
| 558 |
| 559 (*result_routes)[t] = group; |
| 560 iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(), |
| 561 ModelSafeWorkerGroupIs(group)); |
| 562 if (w_tmp_it != current_workers.end()) { |
| 563 iter result_workers_it = std::find_if( |
| 564 result_workers->begin(), result_workers->end(), |
| 565 ModelSafeWorkerGroupIs(group)); |
| 566 if (result_workers_it == result_workers->end()) |
| 567 result_workers->push_back(*w_tmp_it); |
| 568 |
| 569 if (group == GROUP_PASSIVE) |
| 570 passive_group_added = true; |
| 571 } else { |
| 572 NOTREACHED(); |
| 573 } |
| 574 } |
| 575 |
| 576 // Always add group passive. |
| 577 if (passive_group_added == false) { |
| 578 iter it = std::find_if(current_workers.begin(), current_workers.end(), |
| 579 ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
| 580 if (it != current_workers.end()) |
| 581 result_workers->push_back(*it); |
| 582 else |
| 583 NOTREACHED(); |
| 584 } |
| 585 } |
| 586 |
| 587 void SyncScheduler::ScheduleConfiguration( |
| 588 ModelTypeSet types, |
| 589 GetUpdatesCallerInfo::GetUpdatesSource source) { |
| 590 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 591 DCHECK(IsConfigRelatedUpdateSourceValue(source)); |
| 592 SDVLOG(2) << "Scheduling a config"; |
| 593 |
| 594 ModelSafeRoutingInfo routes; |
| 595 std::vector<ModelSafeWorker*> workers; |
| 596 GetModelSafeParamsForTypes(types, |
| 597 session_context_->routing_info(), |
| 598 session_context_->workers(), |
| 599 &routes, &workers); |
| 600 |
| 601 SyncSession* session = new SyncSession(session_context_, this, |
| 602 SyncSourceInfo(source, |
| 603 ModelSafeRoutingInfoToPayloadMap(routes, std::string())), |
| 604 routes, workers); |
| 605 SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), |
| 606 make_linked_ptr(session), |
| 607 false, |
| 608 FROM_HERE); |
| 609 DoSyncSessionJob(job); |
| 610 } |
| 611 |
| 666 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { | 612 const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { |
| 667 switch (mode) { | 613 switch (mode) { |
| 668 ENUM_CASE(CONFIGURATION_MODE); | 614 ENUM_CASE(CONFIGURATION_MODE); |
| 669 ENUM_CASE(NORMAL_MODE); | 615 ENUM_CASE(NORMAL_MODE); |
| 670 } | 616 } |
| 671 return ""; | 617 return ""; |
| 672 } | 618 } |
| 673 | 619 |
| 674 const char* SyncScheduler::GetDecisionString( | 620 const char* SyncScheduler::GetDecisionString( |
| 675 SyncScheduler::JobProcessDecision mode) { | 621 SyncScheduler::JobProcessDecision mode) { |
| (...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 854 ServerConnectionManager* scm = session_context_->connection_manager(); | 800 ServerConnectionManager* scm = session_context_->connection_manager(); |
| 855 UpdateServerConnectionManagerStatus(scm->server_status()); | 801 UpdateServerConnectionManagerStatus(scm->server_status()); |
| 856 | 802 |
| 857 UpdateCarryoverSessionState(job); | 803 UpdateCarryoverSessionState(job); |
| 858 if (IsSyncingCurrentlySilenced()) { | 804 if (IsSyncingCurrentlySilenced()) { |
| 859 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; | 805 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; |
| 860 // TODO(sync): Investigate whether we need to check job.purpose | 806 // TODO(sync): Investigate whether we need to check job.purpose |
| 861 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) | 807 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) |
| 862 SaveJob(job); | 808 SaveJob(job); |
| 863 return; // Nothing to do. | 809 return; // Nothing to do. |
| 864 } else if (job.session->Succeeded() && | |
| 865 !job.config_params.ready_task.is_null()) { | |
| 866 // If this was a configuration job with a ready task, invoke it now that | |
| 867 // we finished successfully. | |
| 868 job.config_params.ready_task.Run(); | |
| 869 } | 810 } |
| 870 | 811 |
| 871 SDVLOG(2) << "Updating the next polling time after SyncMain"; | 812 SDVLOG(2) << "Updating the next polling time after SyncMain"; |
| 872 ScheduleNextSync(job); | 813 ScheduleNextSync(job); |
| 873 } | 814 } |
| 874 | 815 |
| 875 void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { | 816 void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { |
| 876 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 817 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 877 DCHECK(!old_job.session->HasMoreToSync()); | 818 DCHECK(!old_job.session->HasMoreToSync()); |
| 878 | 819 |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 961 | 902 |
| 962 SDVLOG(2) << "In handle continuation error with " | 903 SDVLOG(2) << "In handle continuation error with " |
| 963 << SyncSessionJob::GetPurposeString(old_job.purpose) | 904 << SyncSessionJob::GetPurposeString(old_job.purpose) |
| 964 << " job. The time delta(ms) is " | 905 << " job. The time delta(ms) is " |
| 965 << length.InMilliseconds(); | 906 << length.InMilliseconds(); |
| 966 | 907 |
| 967 // This will reset the had_nudge variable as well. | 908 // This will reset the had_nudge variable as well. |
| 968 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 909 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| 969 length)); | 910 length)); |
| 970 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 911 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| 971 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; | |
| 972 // Config params should always get set. | |
| 973 DCHECK(!old_job.config_params.ready_task.is_null()); | |
| 974 SyncSession* old = old_job.session.get(); | 912 SyncSession* old = old_job.session.get(); |
| 975 SyncSession* s(new SyncSession(session_context_, this, | 913 SyncSession* s(new SyncSession(session_context_, this, |
| 976 old->source(), old->routing_info(), old->workers())); | 914 old->source(), old->routing_info(), old->workers())); |
| 977 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | 915 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
| 978 make_linked_ptr(s), false, old_job.config_params, | 916 make_linked_ptr(s), false, FROM_HERE); |
| 979 FROM_HERE); | |
| 980 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); | 917 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
| 981 } else { | 918 } else { |
| 982 // We are not in configuration mode. So wait_interval's pending job | 919 // We are not in configuration mode. So wait_interval's pending job |
| 983 // should be null. | 920 // should be null. |
| 984 DCHECK(wait_interval_->pending_configure_job.get() == NULL); | 921 DCHECK(wait_interval_->pending_configure_job.get() == NULL); |
| 985 | 922 |
| 986 // TODO(lipalani) - handle clear user data. | 923 // TODO(lipalani) - handle clear user data. |
| 987 InitOrCoalescePendingJob(old_job); | 924 InitOrCoalescePendingJob(old_job); |
| 988 } | 925 } |
| 989 RestartWaiting(); | 926 RestartWaiting(); |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1028 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1092 ModelSafeRoutingInfo r; | 1029 ModelSafeRoutingInfo r; |
| 1093 ModelTypePayloadMap types_with_payloads = | 1030 ModelTypePayloadMap types_with_payloads = |
| 1094 ModelSafeRoutingInfoToPayloadMap(r, std::string()); | 1031 ModelSafeRoutingInfoToPayloadMap(r, std::string()); |
| 1095 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); | 1032 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
| 1096 SyncSession* s = CreateSyncSession(info); | 1033 SyncSession* s = CreateSyncSession(info); |
| 1097 | 1034 |
| 1098 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), | 1035 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), |
| 1099 make_linked_ptr(s), | 1036 make_linked_ptr(s), |
| 1100 false, | 1037 false, |
| 1101 ConfigurationParams(), | |
| 1102 FROM_HERE); | 1038 FROM_HERE); |
| 1103 | 1039 |
| 1104 ScheduleSyncSessionJob(job); | 1040 ScheduleSyncSessionJob(job); |
| 1105 } | 1041 } |
| 1106 | 1042 |
| 1107 void SyncScheduler::Unthrottle() { | 1043 void SyncScheduler::Unthrottle() { |
| 1108 DCHECK_EQ(MessageLoop::current(), sync_loop_); | 1044 DCHECK_EQ(MessageLoop::current(), sync_loop_); |
| 1109 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 1045 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| 1110 SDVLOG(2) << "Unthrottled."; | 1046 SDVLOG(2) << "Unthrottled."; |
| 1111 DoCanaryJob(); | 1047 DoCanaryJob(); |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1195 | 1131 |
| 1196 #undef SDVLOG_LOC | 1132 #undef SDVLOG_LOC |
| 1197 | 1133 |
| 1198 #undef SDVLOG | 1134 #undef SDVLOG |
| 1199 | 1135 |
| 1200 #undef SLOG | 1136 #undef SLOG |
| 1201 | 1137 |
| 1202 #undef ENUM_CASE | 1138 #undef ENUM_CASE |
| 1203 | 1139 |
| 1204 } // namespace syncer | 1140 } // namespace syncer |
| OLD | NEW |