Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(491)

Side by Side Diff: dm/appengine/distributor/swarming/v1/isolate.go

Issue 2267143002: Add additional validation to swarming v1 distributor. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@dump_all_stacks
Patch Set: rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « dm/appengine/distributor/swarming/v1/distributor.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package swarming 5 package swarming
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/json" 9 "encoding/json"
10 "net/http"
11 "strings" 10 "strings"
12 11
13 "github.com/golang/protobuf/jsonpb" 12 "github.com/golang/protobuf/jsonpb"
14 "github.com/golang/protobuf/proto" 13 "github.com/golang/protobuf/proto"
15 "github.com/luci/gae/service/info" 14 "github.com/luci/gae/service/info"
16 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" 15 "github.com/luci/luci-go/common/api/isolate/isolateservice/v1"
17 swarm "github.com/luci/luci-go/common/api/swarming/swarming/v1" 16 swarm "github.com/luci/luci-go/common/api/swarming/swarming/v1"
18 "github.com/luci/luci-go/common/errors" 17 "github.com/luci/luci-go/common/errors"
19 "github.com/luci/luci-go/common/isolated" 18 "github.com/luci/luci-go/common/isolated"
20 "github.com/luci/luci-go/common/isolatedclient" 19 "github.com/luci/luci-go/common/isolatedclient"
21 "github.com/luci/luci-go/common/sync/parallel" 20 "github.com/luci/luci-go/common/sync/parallel"
22 sv1 "github.com/luci/luci-go/dm/api/distributor/swarming/v1" 21 sv1 "github.com/luci/luci-go/dm/api/distributor/swarming/v1"
23 "github.com/luci/luci-go/dm/appengine/distributor" 22 "github.com/luci/luci-go/dm/appengine/distributor"
24 "github.com/luci/luci-go/server/auth"
25 "golang.org/x/net/context" 23 "golang.org/x/net/context"
26 ) 24 )
27 25
28 const prevPath = ".dm/previous_execution.json" 26 const prevPath = ".dm/previous_execution.json"
29 const exAuthPath = ".dm/execution_auth.json" 27 const exAuthPath = ".dm/execution_auth.json"
30 const descPath = ".dm/quest_description.json" 28 const descPath = ".dm/quest_description.json"
31 29
32 func mkFile(data []byte) *isolated.File { 30 func mkFile(data []byte) *isolated.File {
33 mode := 0444 31 mode := 0444
34 size := int64(len(data)) 32 size := int64(len(data))
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
78 76
79 return isoData, isoFile 77 return isoData, isoFile
80 } 78 }
81 79
82 type isoChunk struct { 80 type isoChunk struct {
83 data []byte 81 data []byte
84 isIso bool 82 isIso bool
85 file *isolated.File 83 file *isolated.File
86 } 84 }
87 85
88 func pushIsolate(c context.Context, isolateHost string, chunks []isoChunk) error { 86 func pushIsolate(c context.Context, isolateURL string, chunks []isoChunk) error {
89 dgsts := make([]*isolateservice.HandlersEndpointsV1Digest, len(chunks)) 87 dgsts := make([]*isolateservice.HandlersEndpointsV1Digest, len(chunks))
90 for i, chnk := range chunks { 88 for i, chnk := range chunks {
91 dgsts[i] = &isolateservice.HandlersEndpointsV1Digest{ 89 dgsts[i] = &isolateservice.HandlersEndpointsV1Digest{
92 Digest: string(chnk.file.Digest), Size: *chnk.file.Size, 90 Digest: string(chnk.file.Digest), Size: *chnk.file.Size,
93 IsIsolated: chnk.isIso} 91 IsIsolated: chnk.isIso}
94 } 92 }
95 93
96 » anonTransport, err := auth.GetRPCTransport(c, auth.NoAuth) 94 » anonC, authC := httpClients(c)
97 » if err != nil {
98 » » panic(err)
99 » }
100 » anonClient := &http.Client{Transport: anonTransport}
101 95
102 isoClient := isolatedclient.New( 96 isoClient := isolatedclient.New(
103 » » anonClient, httpClient(c), "https://"+isolateHost, 97 » » anonC, authC, isolateURL, isolatedclient.DefaultNamespace, nil)
104 » » isolatedclient.DefaultNamespace, nil)
105 states, err := isoClient.Contains(c, dgsts) 98 states, err := isoClient.Contains(c, dgsts)
106 if err != nil { 99 if err != nil {
107 err = errors.Annotate(err). 100 err = errors.Annotate(err).
108 D("count", len(dgsts)). 101 D("count", len(dgsts)).
109 Reason("checking containment for %(count)d digests"). 102 Reason("checking containment for %(count)d digests").
110 Err() 103 Err()
111 return err 104 return err
112 } 105 }
113 return parallel.FanOutIn(func(ch chan<- func() error) { 106 return parallel.FanOutIn(func(ch chan<- func() error) {
114 for i, st := range states { 107 for i, st := range states {
115 if st != nil { 108 if st != nil {
116 i, st := i, st 109 i, st := i, st
117 ch <- func() error { 110 ch <- func() error {
118 return isoClient.Push(c, st, isolatedcli ent.NewBytesSource(chunks[i].data)) 111 return isoClient.Push(c, st, isolatedcli ent.NewBytesSource(chunks[i].data))
119 } 112 }
120 } 113 }
121 } 114 }
122 }) 115 })
123 } 116 }
124 117
125 func prepIsolate(c context.Context, isolateHost string, tsk *distributor.TaskDes cription, params *sv1.Parameters) (*swarm.SwarmingRpcsFilesRef, error) { 118 func prepIsolate(c context.Context, isolateURL string, tsk *distributor.TaskDesc ription, params *sv1.Parameters) (*swarm.SwarmingRpcsFilesRef, error) {
126 prevData := []byte("{}") 119 prevData := []byte("{}")
127 if tsk.PreviousResult() != nil { 120 if tsk.PreviousResult() != nil {
128 prevData = []byte(tsk.PreviousResult().Object) 121 prevData = []byte(tsk.PreviousResult().Object)
129 } 122 }
130 prevFile := mkFile(prevData) 123 prevFile := mkFile(prevData)
131 authData, authFile := mkMsgFile(tsk.ExecutionAuth()) 124 authData, authFile := mkMsgFile(tsk.ExecutionAuth())
132 descData, descFile := mkMsgFile(tsk.Payload()) 125 descData, descFile := mkMsgFile(tsk.Payload())
133 isoData, isoFile := mkIsolated(c, params, prevFile, descFile, authFile) 126 isoData, isoFile := mkIsolated(c, params, prevFile, descFile, authFile)
134 127
135 » err := pushIsolate(c, isolateHost, []isoChunk{ 128 » err := pushIsolate(c, isolateURL, []isoChunk{
136 {data: prevData, file: prevFile}, 129 {data: prevData, file: prevFile},
137 {data: authData, file: authFile}, 130 {data: authData, file: authFile},
138 {data: descData, file: descFile}, 131 {data: descData, file: descFile},
139 {data: isoData, file: isoFile, isIso: true}, 132 {data: isoData, file: isoFile, isIso: true},
140 }) 133 })
141 if err != nil { 134 if err != nil {
142 err = errors.Annotate(err).Reason("pushing new Isolated").Err() 135 err = errors.Annotate(err).Reason("pushing new Isolated").Err()
143 return nil, err 136 return nil, err
144 } 137 }
145 138
146 return &swarm.SwarmingRpcsFilesRef{ 139 return &swarm.SwarmingRpcsFilesRef{
147 Isolated: string(isoFile.Digest), 140 Isolated: string(isoFile.Digest),
148 » » Isolatedserver: "https://" + isolateHost, 141 » » Isolatedserver: isolateURL,
149 Namespace: isolatedclient.DefaultNamespace, 142 Namespace: isolatedclient.DefaultNamespace,
150 }, nil 143 }, nil
151 } 144 }
OLDNEW
« no previous file with comments | « dm/appengine/distributor/swarming/v1/distributor.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698