Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Side by Side Diff: dm/appengine/distributor/swarming/v1/distributor.go

Issue 2267143002: Add additional validation to swarming v1 distributor. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@dump_all_stacks
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "encoding/json" 8 "encoding/json"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
47 47
48 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { 48 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair {
49 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) 49 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m))
50 for key, value := range m { 50 for key, value := range m {
51 ret = append(ret, &swarm.SwarmingRpcsStringPair{ 51 ret = append(ret, &swarm.SwarmingRpcsStringPair{
52 Key: key, Value: value}) 52 Key: key, Value: value})
53 } 53 }
54 return ret 54 return ret
55 } 55 }
56 56
57 func httpClient(c context.Context) *http.Client { 57 func httpClients(c context.Context) (anonC, authC *http.Client) {
58 rt, err := auth.GetRPCTransport(c, auth.AsSelf) 58 rt, err := auth.GetRPCTransport(c, auth.AsSelf)
59 if err != nil { 59 if err != nil {
60 // if we can't set up a transport, we're seriously hosed 60 // if we can't set up a transport, we're seriously hosed
61 panic(err) 61 panic(err)
62 } 62 }
63 » return &http.Client{Transport: rt} 63 » anonTransport, err := auth.GetRPCTransport(c, auth.NoAuth)
64 » if err != nil {
65 » » panic(err)
66 » }
67 » anonC = &http.Client{Transport: anonTransport}
68 » authC = &http.Client{Transport: rt}
69 » return
64 } 70 }
65 71
66 func newSwarmClient(c context.Context, cfg *sv1.Config) *swarm.Service { 72 func newSwarmClient(c context.Context, cfg *sv1.Config) *swarm.Service {
67 » svc, err := swarm.New(httpClient(c)) 73 » _, authC := httpClients(c)
74 » svc, err := swarm.New(authC)
68 if err != nil { 75 if err != nil {
69 // can only happen with nil client 76 // can only happen with nil client
70 panic(err) 77 panic(err)
71 } 78 }
72 » svc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", cfg.Swarmi ng.Host) 79 » svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/"
73 return svc 80 return svc
74 } 81 }
75 82
76 func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To ken, _ time.Duration, err error) { 83 func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To ken, _ time.Duration, err error) {
77 auth := tsk.ExecutionAuth() 84 auth := tsk.ExecutionAuth()
78 id := auth.Id 85 id := auth.Id
79 desc := tsk.Payload() 86 desc := tsk.Payload()
80 87
81 params := &sv1.Parameters{} 88 params := &sv1.Parameters{}
82 if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err != nil { 89 if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err != nil {
83 err = errors.Annotate(err). 90 err = errors.Annotate(err).
84 Reason("unmarshalling DistributorParameters"). 91 Reason("unmarshalling DistributorParameters").
85 InternalReason("These paramaeters were already validated ?"). 92 InternalReason("These paramaeters were already validated ?").
86 Err() 93 Err()
87 return 94 return
88 } 95 }
89 if err = params.Normalize(); err != nil { 96 if err = params.Normalize(); err != nil {
90 err = errors.Annotate(err). 97 err = errors.Annotate(err).
91 Reason("normalizing DistributorParameters"). 98 Reason("normalizing DistributorParameters").
92 InternalReason("These paramaeters were already normalize d successfully once?"). 99 InternalReason("These paramaeters were already normalize d successfully once?").
93 Err() 100 Err()
94 return 101 return
95 } 102 }
96 103
97 isoCtx, _ := context.WithTimeout(d, 30*time.Second) 104 isoCtx, _ := context.WithTimeout(d, 30*time.Second)
98 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Host, tsk, params) 105 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, tsk, params)
99 if err != nil { 106 if err != nil {
100 err = errors.Annotate(err).Reason("prepping Isolated").Err() 107 err = errors.Annotate(err).Reason("prepping Isolated").Err()
101 return 108 return
102 } 109 }
103 110
104 topic, token, err := tsk.PrepareTopic() 111 topic, token, err := tsk.PrepareTopic()
105 if err != nil { 112 if err != nil {
106 err = errors.Annotate(err).Reason("preparing topic").Err() 113 err = errors.Annotate(err).Reason("preparing topic").Err()
107 return 114 return
108 } 115 }
(...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after
251 Status: dm.AbnormalFinish_RESULT_MALFORMED, 258 Status: dm.AbnormalFinish_RESULT_MALFORMED,
252 Reason: fmt.Sprintf("swarming: unknown state %s" , rslt.State), 259 Reason: fmt.Sprintf("swarming: unknown state %s" , rslt.State),
253 } 260 }
254 } 261 }
255 } 262 }
256 263
257 return ret, nil 264 return ret, nil
258 } 265 }
259 266
260 func (d *swarmingDist) InfoURL(tok distributor.Token) string { 267 func (d *swarmingDist) InfoURL(tok distributor.Token) string {
261 » return fmt.Sprintf("https://%s/user/task/%s", d.sCfg.Swarming.Host, tok) 268 » return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok)
262 } 269 }
263 270
264 func (d *swarmingDist) HandleNotification(notification *distributor.Notification ) (*dm.Result, error) { 271 func (d *swarmingDist) HandleNotification(notification *distributor.Notification ) (*dm.Result, error) {
265 type Data struct { 272 type Data struct {
266 TaskID distributor.Token `json:"task_id"` 273 TaskID distributor.Token `json:"task_id"`
267 } 274 }
268 dat := &Data{} 275 dat := &Data{}
269 if err := json.Unmarshal(notification.Data, dat); err != nil { 276 if err := json.Unmarshal(notification.Data, dat); err != nil {
270 logging.Fields{"payload": notification.Data}.Errorf( 277 logging.Fields{"payload": notification.Data}.Errorf(
271 d, "Could not unmarshal swarming payload! relying on tim eout.") 278 d, "Could not unmarshal swarming payload! relying on tim eout.")
(...skipping 16 matching lines...) Expand all
288 295
289 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error) { 296 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error) {
290 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil 297 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil
291 } 298 }
292 299
293 // AddFactory adds this distributor implementation into the distributor 300 // AddFactory adds this distributor implementation into the distributor
294 // Registry. 301 // Registry.
295 func AddFactory(m distributor.FactoryMap) { 302 func AddFactory(m distributor.FactoryMap) {
296 m[(*sv1.Config)(nil)] = factory 303 m[(*sv1.Config)(nil)] = factory
297 } 304 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698