Index: sync/engine/sync_scheduler.cc |
diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc |
index f55627dccc9eeeb14e447c3205e79c145798211c..04b30f0d1028bd1a845cc11c3b0a64d441ddd8b2 100644 |
--- a/sync/engine/sync_scheduler.cc |
+++ b/sync/engine/sync_scheduler.cc |
@@ -65,6 +65,24 @@ bool IsActionableError( |
} |
} // namespace |
+ConfigurationParams::ConfigurationParams() |
+ : source(GetUpdatesCallerInfo::UNKNOWN), |
+ keystore_key_status(KEYSTORE_KEY_UNNECESSARY) {} |
+ConfigurationParams::ConfigurationParams( |
+ const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, |
+ const syncer::ModelTypeSet& types_to_download, |
+ const syncer::ModelSafeRoutingInfo& routing_info, |
+ KeystoreKeyStatus keystore_key_status, |
+ const base::Closure& ready_task) |
+ : source(source), |
+ types_to_download(types_to_download), |
+ routing_info(routing_info), |
+ keystore_key_status(keystore_key_status), |
+ ready_task(ready_task) { |
+ DCHECK(!ready_task.is_null()); |
+} |
+ConfigurationParams::~ConfigurationParams() {} |
+ |
SyncScheduler::DelayProvider::DelayProvider() {} |
SyncScheduler::DelayProvider::~DelayProvider() {} |
@@ -96,12 +114,16 @@ SyncScheduler::SyncSessionJob::~SyncSessionJob() {} |
SyncScheduler::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, |
base::TimeTicks start, |
- linked_ptr<sessions::SyncSession> session, bool is_canary_job, |
- const tracked_objects::Location& from_here) : purpose(purpose), |
- scheduled_start(start), |
- session(session), |
- is_canary_job(is_canary_job), |
- from_here(from_here) { |
+ linked_ptr<sessions::SyncSession> session, |
+ bool is_canary_job, |
+ const ConfigurationParams& config_params, |
+ const tracked_objects::Location& from_here) |
+ : purpose(purpose), |
+ scheduled_start(start), |
+ session(session), |
+ is_canary_job(is_canary_job), |
+ config_params(config_params), |
+ from_here(from_here) { |
} |
const char* SyncScheduler::SyncSessionJob::GetPurposeString( |
@@ -243,7 +265,7 @@ void SyncScheduler::UpdateServerConnectionManagerStatus( |
connection_code_ = code; |
} |
-void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
+void SyncScheduler::Start(Mode mode) { |
DCHECK_EQ(MessageLoop::current(), sync_loop_); |
std::string thread_name = MessageLoop::current()->thread_name(); |
if (thread_name.empty()) |
@@ -260,8 +282,6 @@ void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
Mode old_mode = mode_; |
mode_ = mode; |
AdjustPolling(NULL); // Will kick start poll timer if needed. |
- if (!callback.is_null()) |
- callback.Run(); |
if (old_mode != mode_) { |
// We just changed our mode. See if there are any pending jobs that we could |
@@ -280,6 +300,112 @@ void SyncScheduler::SendInitialSnapshot() { |
session_context_->NotifyListeners(event); |
} |
+namespace { |
+ |
+// Helper to extract the routing info and workers corresponding to types in |
+// |types| from |current_routes| and |current_workers|. |
+void BuildModelSafeParams( |
+ const ModelTypeSet& types_to_download, |
+ const ModelSafeRoutingInfo& current_routes, |
+ const std::vector<ModelSafeWorker*>& current_workers, |
+ ModelSafeRoutingInfo* result_routes, |
+ std::vector<ModelSafeWorker*>* result_workers) { |
+ std::set<ModelSafeGroup> active_groups; |
+ active_groups.insert(GROUP_PASSIVE); |
+ for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); |
+ iter.Inc()) { |
+ syncer::ModelType type = iter.Get(); |
+ ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); |
+ DCHECK(route != current_routes.end()); |
+ ModelSafeGroup group = route->second; |
+ (*result_routes)[type] = group; |
+ active_groups.insert(group); |
+ } |
+ |
+ for(std::vector<ModelSafeWorker*>::const_iterator iter = |
+ current_workers.begin(); iter != current_workers.end(); ++iter) { |
+ if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) |
+ result_workers->push_back(*iter); |
+ } |
+} |
+ |
+} // namespace. |
+ |
+bool SyncScheduler::ScheduleConfiguration(const ConfigurationParams& params) { |
+ DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
+ DCHECK_EQ(CONFIGURATION_MODE, mode_); |
+ DCHECK(!params.ready_task.is_null()); |
+ SDVLOG(2) << "Reconfiguring syncer."; |
+ |
+ // Only one configuration is allowed at a time. Verify we're not waiting |
+ // for a pending configure job. |
+ DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); |
+ |
+ // TODO(sync): now that ModelChanging commands only use those workers within |
+ // the routing info, we don't really need |restricted_workers|. Remove it. |
+ // crbug.com/133030 |
+ syncer::ModelSafeRoutingInfo restricted_routes; |
+ std::vector<ModelSafeWorker*> restricted_workers; |
+ BuildModelSafeParams(params.types_to_download, |
+ params.routing_info, |
+ session_context_->workers(), |
+ &restricted_routes, |
+ &restricted_workers); |
+ session_context_->set_routing_info(params.routing_info); |
+ |
+ // We rely on this not failing, so don't need to worry about checking for |
+ // success. In addition, this will be removed as part of crbug.com/131433. |
+ SyncSessionJob cleanup_job( |
+ SyncSessionJob::CLEANUP_DISABLED_TYPES, |
+ TimeTicks::Now(), |
+ make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
+ false, |
+ ConfigurationParams(), |
+ FROM_HERE); |
+ DoSyncSessionJob(cleanup_job); |
+ |
+ if (params.keystore_key_status == ConfigurationParams::KEYSTORE_KEY_NEEDED) { |
+ // TODO(zea): implement in such a way that we can handle failures and the |
+ // subsequent retrys the scheduler might perform. See crbug.com/129665. |
+ NOTIMPLEMENTED(); |
+ } |
+ |
+ // Only reconfigure if we have types to download. |
+ if (!params.types_to_download.Empty()) { |
+ DCHECK(!restricted_routes.empty()); |
+ linked_ptr<SyncSession> session(new SyncSession( |
+ session_context_, |
+ this, |
+ SyncSourceInfo(params.source, |
+ ModelSafeRoutingInfoToPayloadMap( |
+ restricted_routes, |
+ std::string())), |
+ restricted_routes, |
+ restricted_workers)); |
+ SyncSessionJob job(SyncSessionJob::CONFIGURATION, |
+ TimeTicks::Now(), |
+ session, |
+ false, |
+ params, |
+ FROM_HERE); |
+ DoSyncSessionJob(job); |
+ |
+ // If we failed, the job would have been saved as the pending configure |
+ // job and a wait interval would have been set. |
+ if (!session->Succeeded()) { |
+ DCHECK(wait_interval_.get() && |
+ wait_interval_->pending_configure_job.get()); |
+ return false; |
+ } |
+ } else { |
+ SDVLOG(2) << "No change in routing info, calling ready task directly."; |
+ params.ready_task.Run(); |
+ } |
+ |
+ return true; |
+} |
+ |
SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( |
const SyncSessionJob& job) { |
DCHECK_EQ(MessageLoop::current(), sync_loop_); |
@@ -376,7 +502,8 @@ void SyncScheduler::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
s->delegate(), s->source(), s->routing_info(), s->workers())); |
SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
- make_linked_ptr(session.release()), false, job.from_here); |
+ make_linked_ptr(session.release()), false, |
+ ConfigurationParams(), job.from_here); |
pending_nudge_.reset(new SyncSessionJob(new_job)); |
return; |
@@ -421,11 +548,17 @@ void SyncScheduler::SaveJob(const SyncSessionJob& job) { |
DCHECK(wait_interval_.get()); |
DCHECK(mode_ == CONFIGURATION_MODE); |
+ // Config params should always get set. |
+ DCHECK(!job.config_params.ready_task.is_null()); |
SyncSession* old = job.session.get(); |
SyncSession* s(new SyncSession(session_context_, this, old->source(), |
old->routing_info(), old->workers())); |
- SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
- make_linked_ptr(s), false, job.from_here); |
+ SyncSessionJob new_job(job.purpose, |
+ TimeTicks::Now(), |
+ make_linked_ptr(s), |
+ false, |
+ job.config_params, |
+ job.from_here); |
wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
} // drop the rest. |
// TODO(sync): Is it okay to drop the rest? It's weird that |
@@ -442,15 +575,6 @@ struct ModelSafeWorkerGroupIs { |
ModelSafeGroup group; |
}; |
-void SyncScheduler::CleanupDisabledTypes() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
- make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
- false, |
- FROM_HERE); |
- DoSyncSessionJob(job); |
-} |
- |
void SyncScheduler::ScheduleNudgeAsync( |
const TimeDelta& delay, |
NudgeSource source, ModelTypeSet types, |
@@ -508,7 +632,7 @@ void SyncScheduler::ScheduleNudgeImpl( |
SyncSession* session(CreateSyncSession(info)); |
SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
make_linked_ptr(session), is_canary_job, |
- nudge_location); |
+ ConfigurationParams(), nudge_location); |
session = NULL; |
if (!ShouldRunJob(job)) |
@@ -539,76 +663,6 @@ void SyncScheduler::ScheduleNudgeImpl( |
ScheduleSyncSessionJob(job); |
} |
-// Helper to extract the routing info and workers corresponding to types in |
-// |types| from |current_routes| and |current_workers|. |
-void GetModelSafeParamsForTypes(ModelTypeSet types, |
- const ModelSafeRoutingInfo& current_routes, |
- const std::vector<ModelSafeWorker*>& current_workers, |
- ModelSafeRoutingInfo* result_routes, |
- std::vector<ModelSafeWorker*>* result_workers) { |
- bool passive_group_added = false; |
- |
- typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
- for (ModelTypeSet::Iterator it = types.First(); |
- it.Good(); it.Inc()) { |
- const syncer::ModelType t = it.Get(); |
- ModelSafeRoutingInfo::const_iterator route = current_routes.find(t); |
- DCHECK(route != current_routes.end()); |
- ModelSafeGroup group = route->second; |
- |
- (*result_routes)[t] = group; |
- iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(), |
- ModelSafeWorkerGroupIs(group)); |
- if (w_tmp_it != current_workers.end()) { |
- iter result_workers_it = std::find_if( |
- result_workers->begin(), result_workers->end(), |
- ModelSafeWorkerGroupIs(group)); |
- if (result_workers_it == result_workers->end()) |
- result_workers->push_back(*w_tmp_it); |
- |
- if (group == GROUP_PASSIVE) |
- passive_group_added = true; |
- } else { |
- NOTREACHED(); |
- } |
- } |
- |
- // Always add group passive. |
- if (passive_group_added == false) { |
- iter it = std::find_if(current_workers.begin(), current_workers.end(), |
- ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
- if (it != current_workers.end()) |
- result_workers->push_back(*it); |
- else |
- NOTREACHED(); |
- } |
-} |
- |
-void SyncScheduler::ScheduleConfiguration( |
- ModelTypeSet types, |
- GetUpdatesCallerInfo::GetUpdatesSource source) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- DCHECK(IsConfigRelatedUpdateSourceValue(source)); |
- SDVLOG(2) << "Scheduling a config"; |
- |
- ModelSafeRoutingInfo routes; |
- std::vector<ModelSafeWorker*> workers; |
- GetModelSafeParamsForTypes(types, |
- session_context_->routing_info(), |
- session_context_->workers(), |
- &routes, &workers); |
- |
- SyncSession* session = new SyncSession(session_context_, this, |
- SyncSourceInfo(source, |
- ModelSafeRoutingInfoToPayloadMap(routes, std::string())), |
- routes, workers); |
- SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), |
- make_linked_ptr(session), |
- false, |
- FROM_HERE); |
- DoSyncSessionJob(job); |
-} |
- |
const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { |
switch (mode) { |
ENUM_CASE(CONFIGURATION_MODE); |
@@ -807,6 +861,11 @@ void SyncScheduler::FinishSyncSessionJob(const SyncSessionJob& job) { |
// here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) |
SaveJob(job); |
return; // Nothing to do. |
+ } else if (job.session->Succeeded() && |
+ !job.config_params.ready_task.is_null()) { |
+ // If this was a configuration job with a ready task, invoke it now that |
+ // we finished successfully. |
+ job.config_params.ready_task.Run(); |
} |
SDVLOG(2) << "Updating the next polling time after SyncMain"; |
@@ -909,11 +968,15 @@ void SyncScheduler::HandleContinuationError( |
wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
length)); |
if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
+ SDVLOG(2) << "Configuration did not succeed, scheduling retry."; |
+ // Config params should always get set. |
+ DCHECK(!old_job.config_params.ready_task.is_null()); |
SyncSession* old = old_job.session.get(); |
SyncSession* s(new SyncSession(session_context_, this, |
old->source(), old->routing_info(), old->workers())); |
SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
- make_linked_ptr(s), false, FROM_HERE); |
+ make_linked_ptr(s), false, old_job.config_params, |
+ FROM_HERE); |
wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); |
} else { |
// We are not in configuration mode. So wait_interval's pending job |
@@ -1035,6 +1098,7 @@ void SyncScheduler::PollTimerCallback() { |
SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), |
make_linked_ptr(s), |
false, |
+ ConfigurationParams(), |
FROM_HERE); |
ScheduleSyncSessionJob(job); |