Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1724)

Side by Side Diff: dm/appengine/distributor/swarming/v1/distributor.go

Issue 2347973003: Refactor distributor API so that methods always get the Quest_Desc too. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "encoding/json" 8 "encoding/json"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 _, authC := httpClients(c) 73 _, authC := httpClients(c)
74 svc, err := swarm.New(authC) 74 svc, err := swarm.New(authC)
75 if err != nil { 75 if err != nil {
76 // can only happen with nil client 76 // can only happen with nil client
77 panic(err) 77 panic(err)
78 } 78 }
79 svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/" 79 svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/"
80 return svc 80 return svc
81 } 81 }
82 82
83 func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To ken, _ time.Duration, err error) { 83 func parseParams(desc *dm.Quest_Desc) (ret *sv1.Parameters, err error) {
84 » auth := tsk.ExecutionAuth() 84 » ret = &sv1.Parameters{}
85 » id := auth.Id 85 » if err = jsonpb.UnmarshalString(desc.DistributorParameters, ret); err != nil {
86 » desc := tsk.Payload()
87
88 » params := &sv1.Parameters{}
89 » if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err != nil {
90 err = errors.Annotate(err). 86 err = errors.Annotate(err).
91 Reason("unmarshalling DistributorParameters"). 87 Reason("unmarshalling DistributorParameters").
92 InternalReason("These paramaeters were already validated ?"). 88 InternalReason("These paramaeters were already validated ?").
93 Err() 89 Err()
94 return 90 return
95 } 91 }
96 » if err = params.Normalize(); err != nil { 92 » if err = ret.Normalize(); err != nil {
97 err = errors.Annotate(err). 93 err = errors.Annotate(err).
98 Reason("normalizing DistributorParameters"). 94 Reason("normalizing DistributorParameters").
99 InternalReason("These paramaeters were already normalize d successfully once?"). 95 InternalReason("These paramaeters were already normalize d successfully once?").
100 Err() 96 Err()
101 return 97 return
102 } 98 }
99 return
100 }
101
102 func (d *swarmingDist) Run(desc *dm.Quest_Desc, auth *dm.Execution_Auth, prev *d m.JsonResult) (tok distributor.Token, _ time.Duration, err error) {
103 id := auth.Id
104
105 params, err := parseParams(desc)
106 if err != nil {
107 return
108 }
103 109
104 isoCtx, _ := context.WithTimeout(d, 30*time.Second) 110 isoCtx, _ := context.WithTimeout(d, 30*time.Second)
105 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, tsk, params) 111 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, desc, auth, prev, pa rams)
106 if err != nil { 112 if err != nil {
107 err = errors.Annotate(err).Reason("prepping Isolated").Err() 113 err = errors.Annotate(err).Reason("prepping Isolated").Err()
108 return 114 return
109 } 115 }
110 116
111 » topic, token, err := tsk.PrepareTopic() 117 » topic, token, err := d.cfg.PrepareTopic(d, auth.Id)
112 if err != nil { 118 if err != nil {
113 err = errors.Annotate(err).Reason("preparing topic").Err() 119 err = errors.Annotate(err).Reason("preparing topic").Err()
114 return 120 return
115 } 121 }
116 122
117 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil) 123 cipdInput := (*swarm.SwarmingRpcsCipdInput)(nil)
118 if len(params.Job.Inputs.Packages) > 0 { 124 if len(params.Job.Inputs.Packages) > 0 {
119 cipdInput := &swarm.SwarmingRpcsCipdInput{ 125 cipdInput := &swarm.SwarmingRpcsCipdInput{
120 Server: params.Job.Inputs.CipdServer, 126 Server: params.Job.Inputs.CipdServer,
121 } 127 }
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 }, retry.LogCallback(d, "swarm.Tasks.New")) 182 }, retry.LogCallback(d, "swarm.Tasks.New"))
177 if err != nil { 183 if err != nil {
178 err = errors.Annotate(err).Reason("calling swarm.Tasks.New").Err () 184 err = errors.Annotate(err).Reason("calling swarm.Tasks.New").Err ()
179 return 185 return
180 } 186 }
181 187
182 tok = distributor.Token(rslt.TaskId) 188 tok = distributor.Token(rslt.TaskId)
183 return 189 return
184 } 190 }
185 191
186 func (d *swarmingDist) Cancel(tok distributor.Token) error { 192 func (d *swarmingDist) Cancel(q *dm.Quest_Desc, tok distributor.Token) error {
187 return retry.Retry(d, retry.Default, func() (err error) { 193 return retry.Retry(d, retry.Default, func() (err error) {
188 ctx, _ := context.WithTimeout(d, 10*time.Second) 194 ctx, _ := context.WithTimeout(d, 10*time.Second)
189 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co ntext(ctx).Do() 195 _, err = newSwarmClient(ctx, d.sCfg).Task.Cancel(string(tok)).Co ntext(ctx).Do()
190 return 196 return
191 }, retry.LogCallback(d, "swarm.Task.Cancel")) 197 }, retry.LogCallback(d, "swarm.Task.Cancel"))
192 } 198 }
193 199
194 func (d *swarmingDist) GetStatus(tok distributor.Token) (*dm.Result, error) { 200 func (d *swarmingDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.R esult, error) {
195 rslt := (*swarm.SwarmingRpcsTaskResult)(nil) 201 rslt := (*swarm.SwarmingRpcsTaskResult)(nil)
196 202
197 err := retry.Retry(d, retry.Default, func() (err error) { 203 err := retry.Retry(d, retry.Default, func() (err error) {
198 ctx, _ := context.WithTimeout(d, 10*time.Second) 204 ctx, _ := context.WithTimeout(d, 10*time.Second)
199 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok)) .Context(ctx).Do() 205 rslt, err = newSwarmClient(ctx, d.sCfg).Task.Result(string(tok)) .Context(ctx).Do()
200 return 206 return
201 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok))) 207 }, retry.LogCallback(d, fmt.Sprintf("swarm.Task.Result(%s)", tok)))
202 if err != nil { 208 if err != nil {
203 if gerr := err.(*googleapi.Error); gerr != nil { 209 if gerr := err.(*googleapi.Error); gerr != nil {
204 if gerr.Code == http.StatusNotFound { 210 if gerr.Code == http.StatusNotFound {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
261 } 267 }
262 } 268 }
263 269
264 return ret, nil 270 return ret, nil
265 } 271 }
266 272
267 func (d *swarmingDist) InfoURL(tok distributor.Token) string { 273 func (d *swarmingDist) InfoURL(tok distributor.Token) string {
268 return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok) 274 return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok)
269 } 275 }
270 276
271 func (d *swarmingDist) HandleNotification(notification *distributor.Notification ) (*dm.Result, error) { 277 func (d *swarmingDist) HandleNotification(q *dm.Quest_Desc, notification *distri butor.Notification) (*dm.Result, error) {
272 type Data struct { 278 type Data struct {
273 TaskID distributor.Token `json:"task_id"` 279 TaskID distributor.Token `json:"task_id"`
274 } 280 }
275 dat := &Data{} 281 dat := &Data{}
276 if err := json.Unmarshal(notification.Data, dat); err != nil { 282 if err := json.Unmarshal(notification.Data, dat); err != nil {
277 logging.Fields{"payload": notification.Data}.Errorf( 283 logging.Fields{"payload": notification.Data}.Errorf(
278 d, "Could not unmarshal swarming payload! relying on tim eout.") 284 d, "Could not unmarshal swarming payload! relying on tim eout.")
279 return nil, nil 285 return nil, nil
280 } 286 }
281 » return d.GetStatus(dat.TaskID) 287 » return d.GetStatus(q, dat.TaskID)
282 } 288 }
283 289
284 func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notifi cation, error) { 290 func (*swarmingDist) HandleTaskQueueTask(r *http.Request) ([]*distributor.Notifi cation, error) {
285 return nil, nil 291 return nil, nil
286 } 292 }
287 293
288 func (*swarmingDist) Validate(payload string) error { 294 func (*swarmingDist) Validate(payload string) error {
289 msg := &sv1.Parameters{} 295 msg := &sv1.Parameters{}
290 if err := jsonpb.UnmarshalString(payload, msg); err != nil { 296 if err := jsonpb.UnmarshalString(payload, msg); err != nil {
291 return errors.Annotate(err).Reason("unmarshal").D("payload", pay load).Err() 297 return errors.Annotate(err).Reason("unmarshal").D("payload", pay load).Err()
292 } 298 }
293 return errors.Annotate(msg.Normalize()).Reason("normalize").D("payload", payload).Err() 299 return errors.Annotate(msg.Normalize()).Reason("normalize").D("payload", payload).Err()
294 } 300 }
295 301
296 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error) { 302 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error) {
297 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil 303 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil
298 } 304 }
299 305
300 // AddFactory adds this distributor implementation into the distributor 306 // AddFactory adds this distributor implementation into the distributor
301 // Registry. 307 // Registry.
302 func AddFactory(m distributor.FactoryMap) { 308 func AddFactory(m distributor.FactoryMap) {
303 m[(*sv1.Config)(nil)] = factory 309 m[(*sv1.Config)(nil)] = factory
304 } 310 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698