Chromium Code Reviews| Index: dm/appengine/distributor/swarming/v1/distributor.go |
| diff --git a/dm/appengine/distributor/swarming/v1/distributor.go b/dm/appengine/distributor/swarming/v1/distributor.go |
| index 1c3b53f21c7c0304d013633e31440bc33fd45737..cf939a024ce06da99bc7059ec7080f2caacf6a73 100644 |
| --- a/dm/appengine/distributor/swarming/v1/distributor.go |
| +++ b/dm/appengine/distributor/swarming/v1/distributor.go |
| @@ -45,6 +45,26 @@ var swarmBotLookup = map[string]dm.AbnormalFinish_Status{ |
| "TIMED_OUT": dm.AbnormalFinish_TIMED_OUT, |
| } |
| +func cipdPackageFromSwarm(pkg *swarm.SwarmingRpcsCipdPackage) *sv1.CipdPackage { |
| + return &sv1.CipdPackage{Name: pkg.PackageName, Version: pkg.Version} |
| +} |
| + |
| +func cipdSpecFromSwarm(pkgs *swarm.SwarmingRpcsCipdPins) *sv1.CipdSpec { |
| + ret := &sv1.CipdSpec{ |
| + Client: cipdPackageFromSwarm(pkgs.ClientPackage), |
| + ByPath: map[string]*sv1.CipdSpec_CipdPackages{}, |
| + } |
| + for _, pkg := range pkgs.Packages { |
| + pkgs, ok := ret.ByPath[pkg.Path] |
| + if !ok { |
| + pkgs = &sv1.CipdSpec_CipdPackages{} |
| + ret.ByPath[pkg.Path] = pkgs |
| + } |
| + pkgs.Pkg = append(pkgs.Pkg, cipdPackageFromSwarm(pkg)) |
| + } |
| + return ret |
| +} |
| + |
| func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { |
| ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) |
| for key, value := range m { |
| @@ -107,6 +127,15 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d |
| return |
| } |
| + prevParsed := (*sv1.Result)(nil) |
| + if prev != nil { |
| + prevParsed = &sv1.Result{} |
| + if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err != nil { |
| + err = errors.Annotate(err).Reason("parsing previous result").Err() |
| + return |
| + } |
| + } |
| + |
| isoCtx, _ := context.WithTimeout(d, 30*time.Second) |
| iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, params) |
| if err != nil { |
| @@ -114,36 +143,34 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d |
| return |
| } |
| + cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) |
| + if prev != nil { |
|
Vadim Sh.
2016/09/26 23:16:06
nit: prevParsed != nil (and below)
iannucci
2016/09/27 01:23:56
ah yeah. done
|
| + cipdInput = prevParsed.CipdPins.ToCipdInput() |
| + } else { |
| + cipdInput = params.Job.Inputs.Cipd.ToCipdInput() |
| + } |
|
iannucci
2016/09/26 23:15:17
this is where the pinned cipd inputs are applied
|
| + |
| topic, token, err := d.cfg.PrepareTopic(d, auth.Id) |
| if err != nil { |
| err = errors.Annotate(err).Reason("preparing topic").Err() |
| return |
| } |
| - cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) |
| - if len(params.Job.Inputs.Packages) > 0 { |
| - cipdInput := &swarm.SwarmingRpcsCipdInput{ |
| - Server: params.Job.Inputs.CipdServer, |
| - } |
| - for _, pkg := range params.Job.Inputs.Packages { |
| - cipdInput.Packages = append(cipdInput.Packages, &swarm.SwarmingRpcsCipdPackage{ |
| - PackageName: pkg.Name, |
| - Path: pkg.Path, |
| - Version: pkg.Version, |
| - }) |
| - } |
| - } |
| - |
| - dims := []*swarm.SwarmingRpcsStringPair(nil) |
| - for key, value := range params.Scheduling.Dimensions { |
| - dims = append(dims, &swarm.SwarmingRpcsStringPair{Key: key, Value: value}) |
| - } |
| - |
| prefix := params.Meta.NamePrefix |
| if len(prefix) > 0 { |
| prefix += " / " |
| } |
| + dims := make(map[string]string, len(params.Scheduling.Dimensions)) |
| + for k, v := range params.Scheduling.Dimensions { |
| + dims[k] = v |
| + } |
| + if prev != nil { |
| + for k, v := range prevParsed.SnapshotDimensions { |
|
Vadim Sh.
2016/09/26 23:16:06
I'm not 100% sure it is what we want all the time.
iannucci
2016/09/27 01:23:56
yeah me either, but it's a start. I think even if
Vadim Sh.
2016/09/27 03:07:11
No, it's fine as is.
|
| + dims[k] = v |
| + } |
| + } |
|
iannucci
2016/09/26 23:15:17
this is where the pinned dimensions are applied
|
| + |
| tags := []string{ |
| "requestor:DM", |
| "requestor:" + info.TrimmedAppID(d), |
| @@ -160,12 +187,11 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d |
| ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration().Seconds()), |
| Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Quest, id.Attempt, id.Id), |
| - // Priority is already pre-Normalize()'d |
| Priority: int64(params.Scheduling.Priority), |
| Properties: &swarm.SwarmingRpcsTaskProperties{ |
| CipdInput: cipdInput, |
| - Dimensions: toSwarmMap(params.Scheduling.Dimensions), |
| + Dimensions: toSwarmMap(dims), |
| Env: toSwarmMap(params.Job.Env), |
| ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.Run.Duration().Seconds()), |
| GracePeriodSecs: int64(desc.Meta.Timeouts.Stop.Duration().Seconds()), |
| @@ -197,7 +223,25 @@ func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { |
| }, retry.LogCallback(d, "swarm.Task.Cancel")) |
| } |
| -func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) { |
| +func snapshotDimensions(p *sv1.Parameters, dims []*swarm.SwarmingRpcsStringListPair) map[string]string { |
| + if len(p.Scheduling.SnapshotDimensions) == 0 { |
| + return nil |
| + } |
| + allDims := map[string]string{} |
| + for _, dim := range dims { |
| + allDims[dim.Key] = dim.Value[len(dim.Value)-1] |
|
Vadim Sh.
2016/09/26 23:16:06
I don't think we guarantee the order of dimensions
|
| + } |
| + |
| + ret := make(map[string]string, len(p.Scheduling.SnapshotDimensions)) |
| + for _, k := range p.Scheduling.SnapshotDimensions { |
| + if v, ok := allDims[k]; ok { |
| + ret[k] = v |
| + } |
| + } |
| + return ret |
| +} |
| + |
| +func (d *swarmingDist) GetStatus(q *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) { |
| rslt := (*swarm.SwarmingRpcsTaskResult)(nil) |
| err := retry.Retry(d, retry.Default, func() (err error) { |
| @@ -237,6 +281,15 @@ func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R |
| retData := &sv1.Result{ |
| ExitCode: rslt.ExitCode, |
| } |
| + if rslt.CipdPins != nil { |
| + retData.CipdPins = cipdSpecFromSwarm(rslt.CipdPins) |
| + } |
| + params, err := parseParams(q) |
| + if err != nil { |
| + return nil, err |
| + } |
| + retData.SnapshotDimensions = snapshotDimensions(params, rslt.BotDimensions) |
|
iannucci
2016/09/26 23:15:17
this is where the pinning happens
|
| + |
| if ref := rslt.OutputsRef; ref != nil { |
| retData.IsolatedOutdir = &sv1.IsolatedRef{ |
| Id: ref.Isolated, Server: ref.Isolatedserver} |