| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package swarming | 5 package swarming |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/json" | 9 "encoding/json" |
| 10 "net/http" | |
| 11 "strings" | 10 "strings" |
| 12 | 11 |
| 13 "github.com/golang/protobuf/jsonpb" | 12 "github.com/golang/protobuf/jsonpb" |
| 14 "github.com/golang/protobuf/proto" | 13 "github.com/golang/protobuf/proto" |
| 15 "github.com/luci/gae/service/info" | 14 "github.com/luci/gae/service/info" |
| 16 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" | 15 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" |
| 17 swarm "github.com/luci/luci-go/common/api/swarming/swarming/v1" | 16 swarm "github.com/luci/luci-go/common/api/swarming/swarming/v1" |
| 18 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 19 "github.com/luci/luci-go/common/isolated" | 18 "github.com/luci/luci-go/common/isolated" |
| 20 "github.com/luci/luci-go/common/isolatedclient" | 19 "github.com/luci/luci-go/common/isolatedclient" |
| 21 "github.com/luci/luci-go/common/sync/parallel" | 20 "github.com/luci/luci-go/common/sync/parallel" |
| 22 sv1 "github.com/luci/luci-go/dm/api/distributor/swarming/v1" | 21 sv1 "github.com/luci/luci-go/dm/api/distributor/swarming/v1" |
| 23 "github.com/luci/luci-go/dm/appengine/distributor" | 22 "github.com/luci/luci-go/dm/appengine/distributor" |
| 24 "github.com/luci/luci-go/server/auth" | |
| 25 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 26 ) | 24 ) |
| 27 | 25 |
| 28 const prevPath = ".dm/previous_execution.json" | 26 const prevPath = ".dm/previous_execution.json" |
| 29 const exAuthPath = ".dm/execution_auth.json" | 27 const exAuthPath = ".dm/execution_auth.json" |
| 30 const descPath = ".dm/quest_description.json" | 28 const descPath = ".dm/quest_description.json" |
| 31 | 29 |
| 32 func mkFile(data []byte) *isolated.File { | 30 func mkFile(data []byte) *isolated.File { |
| 33 mode := 0444 | 31 mode := 0444 |
| 34 size := int64(len(data)) | 32 size := int64(len(data)) |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 78 | 76 |
| 79 return isoData, isoFile | 77 return isoData, isoFile |
| 80 } | 78 } |
| 81 | 79 |
| 82 type isoChunk struct { | 80 type isoChunk struct { |
| 83 data []byte | 81 data []byte |
| 84 isIso bool | 82 isIso bool |
| 85 file *isolated.File | 83 file *isolated.File |
| 86 } | 84 } |
| 87 | 85 |
| 88 func pushIsolate(c context.Context, isolateHost string, chunks []isoChunk) error
{ | 86 func pushIsolate(c context.Context, isolateURL string, chunks []isoChunk) error
{ |
| 89 dgsts := make([]*isolateservice.HandlersEndpointsV1Digest, len(chunks)) | 87 dgsts := make([]*isolateservice.HandlersEndpointsV1Digest, len(chunks)) |
| 90 for i, chnk := range chunks { | 88 for i, chnk := range chunks { |
| 91 dgsts[i] = &isolateservice.HandlersEndpointsV1Digest{ | 89 dgsts[i] = &isolateservice.HandlersEndpointsV1Digest{ |
| 92 Digest: string(chnk.file.Digest), Size: *chnk.file.Size, | 90 Digest: string(chnk.file.Digest), Size: *chnk.file.Size, |
| 93 IsIsolated: chnk.isIso} | 91 IsIsolated: chnk.isIso} |
| 94 } | 92 } |
| 95 | 93 |
| 96 » anonTransport, err := auth.GetRPCTransport(c, auth.NoAuth) | 94 » anonC, authC := httpClients(c) |
| 97 » if err != nil { | |
| 98 » » panic(err) | |
| 99 » } | |
| 100 » anonClient := &http.Client{Transport: anonTransport} | |
| 101 | 95 |
| 102 isoClient := isolatedclient.New( | 96 isoClient := isolatedclient.New( |
| 103 » » anonClient, httpClient(c), "https://"+isolateHost, | 97 » » anonC, authC, isolateURL, isolatedclient.DefaultNamespace, nil) |
| 104 » » isolatedclient.DefaultNamespace, nil) | |
| 105 states, err := isoClient.Contains(c, dgsts) | 98 states, err := isoClient.Contains(c, dgsts) |
| 106 if err != nil { | 99 if err != nil { |
| 107 err = errors.Annotate(err). | 100 err = errors.Annotate(err). |
| 108 D("count", len(dgsts)). | 101 D("count", len(dgsts)). |
| 109 Reason("checking containment for %(count)d digests"). | 102 Reason("checking containment for %(count)d digests"). |
| 110 Err() | 103 Err() |
| 111 return err | 104 return err |
| 112 } | 105 } |
| 113 return parallel.FanOutIn(func(ch chan<- func() error) { | 106 return parallel.FanOutIn(func(ch chan<- func() error) { |
| 114 for i, st := range states { | 107 for i, st := range states { |
| 115 if st != nil { | 108 if st != nil { |
| 116 i, st := i, st | 109 i, st := i, st |
| 117 ch <- func() error { | 110 ch <- func() error { |
| 118 return isoClient.Push(c, st, isolatedcli
ent.NewBytesSource(chunks[i].data)) | 111 return isoClient.Push(c, st, isolatedcli
ent.NewBytesSource(chunks[i].data)) |
| 119 } | 112 } |
| 120 } | 113 } |
| 121 } | 114 } |
| 122 }) | 115 }) |
| 123 } | 116 } |
| 124 | 117 |
| 125 func prepIsolate(c context.Context, isolateHost string, tsk *distributor.TaskDes
cription, params *sv1.Parameters) (*swarm.SwarmingRpcsFilesRef, error) { | 118 func prepIsolate(c context.Context, isolateURL string, tsk *distributor.TaskDesc
ription, params *sv1.Parameters) (*swarm.SwarmingRpcsFilesRef, error) { |
| 126 prevData := []byte("{}") | 119 prevData := []byte("{}") |
| 127 if tsk.PreviousResult() != nil { | 120 if tsk.PreviousResult() != nil { |
| 128 prevData = []byte(tsk.PreviousResult().Object) | 121 prevData = []byte(tsk.PreviousResult().Object) |
| 129 } | 122 } |
| 130 prevFile := mkFile(prevData) | 123 prevFile := mkFile(prevData) |
| 131 authData, authFile := mkMsgFile(tsk.ExecutionAuth()) | 124 authData, authFile := mkMsgFile(tsk.ExecutionAuth()) |
| 132 descData, descFile := mkMsgFile(tsk.Payload()) | 125 descData, descFile := mkMsgFile(tsk.Payload()) |
| 133 isoData, isoFile := mkIsolated(c, params, prevFile, descFile, authFile) | 126 isoData, isoFile := mkIsolated(c, params, prevFile, descFile, authFile) |
| 134 | 127 |
| 135 » err := pushIsolate(c, isolateHost, []isoChunk{ | 128 » err := pushIsolate(c, isolateURL, []isoChunk{ |
| 136 {data: prevData, file: prevFile}, | 129 {data: prevData, file: prevFile}, |
| 137 {data: authData, file: authFile}, | 130 {data: authData, file: authFile}, |
| 138 {data: descData, file: descFile}, | 131 {data: descData, file: descFile}, |
| 139 {data: isoData, file: isoFile, isIso: true}, | 132 {data: isoData, file: isoFile, isIso: true}, |
| 140 }) | 133 }) |
| 141 if err != nil { | 134 if err != nil { |
| 142 err = errors.Annotate(err).Reason("pushing new Isolated").Err() | 135 err = errors.Annotate(err).Reason("pushing new Isolated").Err() |
| 143 return nil, err | 136 return nil, err |
| 144 } | 137 } |
| 145 | 138 |
| 146 return &swarm.SwarmingRpcsFilesRef{ | 139 return &swarm.SwarmingRpcsFilesRef{ |
| 147 Isolated: string(isoFile.Digest), | 140 Isolated: string(isoFile.Digest), |
| 148 » » Isolatedserver: "https://" + isolateHost, | 141 » » Isolatedserver: isolateURL, |
| 149 Namespace: isolatedclient.DefaultNamespace, | 142 Namespace: isolatedclient.DefaultNamespace, |
| 150 }, nil | 143 }, nil |
| 151 } | 144 } |
| OLD | NEW |