| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/json" | 8 "encoding/json" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 47 | 47 |
| 48 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { | 48 func toSwarmMap(m map[string]string) []*swarm.SwarmingRpcsStringPair { |
| 49 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) | 49 ret := make([]*swarm.SwarmingRpcsStringPair, 0, len(m)) |
| 50 for key, value := range m { | 50 for key, value := range m { |
| 51 ret = append(ret, &swarm.SwarmingRpcsStringPair{ | 51 ret = append(ret, &swarm.SwarmingRpcsStringPair{ |
| 52 Key: key, Value: value}) | 52 Key: key, Value: value}) |
| 53 } | 53 } |
| 54 return ret | 54 return ret |
| 55 } | 55 } |
| 56 | 56 |
| 57 func httpClient(c context.Context) *http.Client { | 57 func httpClients(c context.Context) (anonC, authC *http.Client) { |
| 58 rt, err := auth.GetRPCTransport(c, auth.AsSelf) | 58 rt, err := auth.GetRPCTransport(c, auth.AsSelf) |
| 59 if err != nil { | 59 if err != nil { |
| 60 // if we can't set up a transport, we're seriously hosed | 60 // if we can't set up a transport, we're seriously hosed |
| 61 panic(err) | 61 panic(err) |
| 62 } | 62 } |
| 63 » return &http.Client{Transport: rt} | 63 » anonTransport, err := auth.GetRPCTransport(c, auth.NoAuth) |
| 64 » if err != nil { |
| 65 » » panic(err) |
| 66 » } |
| 67 » anonC = &http.Client{Transport: anonTransport} |
| 68 » authC = &http.Client{Transport: rt} |
| 69 » return |
| 64 } | 70 } |
| 65 | 71 |
| 66 func newSwarmClient(c context.Context, cfg *sv1.Config) *swarm.Service { | 72 func newSwarmClient(c context.Context, cfg *sv1.Config) *swarm.Service { |
| 67 » svc, err := swarm.New(httpClient(c)) | 73 » _, authC := httpClients(c) |
| 74 » svc, err := swarm.New(authC) |
| 68 if err != nil { | 75 if err != nil { |
| 69 // can only happen with nil client | 76 // can only happen with nil client |
| 70 panic(err) | 77 panic(err) |
| 71 } | 78 } |
| 72 » svc.BasePath = fmt.Sprintf("https://%s/_ah/api/swarming/v1/", cfg.Swarmi
ng.Host) | 79 » svc.BasePath = cfg.Swarming.Url + "/_ah/api/swarming/v1/" |
| 73 return svc | 80 return svc |
| 74 } | 81 } |
| 75 | 82 |
| 76 func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To
ken, _ time.Duration, err error) { | 83 func (d *swarmingDist) Run(tsk *distributor.TaskDescription) (tok distributor.To
ken, _ time.Duration, err error) { |
| 77 auth := tsk.ExecutionAuth() | 84 auth := tsk.ExecutionAuth() |
| 78 id := auth.Id | 85 id := auth.Id |
| 79 desc := tsk.Payload() | 86 desc := tsk.Payload() |
| 80 | 87 |
| 81 params := &sv1.Parameters{} | 88 params := &sv1.Parameters{} |
| 82 if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err
!= nil { | 89 if err = jsonpb.UnmarshalString(desc.DistributorParameters, params); err
!= nil { |
| 83 err = errors.Annotate(err). | 90 err = errors.Annotate(err). |
| 84 Reason("unmarshalling DistributorParameters"). | 91 Reason("unmarshalling DistributorParameters"). |
| 85 InternalReason("These paramaeters were already validated
?"). | 92 InternalReason("These paramaeters were already validated
?"). |
| 86 Err() | 93 Err() |
| 87 return | 94 return |
| 88 } | 95 } |
| 89 if err = params.Normalize(); err != nil { | 96 if err = params.Normalize(); err != nil { |
| 90 err = errors.Annotate(err). | 97 err = errors.Annotate(err). |
| 91 Reason("normalizing DistributorParameters"). | 98 Reason("normalizing DistributorParameters"). |
| 92 InternalReason("These paramaeters were already normalize
d successfully once?"). | 99 InternalReason("These paramaeters were already normalize
d successfully once?"). |
| 93 Err() | 100 Err() |
| 94 return | 101 return |
| 95 } | 102 } |
| 96 | 103 |
| 97 isoCtx, _ := context.WithTimeout(d, 30*time.Second) | 104 isoCtx, _ := context.WithTimeout(d, 30*time.Second) |
| 98 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Host, tsk, params) | 105 » iso, err := prepIsolate(isoCtx, d.sCfg.Isolate.Url, tsk, params) |
| 99 if err != nil { | 106 if err != nil { |
| 100 err = errors.Annotate(err).Reason("prepping Isolated").Err() | 107 err = errors.Annotate(err).Reason("prepping Isolated").Err() |
| 101 return | 108 return |
| 102 } | 109 } |
| 103 | 110 |
| 104 topic, token, err := tsk.PrepareTopic() | 111 topic, token, err := tsk.PrepareTopic() |
| 105 if err != nil { | 112 if err != nil { |
| 106 err = errors.Annotate(err).Reason("preparing topic").Err() | 113 err = errors.Annotate(err).Reason("preparing topic").Err() |
| 107 return | 114 return |
| 108 } | 115 } |
| (...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 251 Status: dm.AbnormalFinish_RESULT_MALFORMED, | 258 Status: dm.AbnormalFinish_RESULT_MALFORMED, |
| 252 Reason: fmt.Sprintf("swarming: unknown state %s"
, rslt.State), | 259 Reason: fmt.Sprintf("swarming: unknown state %s"
, rslt.State), |
| 253 } | 260 } |
| 254 } | 261 } |
| 255 } | 262 } |
| 256 | 263 |
| 257 return ret, nil | 264 return ret, nil |
| 258 } | 265 } |
| 259 | 266 |
| 260 func (d *swarmingDist) InfoURL(tok distributor.Token) string { | 267 func (d *swarmingDist) InfoURL(tok distributor.Token) string { |
| 261 » return fmt.Sprintf("https://%s/user/task/%s", d.sCfg.Swarming.Host, tok) | 268 » return fmt.Sprintf("%s/user/task/%s", d.sCfg.Swarming.Url, tok) |
| 262 } | 269 } |
| 263 | 270 |
| 264 func (d *swarmingDist) HandleNotification(notification *distributor.Notification
) (*dm.Result, error) { | 271 func (d *swarmingDist) HandleNotification(notification *distributor.Notification
) (*dm.Result, error) { |
| 265 type Data struct { | 272 type Data struct { |
| 266 TaskID distributor.Token `json:"task_id"` | 273 TaskID distributor.Token `json:"task_id"` |
| 267 } | 274 } |
| 268 dat := &Data{} | 275 dat := &Data{} |
| 269 if err := json.Unmarshal(notification.Data, dat); err != nil { | 276 if err := json.Unmarshal(notification.Data, dat); err != nil { |
| 270 logging.Fields{"payload": notification.Data}.Errorf( | 277 logging.Fields{"payload": notification.Data}.Errorf( |
| 271 d, "Could not unmarshal swarming payload! relying on tim
eout.") | 278 d, "Could not unmarshal swarming payload! relying on tim
eout.") |
| (...skipping 16 matching lines...) Expand all Loading... |
| 288 | 295 |
| 289 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ | 296 func factory(c context.Context, cfg *distributor.Config) (distributor.D, error)
{ |
| 290 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil | 297 return &swarmingDist{c, cfg, cfg.Content.(*sv1.Config)}, nil |
| 291 } | 298 } |
| 292 | 299 |
| 293 // AddFactory adds this distributor implementation into the distributor | 300 // AddFactory adds this distributor implementation into the distributor |
| 294 // Registry. | 301 // Registry. |
| 295 func AddFactory(m distributor.FactoryMap) { | 302 func AddFactory(m distributor.FactoryMap) { |
| 296 m[(*sv1.Config)(nil)] = factory | 303 m[(*sv1.Config)(nil)] = factory |
| 297 } | 304 } |
| OLD | NEW |