| Index: dm/appengine/distributor/swarming/v1/distributor.go
|
| diff --git a/dm/appengine/distributor/swarming/v1/distributor.go b/dm/appengine/distributor/swarming/v1/distributor.go
|
| index d416137a1aff3727e1d7ba6bd122d68a9ae1c967..e3aa02767dae5356c655417a82fc6fc5dc03b8e2 100644
|
| --- a/dm/appengine/distributor/swarming/v1/distributor.go
|
| +++ b/dm/appengine/distributor/swarming/v1/distributor.go
|
| @@ -80,35 +80,41 @@ func newSwarmClient(c context.Context, cfg *sv1.Config) *swarm.Service {
|
| return svc
|
| }
|
|
|
| -func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.Token, _ time.Duration, err error) {
|
| - auth := tsk.ExecutionAuth()
|
| - id := auth.Id
|
| - desc := tsk.Payload()
|
| -
|
| - params := &sv1.Parameters{}
|
| - if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err != nil {
|
| +func parseParams(desc *dm.Quest_Desc) (ret *sv1.Parameters, err error) {
|
| + ret = &sv1.Parameters{}
|
| + if err = jsonpb.UnmarshalString(desc.DistributorParameters, ret); err != nil {
|
| err = errors.Annotate(err).
|
| Reason("unmarshalling DistributorParameters").
|
| InternalReason("These paramaeters were already validated?").
|
| Err()
|
| return
|
| }
|
| - if err = params.Normalize(); err != nil {
|
| + if err = ret.Normalize(); err != nil {
|
| err = errors.Annotate(err).
|
| Reason("normalizing DistributorParameters").
|
| InternalReason("These paramaeters were already normalized successfully once?").
|
| Err()
|
| return
|
| }
|
| + return
|
| +}
|
| +
|
| +func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *dm.JsonResult) (tok distributor.Token, _ time.Duration, err error) {
|
| + id := auth.Id
|
| +
|
| + params, err := parseParams(desc)
|
| + if err != nil {
|
| + return
|
| + }
|
|
|
| isoCtx, _ := context.WithTimeout(d, 30*time.Second)
|
| - iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, tsk, params)
|
| + iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, params)
|
| if err != nil {
|
| err = errors.Annotate(err).Reason("prepping Isolated").Err()
|
| return
|
| }
|
|
|
| - topic, token, err := tsk.PrepareTopic()
|
| + topic, token, err := d.cfg.PrepareTopic(d, auth.Id)
|
| if err != nil {
|
| err = errors.Annotate(err).Reason("preparing topic").Err()
|
| return
|
| @@ -183,7 +189,7 @@ func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To
|
| return
|
| }
|
|
|
| -func (d *swarmingDist) Cancel(tok distributor.Token) error {
|
| +func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error {
|
| return retry.Retry(d, retry.Default, func() (err error) {
|
| ctx, _ := context.WithTimeout(d, 10*time.Second)
|
| _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Context(ctx).Do()
|
| @@ -191,7 +197,7 @@ func (d *swarmingDist) Cancel(tok distributor.Token) error {
|
| }, retry.LogCallback(d, "swarm.Task.Cancel"))
|
| }
|
|
|
| -func (d *swarmingDist) GetStatus(tok distributor.Token) (*dm.Result, error) {
|
| +func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.Result, error) {
|
| rslt := (*swarm.SwarmingRpcsTaskResult)(nil)
|
|
|
| err := retry.Retry(d, retry.Default, func() (err error) {
|
| @@ -268,7 +274,7 @@ func (d *swarmingDist) InfoURL(tok distributor.Token) string {
|
| return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok)
|
| }
|
|
|
| -func (d *swarmingDist) HandleNotification(notification *distributor.Notification) (*dm.Result, error) {
|
| +func (d *swarmingDist) HandleNotification(q *dm.Quest_Desc, notification *distributor.Notification) (*dm.Result, error) {
|
| type Data struct {
|
| TaskID distributor.Token `json:"task_id"`
|
| }
|
| @@ -278,7 +284,7 @@ func (d *swarmingDist) HandleNotification(notification *distributor.Notification
|
| d, "Could not unmarshal swarming payload! relying on timeout.")
|
| return nil, nil
|
| }
|
| - return d.GetStatus(dat.TaskID)
|
| + return d.GetStatus(q, dat.TaskID)
|
| }
|
|
|
| func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notification, error) {
|
|
|