Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(287)

Unified Diff: dm/appengine/distributor/swarming/v1/distributor.go

Issue 2338153003: Add snapshotting for CIPD packages and dimensions to DM. (Closed)
Patch Set: Remove cleanup noise Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: dm/appengine/distributor/swarming/v1/distributor.go
diff --git a/dm/appengine/distributor/swarming/v1/distributor.go b/dm/appengine/distributor/swarming/v1/distributor.go
index 1c3b53f21c7c0304d013633e31440bc33fd45737..cf939a024ce06da99bc7059ec7080f2caacf6a73 100644
--- a/dm/appengine/distributor/swarming/v1/distributor.go
+++ b/dm/appengine/distributor/swarming/v1/distributor.go
@@ -45,6 +45,26 @@ var swarmBotLookup = map[string]dm.AbnormalFinish_Status{
"TIMED_OUT": dm.AbnormalFinish_TIMED_OUT,
}
+func cipdPackageFromSwarm(pkg *swarm.SwarmingRpcsCipdPackage) *sv1.CipdPackage {
+ return &sv1.CipdPackage{Name: pkg.PackageName, Version: pkg.Version}
+}
+
+func cipdSpecFromSwarm(pkgs *swarm.SwarmingRpcsCipdPins) *sv1.CipdSpec {
+ ret := &sv1.CipdSpec{
+ Client: cipdPackageFromSwarm(pkgs.ClientPackage),
+ ByPath: map[string]*sv1.CipdSpec_CipdPackages{},
+ }
+ for _, pkg := range pkgs.Packages {
+ pkgs, ok := ret.ByPath[pkg.Path]
+ if !ok {
+ pkgs = &sv1.CipdSpec_CipdPackages{}
+ ret.ByPath[pkg.Path] = pkgs
+ }
+ pkgs.Pkg = append(pkgs.Pkg, cipdPackageFromSwarm(pkg))
+ }
+ return ret
+}
+
func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair {
ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m))
for key, value := range m {
@@ -107,6 +127,15 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
return
}
+ prevParsed := (*sv1.Result)(nil)
+ if prev != nil {
+ prevParsed = &sv1.Result{}
+ if err = jsonpb.UnmarshalString(prev.Object, prevParsed); err != nil {
+ err = errors.Annotate(err).Reason("parsing previous result").Err()
+ return
+ }
+ }
+
isoCtx, _ := context.WithTimeout(d, 30*time.Second)
iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, params)
if err != nil {
@@ -114,36 +143,34 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
return
}
+ cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
+ if prev != nil {
Vadim Sh. 2016/09/26 23:16:06 nit: prevParsed != nil (and below)
iannucci 2016/09/27 01:23:56 ah yeah. done
+ cipdInput = prevParsed.CipdPins.ToCipdInput()
+ } else {
+ cipdInput = params.Job.Inputs.Cipd.ToCipdInput()
+ }
iannucci 2016/09/26 23:15:17 this is where the pinned cipd inputs are applied
+
topic, token, err := d.cfg.PrepareTopic(d, auth.Id)
if err != nil {
err = errors.Annotate(err).Reason("preparing topic").Err()
return
}
- cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
- if len(params.Job.Inputs.Packages) > 0 {
- cipdInput := &swarm.SwarmingRpcsCipdInput{
- Server: params.Job.Inputs.CipdServer,
- }
- for _, pkg := range params.Job.Inputs.Packages {
- cipdInput.Packages = append(cipdInput.Packages, &swarm.SwarmingRpcsCipdPackage{
- PackageName: pkg.Name,
- Path: pkg.Path,
- Version: pkg.Version,
- })
- }
- }
-
- dims := []*swarm.SwarmingRpcsStringPair(nil)
- for key, value := range params.Scheduling.Dimensions {
- dims = append(dims, &swarm.SwarmingRpcsStringPair{Key: key, Value: value})
- }
-
prefix := params.Meta.NamePrefix
if len(prefix) > 0 {
prefix += " / "
}
+ dims := make(map[string]string, len(params.Scheduling.Dimensions))
+ for k, v := range params.Scheduling.Dimensions {
+ dims[k] = v
+ }
+ if prev != nil {
+ for k, v := range prevParsed.SnapshotDimensions {
Vadim Sh. 2016/09/26 23:16:06 I'm not 100% sure it is what we want all the time.
iannucci 2016/09/27 01:23:56 yeah me either, but it's a start. I think even if
Vadim Sh. 2016/09/27 03:07:11 No, it's fine as is.
+ dims[k] = v
+ }
+ }
iannucci 2016/09/26 23:15:17 this is where the pinned dimensions are applied
+
tags := []string{
"requestor:DM",
"requestor:" + info.TrimmedAppID(d),
@@ -160,12 +187,11 @@ func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
ExpirationSecs: int64(desc.Meta.Timeouts.Start.Duration().Seconds()),
Name: fmt.Sprintf("%s%s|%d|%d", prefix, id.Quest, id.Attempt, id.Id),
- // Priority is already pre-Normalize()'d
Priority: int64(params.Scheduling.Priority),
Properties: &swarm.SwarmingRpcsTaskProperties{
CipdInput: cipdInput,
- Dimensions: toSwarmMap(params.Scheduling.Dimensions),
+ Dimensions: toSwarmMap(dims),
Env: toSwarmMap(params.Job.Env),
ExecutionTimeoutSecs: int64(desc.Meta.Timeouts.Run.Duration().Seconds()),
GracePeriodSecs: int64(desc.Meta.Timeouts.Stop.Duration().Seconds()),
@@ -197,7 +223,25 @@ func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error {
}, retry.LogCallback(d, "swarm.Task.Cancel"))
}
-func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) {
+func snapshotDimensions(p *sv1.Parameters, dims []*swarm.SwarmingRpcsStringListPair) map[string]string {
+ if len(p.Scheduling.SnapshotDimensions) == 0 {
+ return nil
+ }
+ allDims := map[string]string{}
+ for _, dim := range dims {
+ allDims[dim.Key] = dim.Value[len(dim.Value)-1]
Vadim Sh. 2016/09/26 23:16:06 I don't think we guarantee the order of dimensions
+ }
+
+ ret := make(map[string]string, len(p.Scheduling.SnapshotDimensions))
+ for _, k := range p.Scheduling.SnapshotDimensions {
+ if v, ok := allDims[k]; ok {
+ ret[k] = v
+ }
+ }
+ return ret
+}
+
+func (d *swarmingDist) GetStatus(q *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) {
rslt := (*swarm.SwarmingRpcsTaskResult)(nil)
err := retry.Retry(d, retry.Default, func() (err error) {
@@ -237,6 +281,15 @@ func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R
retData := &sv1.Result{
ExitCode: rslt.ExitCode,
}
+ if rslt.CipdPins != nil {
+ retData.CipdPins = cipdSpecFromSwarm(rslt.CipdPins)
+ }
+ params, err := parseParams(q)
+ if err != nil {
+ return nil, err
+ }
+ retData.SnapshotDimensions = snapshotDimensions(params, rslt.BotDimensions)
iannucci 2016/09/26 23:15:17 this is where the pinning happens
+
if ref := rslt.OutputsRef; ref != nil {
retData.IsolatedOutdir = &sv1.IsolatedRef{
Id: ref.Isolated, Server: ref.Isolatedserver}
« dm/api/distributor/swarming/v1/result.proto ('K') | « dm/api/distributor/swarming/v1/result.pb.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698