Index: sync/engine/sync_scheduler.cc |
diff --git a/sync/engine/sync_scheduler.cc b/sync/engine/sync_scheduler.cc |
index 1e9da41f81cace5e706faf0b40bddaa70188d474..38f7f41caacafcccc4b6121b5b53ccf29916a536 100644 |
--- a/sync/engine/sync_scheduler.cc |
+++ b/sync/engine/sync_scheduler.cc |
@@ -67,6 +67,25 @@ bool IsActionableError( |
} |
} // namespace |
+SyncScheduler::ConfigureParams::ConfigureParams() |
+ : source(GetUpdatesCallerInfo::UNKNOWN), |
+ need_cleanup(false), |
+ need_encryption_key(false) {} |
+SyncScheduler::ConfigureParams::ConfigureParams( |
+ const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, |
+ const syncable::ModelTypeSet& types_to_config, |
+ const browser_sync::ModelSafeRoutingInfo& routing_info, |
+ bool need_cleanup, |
+ bool need_encryption_key, |
+ const base::Closure& ready_task) |
+ : source(source), |
+ types_to_config(types_to_config), |
+ routing_info(routing_info), |
+ need_cleanup(need_cleanup), |
+ need_encryption_key(need_encryption_key), |
+ ready_task(ready_task) {} |
+SyncScheduler::ConfigureParams::~ConfigureParams() {} |
+ |
SyncScheduler::DelayProvider::DelayProvider() {} |
SyncScheduler::DelayProvider::~DelayProvider() {} |
@@ -246,7 +265,7 @@ void SyncScheduler::UpdateServerConnectionManagerStatus( |
connection_code_ = code; |
} |
-void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
+void SyncScheduler::Start(Mode mode) { |
DCHECK_EQ(MessageLoop::current(), sync_loop_); |
std::string thread_name = MessageLoop::current()->thread_name(); |
if (thread_name.empty()) |
@@ -255,13 +274,9 @@ void SyncScheduler::Start(Mode mode, const base::Closure& callback) { |
<< thread_name << " with mode " << GetModeString(mode); |
if (!started_) { |
started_ = true; |
- PostTask(FROM_HERE, "SendInitialSnapshot", |
- base::Bind(&SyncScheduler::SendInitialSnapshot, |
- weak_ptr_factory_.GetWeakPtr())); |
+ SendInitialSnapshot(); |
} |
- PostTask(FROM_HERE, "StartImpl", |
- base::Bind(&SyncScheduler::StartImpl, |
- weak_ptr_factory_.GetWeakPtr(), mode, callback)); |
+ StartImpl(mode); |
} |
void SyncScheduler::SendInitialSnapshot() { |
@@ -274,7 +289,7 @@ void SyncScheduler::SendInitialSnapshot() { |
session_context_->NotifyListeners(event); |
} |
-void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) { |
+void SyncScheduler::StartImpl(Mode mode) { |
DCHECK_EQ(MessageLoop::current(), sync_loop_); |
SDVLOG(2) << "In StartImpl with mode " << GetModeString(mode); |
@@ -284,8 +299,6 @@ void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) { |
Mode old_mode = mode_; |
mode_ = mode; |
AdjustPolling(NULL); // Will kick start poll timer if needed. |
- if (!callback.is_null()) |
- callback.Run(); |
if (old_mode != mode_) { |
// We just changed our mode. See if there are any pending jobs that we could |
@@ -294,6 +307,108 @@ void SyncScheduler::StartImpl(Mode mode, const base::Closure& callback) { |
} |
} |
+namespace { |
+ |
+// Helper to extract the routing info and workers corresponding to types in |
+// |types| from |current_routes| and |current_workers|. |
+void BuildModelSafeParams( |
+ const ModelTypeSet& types_to_config, |
+ const ModelSafeRoutingInfo& current_routes, |
+ const std::vector<ModelSafeWorker*>& current_workers, |
+ ModelSafeRoutingInfo* result_routes, |
+ std::vector<ModelSafeWorker*>* result_workers) { |
+ std::set<ModelSafeGroup> active_groups; |
+ active_groups.insert(GROUP_PASSIVE); |
+ for (ModelTypeSet::Iterator iter = types_to_config.First(); iter.Good(); |
+ iter.Inc()) { |
+ syncable::ModelType type = iter.Get(); |
+ ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); |
+ DCHECK(route != current_routes.end()); |
+ ModelSafeGroup group = route->second; |
+ (*result_routes)[type] = group; |
+ active_groups.insert(group); |
+ } |
+ |
+ for(std::vector<ModelSafeWorker*>::const_iterator iter = |
+ current_workers.begin(); iter != current_workers.end(); ++iter) { |
+ if (active_groups.count((*iter)->GetModelSafeGroup()) > 0) |
+ result_workers->push_back(*iter); |
+ } |
+} |
+ |
+} // namespace. |
+ |
+bool SyncScheduler::Configure(const ConfigureParams& params) { |
+ DCHECK_EQ(MessageLoop::current(), sync_loop_); |
+ DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); |
+ DCHECK_EQ(CONFIGURATION_MODE, mode_); |
+ SDVLOG(2) << "Reconfiguring syncer."; |
+ |
+ // Only one configuration is allowed at a time. Verify we're not waiting |
+ // for a pending configure job. |
+ DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); |
+ |
+ // We set the routing info for all enabled types in the session context, but |
+ // the session for this configuration only knows about the routing info for |
+ // those types_to_config (set via restricted_routes and restricted_workers). |
+ browser_sync::ModelSafeRoutingInfo restricted_routes; |
+ std::vector<ModelSafeWorker*> restricted_workers; |
+ BuildModelSafeParams(params.types_to_config, |
+ params.routing_info, |
+ session_context_->workers(), |
+ &restricted_routes, |
+ &restricted_workers); |
+ const browser_sync::ModelSafeRoutingInfo& previous_routing_info = |
+ session_context_->routing_info(); |
+ // TODO(zea): ensure that the routing info can't be set anywhere but here. |
+ session_context_->set_routing_info(params.routing_info); |
+ |
+ if (params.need_cleanup) { |
rlarocque
2012/06/04 23:01:38
CleanupDisabledTypesCommand has some strange logic
tim (not reviewing)
2012/06/05 04:02:09
We do have a function that takes a 'types to clean
Nicolas Zea
2012/06/07 19:25:42
Discussed offline. I've removed this logic and now
|
+ SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
+ make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
+ false, |
+ FROM_HERE); |
+ DoSyncSessionJob(job); |
+ } |
+ |
+ if (params.need_encryption_key) { |
+ // TODO(zea): implement in such a way that we can handle failures and the |
+ // subsequent retry's the scheduler might perform. |
+ NOTIMPLEMENTED(); |
+ } |
+ |
+ // If the set of enabled types hasn't changed there's no need to configure. |
+ if (params.routing_info != previous_routing_info) { |
+ DCHECK(!restricted_routes.empty()); |
+ // TODO(tim): config-specific GetUpdatesCallerInfo value? |
+ SyncSession* session = new SyncSession( |
+ session_context_, |
+ this, |
+ SyncSourceInfo(params.source, |
+ syncable::ModelTypePayloadMapFromRoutingInfo( |
+ restricted_routes, |
+ std::string())), |
+ restricted_routes, |
+ restricted_workers); |
+ SyncSessionJob job(SyncSessionJob::CONFIGURATION, |
rlarocque
2012/06/04 23:01:38
The arguments for converting SyncSessionJobs into
|
+ TimeTicks::Now(), |
+ make_linked_ptr(session), |
+ false, |
+ FROM_HERE); |
+ job.config_params = params; |
+ DoSyncSessionJob(job); |
+ |
+ // If we failed, the job would have been saved as the pending configure |
+ // job and a wait interval would have been set. |
+ if (wait_interval_.get() && wait_interval_->pending_configure_job.get()) |
+ return false; |
+ } else { |
+ params.ready_task.Run(); |
+ } |
+ |
+ return true; |
+} |
+ |
SyncScheduler::JobProcessDecision SyncScheduler::DecideWhileInWaitInterval( |
const SyncSessionJob& job) { |
DCHECK_EQ(MessageLoop::current(), sync_loop_); |
@@ -438,6 +553,8 @@ void SyncScheduler::SaveJob(const SyncSessionJob& job) { |
DCHECK(wait_interval_.get()); |
DCHECK(mode_ == CONFIGURATION_MODE); |
+ // Config params should always get set. |
+ DCHECK(!job.config_params.ready_task.is_null()); |
SyncSession* old = job.session.get(); |
SyncSession* s(new SyncSession(session_context_, this, old->source(), |
old->routing_info(), old->workers())); |
@@ -459,6 +576,8 @@ struct ModelSafeWorkerGroupIs { |
ModelSafeGroup group; |
}; |
+// TODO(sync): Remove the *Impl methods for the other Schedule* |
+// functions, too. |
void SyncScheduler::ScheduleClearUserData() { |
DCHECK_EQ(MessageLoop::current(), sync_loop_); |
PostTask(FROM_HERE, "ScheduleClearUserDataImpl", |
@@ -466,17 +585,6 @@ void SyncScheduler::ScheduleClearUserData() { |
weak_ptr_factory_.GetWeakPtr())); |
} |
-// TODO(sync): Remove the *Impl methods for the other Schedule* |
-// functions, too. |
-void SyncScheduler::ScheduleCleanupDisabledTypes() { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- SyncSessionJob job(SyncSessionJob::CLEANUP_DISABLED_TYPES, TimeTicks::Now(), |
- make_linked_ptr(CreateSyncSession(SyncSourceInfo())), |
- false, |
- FROM_HERE); |
- ScheduleSyncSessionJob(job); |
-} |
- |
void SyncScheduler::ScheduleNudge( |
const TimeDelta& delay, |
NudgeSource source, ModelTypeSet types, |
@@ -581,93 +689,6 @@ void SyncScheduler::ScheduleNudgeImpl( |
ScheduleSyncSessionJob(job); |
} |
-// Helper to extract the routing info and workers corresponding to types in |
-// |types| from |current_routes| and |current_workers|. |
-void GetModelSafeParamsForTypes(ModelTypeSet types, |
- const ModelSafeRoutingInfo& current_routes, |
- const std::vector<ModelSafeWorker*>& current_workers, |
- ModelSafeRoutingInfo* result_routes, |
- std::vector<ModelSafeWorker*>* result_workers) { |
- bool passive_group_added = false; |
- |
- typedef std::vector<ModelSafeWorker*>::const_iterator iter; |
- for (ModelTypeSet::Iterator it = types.First(); |
- it.Good(); it.Inc()) { |
- const syncable::ModelType t = it.Get(); |
- ModelSafeRoutingInfo::const_iterator route = current_routes.find(t); |
- DCHECK(route != current_routes.end()); |
- ModelSafeGroup group = route->second; |
- |
- (*result_routes)[t] = group; |
- iter w_tmp_it = std::find_if(current_workers.begin(), current_workers.end(), |
- ModelSafeWorkerGroupIs(group)); |
- if (w_tmp_it != current_workers.end()) { |
- iter result_workers_it = std::find_if( |
- result_workers->begin(), result_workers->end(), |
- ModelSafeWorkerGroupIs(group)); |
- if (result_workers_it == result_workers->end()) |
- result_workers->push_back(*w_tmp_it); |
- |
- if (group == GROUP_PASSIVE) |
- passive_group_added = true; |
- } else { |
- NOTREACHED(); |
- } |
- } |
- |
- // Always add group passive. |
- if (passive_group_added == false) { |
- iter it = std::find_if(current_workers.begin(), current_workers.end(), |
- ModelSafeWorkerGroupIs(GROUP_PASSIVE)); |
- if (it != current_workers.end()) |
- result_workers->push_back(*it); |
- else |
- NOTREACHED(); |
- } |
-} |
- |
-void SyncScheduler::ScheduleConfig( |
- ModelTypeSet types, |
- GetUpdatesCallerInfo::GetUpdatesSource source) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- DCHECK(IsConfigRelatedUpdateSourceValue(source)); |
- SDVLOG(2) << "Scheduling a config"; |
- |
- ModelSafeRoutingInfo routes; |
- std::vector<ModelSafeWorker*> workers; |
- GetModelSafeParamsForTypes(types, |
- session_context_->routing_info(), |
- session_context_->workers(), |
- &routes, &workers); |
- |
- PostTask(FROM_HERE, "ScheduleConfigImpl", |
- base::Bind(&SyncScheduler::ScheduleConfigImpl, |
- weak_ptr_factory_.GetWeakPtr(), |
- routes, |
- workers, |
- source)); |
-} |
- |
-void SyncScheduler::ScheduleConfigImpl( |
- const ModelSafeRoutingInfo& routing_info, |
- const std::vector<ModelSafeWorker*>& workers, |
- const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
- DCHECK_EQ(MessageLoop::current(), sync_loop_); |
- |
- SDVLOG(2) << "In ScheduleConfigImpl"; |
- // TODO(tim): config-specific GetUpdatesCallerInfo value? |
- SyncSession* session = new SyncSession(session_context_, this, |
- SyncSourceInfo(source, |
- syncable::ModelTypePayloadMapFromRoutingInfo( |
- routing_info, std::string())), |
- routing_info, workers); |
- SyncSessionJob job(SyncSessionJob::CONFIGURATION, TimeTicks::Now(), |
- make_linked_ptr(session), |
- false, |
- FROM_HERE); |
- ScheduleSyncSessionJob(job); |
-} |
- |
const char* SyncScheduler::GetModeString(SyncScheduler::Mode mode) { |
switch (mode) { |
ENUM_CASE(CONFIGURATION_MODE); |
@@ -889,6 +910,11 @@ void SyncScheduler::ScheduleNextSync(const SyncSessionJob& old_job) { |
// special code here. |
wait_interval_.reset(); |
SDVLOG(2) << "Job succeeded so not scheduling more jobs"; |
+ |
+ // If this was a configuration job with a ready task, invoke it now that |
+ // we finished successfully. |
+ if (!old_job.config_params.ready_task.is_null()) |
+ old_job.config_params.ready_task.Run(); |
return; |
} |
@@ -974,6 +1000,8 @@ void SyncScheduler::HandleContinuationError( |
wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
length)); |
if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
+ // Config params should always get set. |
+ DCHECK(!old_job.config_params.ready_task.is_null()); |
SyncSession* old = old_job.session.get(); |
SyncSession* s(new SyncSession(session_context_, this, |
old->source(), old->routing_info(), old->workers())); |