| Index: dm/appengine/distributor/swarming/v1/distributor.go
|
| diff --git a/dm/appengine/distributor/swarming/v1/distributor.go b/dm/appengine/distributor/swarming/v1/distributor.go
|
| index 1c3b53f21c7c0304d013633e31440bc33fd45737..33dfaf5bb131f18aa1ad8e888f5df7c52fbc177e 100644
|
| --- a/dm/appengine/distributor/swarming/v1/distributor.go
|
| +++ b/dm/appengine/distributor/swarming/v1/distributor.go
|
| @@ -45,6 +45,26 @@ var swarmBotLookup = map[string]dm.AbnormalFinish_Status{
|
| "TIMED_OUT": dm.AbnormalFinish_TIMED_OUT,
|
| }
|
|
|
| +func cipdPackageFromSwarm(pkg *swarm.SwarmingRpcsCipdPackage) *sv1.CipdPackage {
|
| + return &sv1.CipdPackage{Name: pkg.PackageName, Version: pkg.Version}
|
| +}
|
| +
|
| +func cipdSpecFromSwarm(pkgs *swarm.SwarmingRpcsCipdPins) *sv1.CipdSpec {
|
| + ret := &sv1.CipdSpec{
|
| + Client: cipdPackageFromSwarm(pkgs.ClientPackage),
|
| + ByPath: map[string]*sv1.CipdSpec_CipdPackages{},
|
| + }
|
| + for _, pkg := range pkgs.Packages {
|
| + pkgs, ok := ret.ByPath[pkg.Path]
|
| + if !ok {
|
| + pkgs = &sv1.CipdSpec_CipdPackages{}
|
| + ret.ByPath[pkg.Path] = pkgs
|
| + }
|
| + pkgs.Pkg = append(pkgs.Pkg, cipdPackageFromSwarm(pkg))
|
| + }
|
| + return ret
|
| +}
|
| +
|
| func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair {
|
| ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m))
|
| for key, value := range m {
|
| @@ -107,6 +127,15 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
|
| return
|
| }
|
|
|
| + prevParsed := (*sv1.Result)(nil)
|
| + if prev != nil {
|
| + prevParsed = &sv1.Result{}
|
| + if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err != nil {
|
| + err = errors.Annotate(err).Reason("parsing previous result").Err()
|
| + return
|
| + }
|
| + }
|
| +
|
| isoCtx, _ := context.WithTimeout(d, 30*time.Second)
|
| iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, params)
|
| if err != nil {
|
| @@ -114,36 +143,34 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
|
| return
|
| }
|
|
|
| + cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
|
| + if prevParsed != nil {
|
| + cipdInput = prevParsed.CipdPins.ToCipdInput()
|
| + } else {
|
| + cipdInput = params.Job.Inputs.Cipd.ToCipdInput()
|
| + }
|
| +
|
| topic, token, err := d.cfg.PrepareTopic(d, auth.Id)
|
| if err != nil {
|
| err = errors.Annotate(err).Reason("preparing topic").Err()
|
| return
|
| }
|
|
|
| - cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
|
| - if len(params.Job.Inputs.Packages) > 0 {
|
| - cipdInput := &swarm.SwarmingRpcsCipdInput{
|
| - Server: params.Job.Inputs.CipdServer,
|
| - }
|
| - for _, pkg := range params.Job.Inputs.Packages {
|
| - cipdInput.Packages = append(cipdInput.Packages, &swarm.SwarmingRpcsCipdPackage{
|
| - PackageName: pkg.Name,
|
| - Path: pkg.Path,
|
| - Version: pkg.Version,
|
| - })
|
| - }
|
| - }
|
| -
|
| - dims := []*swarm.SwarmingRpcsStringPair(nil)
|
| - for key, value := range params.Scheduling.Dimensions {
|
| - dims = append(dims, &swarm.SwarmingRpcsStringPair{Key: key, Value: value})
|
| - }
|
| -
|
| prefix := params.Meta.NamePrefix
|
| if len(prefix) > 0 {
|
| prefix += " / "
|
| }
|
|
|
| + dims := make(map[string]string, len(params.Scheduling.Dimensions))
|
| + for k, v := range params.Scheduling.Dimensions {
|
| + dims[k] = v
|
| + }
|
| + if prevParsed != nil {
|
| + for k, v := range prevParsed.SnapshotDimensions {
|
| + dims[k] = v
|
| + }
|
| + }
|
| +
|
| tags := []string{
|
| "requestor:DM",
|
| "requestor:" + info.TrimmedAppID(d),
|
| @@ -160,12 +187,11 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
|
| ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration().Seconds()),
|
| Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Quest, id.Attempt, id.Id),
|
|
|
| - // Priority is already pre-Normalize()'d
|
| Priority: int64(params.Scheduling.Priority),
|
|
|
| Properties: &swarm.SwarmingRpcsTaskProperties{
|
| CipdInput: cipdInput,
|
| - Dimensions: toSwarmMap(params.Scheduling.Dimensions),
|
| + Dimensions: toSwarmMap(dims),
|
| Env: toSwarmMap(params.Job.Env),
|
| ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.Run.Duration().Seconds()),
|
| GracePeriodSecs: int64(desc.Meta.Timeouts.Stop.Duration().Seconds()),
|
| @@ -197,7 +223,25 @@ func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error {
|
| }, retry.LogCallback(d, "swarm.Task.Cancel"))
|
| }
|
|
|
| -func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) {
|
| +func snapshotDimensions(p *sv1.Parameters, dims []*swarm.SwarmingRpcsStringListPair) map[string]string {
|
| + if len(p.Scheduling.SnapshotDimensions) == 0 {
|
| + return nil
|
| + }
|
| + allDims := map[string]string{}
|
| + for _, dim := range dims {
|
| + allDims[dim.Key] = dim.Value[len(dim.Value)-1]
|
| + }
|
| +
|
| + ret := make(map[string]string, len(p.Scheduling.SnapshotDimensions))
|
| + for _, k := range p.Scheduling.SnapshotDimensions {
|
| + if v, ok := allDims[k]; ok {
|
| + ret[k] = v
|
| + }
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func (d *swarmingDist) GetStatus(q *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) {
|
| rslt := (*swarm.SwarmingRpcsTaskResult)(nil)
|
|
|
| err := retry.Retry(d, retry.Default, func() (err error) {
|
| @@ -237,6 +281,15 @@ func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R
|
| retData := &sv1.Result{
|
| ExitCode: rslt.ExitCode,
|
| }
|
| + if rslt.CipdPins != nil {
|
| + retData.CipdPins = cipdSpecFromSwarm(rslt.CipdPins)
|
| + }
|
| + params, err := parseParams(q)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + retData.SnapshotDimensions = snapshotDimensions(params, rslt.BotDimensions)
|
| +
|
| if ref := rslt.OutputsRef; ref != nil {
|
| retData.IsolatedOutdir = &sv1.IsolatedRef{
|
| Id: ref.Isolated, Server: ref.Isolatedserver}
|
|
|