| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/json" | 8 "encoding/json" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 73 _, authC := httpClients(c) | 73 _, authC := httpClients(c) |
| 74 svc, err := swarm.New(authC) | 74 svc, err := swarm.New(authC) |
| 75 if err != nil { | 75 if err != nil { |
| 76 // can only happen with nil client | 76 // can only happen with nil client |
| 77 panic(err) | 77 panic(err) |
| 78 } | 78 } |
| 79 svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/" | 79 svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/" |
| 80 return svc | 80 return svc |
| 81 } | 81 } |
| 82 | 82 |
| 83 func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To
ken, _ time.Duration, err error) { | 83 func parseParams(desc *dm.Quest_Desc) (ret *sv1.Parameters, err error) { |
| 84 » auth := tsk.ExecutionAuth() | 84 » ret = &sv1.Parameters{} |
| 85 » id := auth.Id | 85 » if err = jsonpb.UnmarshalString(desc.DistributorParameters, ret); err !=
nil { |
| 86 » desc := tsk.Payload() | |
| 87 | |
| 88 » params := &sv1.Parameters{} | |
| 89 » if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err
!= nil { | |
| 90 err = errors.Annotate(err). | 86 err = errors.Annotate(err). |
| 91 Reason("unmarshalling DistributorParameters"). | 87 Reason("unmarshalling DistributorParameters"). |
| 92 InternalReason("These paramaeters were already validated
?"). | 88 InternalReason("These paramaeters were already validated
?"). |
| 93 Err() | 89 Err() |
| 94 return | 90 return |
| 95 } | 91 } |
| 96 » if err = params.Normalize(); err != nil { | 92 » if err = ret.Normalize(); err != nil { |
| 97 err = errors.Annotate(err). | 93 err = errors.Annotate(err). |
| 98 Reason("normalizing DistributorParameters"). | 94 Reason("normalizing DistributorParameters"). |
| 99 InternalReason("These paramaeters were already normalize
d successfully once?"). | 95 InternalReason("These paramaeters were already normalize
d successfully once?"). |
| 100 Err() | 96 Err() |
| 101 return | 97 return |
| 102 } | 98 } |
| 99 return |
| 100 } |
| 101 |
| 102 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d
m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { |
| 103 id := auth.Id |
| 104 |
| 105 params, err := parseParams(desc) |
| 106 if err != nil { |
| 107 return |
| 108 } |
| 103 | 109 |
| 104 isoCtx, _ := context.WithTimeout(d, 30*time.Second) | 110 isoCtx, _ := context.WithTimeout(d, 30*time.Second) |
| 105 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, tsk, params) | 111 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, pa
rams) |
| 106 if err != nil { | 112 if err != nil { |
| 107 err = errors.Annotate(err).Reason("prepping Isolated").Err() | 113 err = errors.Annotate(err).Reason("prepping Isolated").Err() |
| 108 return | 114 return |
| 109 } | 115 } |
| 110 | 116 |
| 111 » topic, token, err := tsk.PrepareTopic() | 117 » topic, token, err := d.cfg.PrepareTopic(d, auth.Id) |
| 112 if err != nil { | 118 if err != nil { |
| 113 err = errors.Annotate(err).Reason("preparing topic").Err() | 119 err = errors.Annotate(err).Reason("preparing topic").Err() |
| 114 return | 120 return |
| 115 } | 121 } |
| 116 | 122 |
| 117 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) | 123 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) |
| 118 if len(params.Job.Inputs.Packages) > 0 { | 124 if len(params.Job.Inputs.Packages) > 0 { |
| 119 cipdInput := &swarm.SwarmingRpcsCipdInput{ | 125 cipdInput := &swarm.SwarmingRpcsCipdInput{ |
| 120 Server: params.Job.Inputs.CipdServer, | 126 Server: params.Job.Inputs.CipdServer, |
| 121 } | 127 } |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 176 }, retry.LogCallback(d, "swarm.Tasks.New")) | 182 }, retry.LogCallback(d, "swarm.Tasks.New")) |
| 177 if err != nil { | 183 if err != nil { |
| 178 err = errors.Annotate(err).Reason("calling swarm.Tasks.New").Err
() | 184 err = errors.Annotate(err).Reason("calling swarm.Tasks.New").Err
() |
| 179 return | 185 return |
| 180 } | 186 } |
| 181 | 187 |
| 182 tok = distributor.Token(rslt.TaskId) | 188 tok = distributor.Token(rslt.TaskId) |
| 183 return | 189 return |
| 184 } | 190 } |
| 185 | 191 |
| 186 func (d *swarmingDist) Cancel(tok distributor.Token) error { | 192 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error { |
| 187 return retry.Retry(d, retry.Default, func() (err error) { | 193 return retry.Retry(d, retry.Default, func() (err error) { |
| 188 ctx, _ := context.WithTimeout(d, 10*time.Second) | 194 ctx, _ := context.WithTimeout(d, 10*time.Second) |
| 189 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co
ntext(ctx).Do() | 195 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co
ntext(ctx).Do() |
| 190 return | 196 return |
| 191 }, retry.LogCallback(d, "swarm.Task.Cancel")) | 197 }, retry.LogCallback(d, "swarm.Task.Cancel")) |
| 192 } | 198 } |
| 193 | 199 |
| 194 func (d *swarmingDist) GetStatus(tok distributor.Token) (*dm.Result, error) { | 200 func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R
esult, error) { |
| 195 rslt := (*swarm.SwarmingRpcsTaskResult)(nil) | 201 rslt := (*swarm.SwarmingRpcsTaskResult)(nil) |
| 196 | 202 |
| 197 err := retry.Retry(d, retry.Default, func() (err error) { | 203 err := retry.Retry(d, retry.Default, func() (err error) { |
| 198 ctx, _ := context.WithTimeout(d, 10*time.Second) | 204 ctx, _ := context.WithTimeout(d, 10*time.Second) |
| 199 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok))
.Context(ctx).Do() | 205 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok))
.Context(ctx).Do() |
| 200 return | 206 return |
| 201 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok))) | 207 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok))) |
| 202 if err != nil { | 208 if err != nil { |
| 203 if gerr := err.(*googleapi.Error); gerr != nil { | 209 if gerr := err.(*googleapi.Error); gerr != nil { |
| 204 if gerr.Code == http.StatusNotFound { | 210 if gerr.Code == http.StatusNotFound { |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 261 } | 267 } |
| 262 } | 268 } |
| 263 | 269 |
| 264 return ret, nil | 270 return ret, nil |
| 265 } | 271 } |
| 266 | 272 |
| 267 func (d *swarmingDist) InfoURL(tok distributor.Token) string { | 273 func (d *swarmingDist) InfoURL(tok distributor.Token) string { |
| 268 return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok) | 274 return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok) |
| 269 } | 275 } |
| 270 | 276 |
| 271 func (d *swarmingDist) HandleNotification(notification *distributor.Notification
) (*dm.Result, error) { | 277 func (d *swarmingDist) HandleNotification(q *dm.Quest_Desc, notification *distri
butor.Notification) (*dm.Result, error) { |
| 272 type Data struct { | 278 type Data struct { |
| 273 TaskID distributor.Token `json:"task_id"` | 279 TaskID distributor.Token `json:"task_id"` |
| 274 } | 280 } |
| 275 dat := &Data{} | 281 dat := &Data{} |
| 276 if err := json.Unmarshal(notification.Data, dat); err != nil { | 282 if err := json.Unmarshal(notification.Data, dat); err != nil { |
| 277 logging.Fields{"payload": notification.Data}.Errorf( | 283 logging.Fields{"payload": notification.Data}.Errorf( |
| 278 d, "Could not unmarshal swarming payload! relying on tim
eout.") | 284 d, "Could not unmarshal swarming payload! relying on tim
eout.") |
| 279 return nil, nil | 285 return nil, nil |
| 280 } | 286 } |
| 281 » return d.GetStatus(dat.TaskID) | 287 » return d.GetStatus(q, dat.TaskID) |
| 282 } | 288 } |
| 283 | 289 |
| 284 func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notifi
cation, error) { | 290 func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notifi
cation, error) { |
| 285 return nil, nil | 291 return nil, nil |
| 286 } | 292 } |
| 287 | 293 |
| 288 func (*swarmingDist) Validate(payload string) error { | 294 func (*swarmingDist) Validate(payload string) error { |
| 289 msg := &sv1.Parameters{} | 295 msg := &sv1.Parameters{} |
| 290 if err := jsonpb.UnmarshalString(payload, msg); err != nil { | 296 if err := jsonpb.UnmarshalString(payload, msg); err != nil { |
| 291 return errors.Annotate(err).Reason("unmarshal").D("payload", pay
load).Err() | 297 return errors.Annotate(err).Reason("unmarshal").D("payload", pay
load).Err() |
| 292 } | 298 } |
| 293 return errors.Annotate(msg.Normalize()).Reason("normalize").D("payload",
payload).Err() | 299 return errors.Annotate(msg.Normalize()).Reason("normalize").D("payload",
payload).Err() |
| 294 } | 300 } |
| 295 | 301 |
| 296 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ | 302 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ |
| 297 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil | 303 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil |
| 298 } | 304 } |
| 299 | 305 |
| 300 // AddFactory adds this distributor implementation into the distributor | 306 // AddFactory adds this distributor implementation into the distributor |
| 301 // Registry. | 307 // Registry. |
| 302 func AddFactory(m distributor.FactoryMap) { | 308 func AddFactory(m distributor.FactoryMap) { |
| 303 m[(*sv1.Config)(nil)] = factory | 309 m[(*sv1.Config)(nil)] = factory |
| 304 } | 310 } |
| OLD | NEW |