Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(496)

Side by Side Diff: appengine/cmd/dm/deps/walk_graph.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/deps/tmp_get_execution.go ('k') | appengine/cmd/dm/deps/walk_graph_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package deps 5 package deps
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "golang.org/x/net/context"
12 "google.golang.org/grpc/codes"
13
11 "github.com/luci/gae/service/datastore" 14 "github.com/luci/gae/service/datastore"
15 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
12 "github.com/luci/luci-go/appengine/cmd/dm/model" 16 "github.com/luci/luci-go/appengine/cmd/dm/model"
13 dm "github.com/luci/luci-go/common/api/dm/service/v1" 17 dm "github.com/luci/luci-go/common/api/dm/service/v1"
14 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/grpcutil" 19 "github.com/luci/luci-go/common/grpcutil"
16 "github.com/luci/luci-go/common/logging" 20 "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/parallel" 21 "github.com/luci/luci-go/common/parallel"
18 "golang.org/x/net/context"
19 "google.golang.org/grpc/codes"
20 ) 22 )
21 23
22 const numWorkers = 16 24 const numWorkers = 16
23 25
24 const maxTimeout = 55 * time.Second // GAE limit is 60s 26 const maxTimeout = 55 * time.Second // GAE limit is 60s
25 27
26 type node struct { 28 type node struct {
27 aid *dm.Attempt_ID 29 aid *dm.Attempt_ID
28 depth int64 30 depth int64
29 canSeeAttemptResult bool 31 canSeeAttemptResult bool
30 } 32 }
31 33
32 func runSearchQuery(c context.Context, send func(*dm.Attempt_ID) error, s *dm.Gr aphQuery_Search) func() error { 34 func runSearchQuery(c context.Context, send func(*dm.Attempt_ID) error, s *dm.Gr aphQuery_Search) func() error {
33 return func() error { 35 return func() error {
34 logging.Errorf(c, "SearchQuery not implemented") 36 logging.Errorf(c, "SearchQuery not implemented")
35 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented") 37 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented")
36 } 38 }
37 } 39 }
38 40
39 func isCtxErr(err error) bool { 41 func isCtxErr(err error) bool {
40 return err == context.Canceled || err == context.DeadlineExceeded 42 return err == context.Canceled || err == context.DeadlineExceeded
41 } 43 }
42 44
43 func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A ttempt_ID) error, al *dm.AttemptList) func() error { 45 func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A ttempt_ID) error, al *dm.AttemptList) func() error {
44 return func() error { 46 return func() error {
45 for qst, anum := range al.To { 47 for qst, anum := range al.To {
46 if len(anum.Nums) == 0 { 48 if len(anum.Nums) == 0 {
47 qry := model.QueryAttemptsForQuest(c, qst) 49 qry := model.QueryAttemptsForQuest(c, qst)
48 if !includeExpired { 50 if !includeExpired {
49 » » » » » qry = qry.Eq("Expired", false) 51 » » » » » qry = qry.Eq("ResultExpired", false)
50 } 52 }
51 err := datastore.Get(c).Run(qry, func(k *datasto re.Key) error { 53 err := datastore.Get(c).Run(qry, func(k *datasto re.Key) error {
52 aid := &dm.Attempt_ID{} 54 aid := &dm.Attempt_ID{}
53 if err := aid.SetDMEncoded(k.StringID()) ; err != nil { 55 if err := aid.SetDMEncoded(k.StringID()) ; err != nil {
54 logging.WithError(err).Errorf(c, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) 56 logging.WithError(err).Errorf(c, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID())
55 panic(fmt.Errorf("in AttemptList Query: %s", err)) 57 panic(fmt.Errorf("in AttemptList Query: %s", err))
56 } 58 }
57 return send(aid) 59 return send(aid)
58 }) 60 })
59 if err != nil { 61 if err != nil {
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
118 } 120 }
119 if doSend { 121 if doSend {
120 if err := send(aid); err != nil { 122 if err := send(aid); err != nil {
121 return err 123 return err
122 } 124 }
123 } 125 }
124 return nil 126 return nil
125 }) 127 })
126 } 128 }
127 129
128 func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake y *datastore.Key, numEx int64, dst *dm.Attempt) error { 130 func maybeLoadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execu tion_Data_DistributorInfo) {
131 » d, _, err := reg.MakeDistributor(c, dat.ConfigName)
132 » if err != nil {
133 » » logging.Fields{
134 » » » ek: err, "cfgName": dat.ConfigName,
135 » » }.Errorf(c, "unable to load distributor")
136 » } else {
137 » » dat.Url = d.InfoURL(distributor.Token(dat.Token))
138 » }
139 }
140
141 func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model. Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
129 start := int64(atmpt.CurExecution) - numEx 142 start := int64(atmpt.CurExecution) - numEx
130 if start <= 0 { 143 if start <= 0 {
131 start = 1 144 start = 1
132 } 145 }
133 toLoad := (int64(atmpt.CurExecution) - start) + 1 146 toLoad := (int64(atmpt.CurExecution) - start) + 1
134 if toLoad <= 0 { 147 if toLoad <= 0 {
135 return nil 148 return nil
136 } 149 }
150 reg := distributor.GetRegistry(c)
137 dst.Executions = make(map[uint32]*dm.Execution, toLoad) 151 dst.Executions = make(map[uint32]*dm.Execution, toLoad)
138 ds := datastore.Get(c) 152 ds := datastore.Get(c)
139 q := datastore.NewQuery("Execution").Ancestor(akey).Gte( 153 q := datastore.NewQuery("Execution").Ancestor(akey).Gte(
140 "__key__", ds.MakeKey("Attempt", akey.StringID(), "Execution", s tart)) 154 "__key__", ds.MakeKey("Attempt", akey.StringID(), "Execution", s tart))
141 return ds.Run(q, func(e *model.Execution) error { 155 return ds.Run(q, func(e *model.Execution) error {
142 if c.Err() != nil { 156 if c.Err() != nil {
143 return datastore.Stop 157 return datastore.Stop
144 } 158 }
145 » » dst.Executions[e.ID] = e.ToProto(includeID) 159 » » p := e.ToProto(includeID)
160 » » if includeURL {
161 » » » maybeLoadInfoURL(c, reg, p.Data.DistributorInfo)
162 » » }
163 » » dst.Executions[e.ID] = p
146 return nil 164 return nil
147 }) 165 })
148 } 166 }
149 167
150 func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_A uth, dst *dm.Attempt) func() error { 168 func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState []byte, rsltSize uint32, lim *sizeLimit, akey *datastore.K ey, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
151 return func() error { 169 return func() error {
152 ds := datastore.Get(c) 170 ds := datastore.Get(c)
153 if auth != nil && !authedForResult { 171 if auth != nil && !authedForResult {
154 // we need to prove that the currently authed execution depends on this 172 // we need to prove that the currently authed execution depends on this
155 // attempt. 173 // attempt.
156 from := auth.Id.AttemptID().DMEncoded() 174 from := auth.Id.AttemptID().DMEncoded()
157 to := aid.DMEncoded() 175 to := aid.DMEncoded()
158 fdepKey := ds.MakeKey("Attempt", from, "FwdDep", to) 176 fdepKey := ds.MakeKey("Attempt", from, "FwdDep", to)
159 exist, err := ds.Exists(fdepKey) 177 exist, err := ds.Exists(fdepKey)
160 if err != nil { 178 if err != nil {
161 logging.Fields{ek: err, "key": fdepKey}.Errorf(c , "failed to determine if FwdDep exists") 179 logging.Fields{ek: err, "key": fdepKey}.Errorf(c , "failed to determine if FwdDep exists")
162 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED 180 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED
163 return err 181 return err
164 } 182 }
165 if !exist.All() { 183 if !exist.All() {
166 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED 184 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED
167 return nil 185 return nil
168 } 186 }
169 } 187 }
170 188
171 » » if !lim.PossiblyOK(rsltSize) { 189 » » siz := rsltSize + uint32(len(persistentState))
190 » » if !lim.PossiblyOK(siz) {
172 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT 191 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
173 logging.Infof(c, "skipping load of AttemptResult %s (siz e limit)", aid) 192 logging.Infof(c, "skipping load of AttemptResult %s (siz e limit)", aid)
174 return nil 193 return nil
175 } 194 }
176 r := &model.AttemptResult{Attempt: akey} 195 r := &model.AttemptResult{Attempt: akey}
177 if err := ds.Get(r); err != nil { 196 if err := ds.Get(r); err != nil {
178 logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult") 197 logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult")
179 return err 198 return err
180 } 199 }
181 » » if lim.Add(r.Size) { 200 » » if lim.Add(siz) {
182 dst.Data.GetFinished().JsonResult = r.Data 201 dst.Data.GetFinished().JsonResult = r.Data
202 dst.Data.GetFinished().PersistentStateResult = persisten tState
183 dst.Partial.Result = dm.Attempt_Partial_LOADED 203 dst.Partial.Result = dm.Attempt_Partial_LOADED
184 } else { 204 } else {
185 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT 205 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
186 logging.Infof(c, "loaded AttemptResult %s, but hit size limit after", aid) 206 logging.Infof(c, "loaded AttemptResult %s, but hit size limit after", aid)
187 } 207 }
188 return nil 208 return nil
189 } 209 }
190 } 210 }
191 211
192 func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, authedForResult bool, lim *sizeLimit, dst *dm.Attempt, send func(aid *dm.Attempt _ID, fwd bool) error) func() error { 212 func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, authedForResult bool, lim *sizeLimit, dst *dm.Attempt, send func(aid *dm.Attempt _ID, fwd bool) error) func() error {
193 return func() error { 213 return func() error {
194 ds := datastore.Get(c) 214 ds := datastore.Get(c)
195 215
196 atmpt := &model.Attempt{ID: *aid} 216 atmpt := &model.Attempt{ID: *aid}
197 akey := ds.KeyForObj(atmpt) 217 akey := ds.KeyForObj(atmpt)
198 if err := ds.Get(atmpt); err != nil { 218 if err := ds.Get(atmpt); err != nil {
199 if err == datastore.ErrNoSuchEntity { 219 if err == datastore.ErrNoSuchEntity {
200 dst.DNE = true 220 dst.DNE = true
201 dst.Partial = nil 221 dst.Partial = nil
202 return nil 222 return nil
203 } 223 }
204 return err 224 return err
205 } 225 }
206 » » if !req.Include.ExpiredAttempts && atmpt.Expired { 226 » » if !req.Include.ExpiredAttempts && atmpt.ResultExpired {
207 return nil 227 return nil
208 } 228 }
229
230 persistentState := []byte(nil)
209 if req.Include.AttemptData { 231 if req.Include.AttemptData {
210 dst.Data = atmpt.DataProto() 232 dst.Data = atmpt.DataProto()
211 dst.Partial.Data = false 233 dst.Partial.Data = false
234 if fin := dst.Data.GetFinished(); fin != nil {
235 // if we're including data and finished, we only add the persistentState
236 // if we could see the attempt result. Save it o ff here, and restore it
237 // in attemptResultLoader, only if we're able to load the actual result.
238 //
239 // This is done because for some jobs the persis tentState is
240 // almost-as-good as the actual result, and we w ant to prevent
241 // false/accidental dependencies where a job is able to 'depend' on the
242 // results without actually emitting a dependenc y on them.
243 persistentState = fin.PersistentStateResult
244 fin.PersistentStateResult = nil
245 }
212 } 246 }
213 247
214 errChan := parallel.Run(0, func(ch chan<- func() error) { 248 errChan := parallel.Run(0, func(ch chan<- func() error) {
215 if req.Include.AttemptResult { 249 if req.Include.AttemptResult {
216 if atmpt.State == dm.Attempt_FINISHED { 250 if atmpt.State == dm.Attempt_FINISHED {
217 » » » » » ch <- attemptResultLoader(c, aid, authed ForResult, atmpt.ResultSize, lim, akey, req.Auth, dst) 251 » » » » » ch <- attemptResultLoader(c, aid, authed ForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst)
218 } else { 252 } else {
219 dst.Partial.Result = dm.Attempt_Partial_ LOADED 253 dst.Partial.Result = dm.Attempt_Partial_ LOADED
220 } 254 }
221 } 255 }
222 256
223 if req.Include.NumExecutions > 0 { 257 if req.Include.NumExecutions > 0 {
224 ch <- func() error { 258 ch <- func() error {
225 » » » » » err := loadExecutions(c, req.Include.Obj ectIds, atmpt, akey, int64(req.Include.NumExecutions), dst) 259 » » » » » err := loadExecutions(c, req.Include.Obj ectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutio ns), dst)
226 if err != nil { 260 if err != nil {
227 logging.Fields{ek: err, "aid": a id}.Errorf(c, "error loading executions") 261 logging.Fields{ek: err, "aid": a id}.Errorf(c, "error loading executions")
228 } else { 262 } else {
229 dst.Partial.Executions = false 263 dst.Partial.Executions = false
230 } 264 }
231 return err 265 return err
232 } 266 }
233 } 267 }
234 268
235 writeFwd := req.Include.FwdDeps 269 writeFwd := req.Include.FwdDeps
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
325 // a = get_attempt() 359 // a = get_attempt()
326 // if a exists { 360 // if a exists {
327 // FanOutIn( 361 // FanOutIn(
328 // maybeLoadAttemptResult, 362 // maybeLoadAttemptResult,
329 // maybeLoadExecutions, 363 // maybeLoadExecutions,
330 // maybeLoadFwdDeps, // sends to nodeChan if walking direction Fwd|Both 364 // maybeLoadFwdDeps, // sends to nodeChan if walking direction Fwd|Both
331 // maybeLoadBackDeps // sends to nodeChan if walking direction Back|Both 365 // maybeLoadBackDeps // sends to nodeChan if walking direction Back|Both
332 // ) 366 // )
333 // } 367 // }
334 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph Data, err error) { 368 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph Data, err error) {
369 if req.Auth != nil {
370 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of ")
371 } else {
372 logging.Debugf(c, "for user")
373 }
374
335 cncl := (func())(nil) 375 cncl := (func())(nil)
336 timeoutProto := req.Limit.MaxTime 376 timeoutProto := req.Limit.MaxTime
337 timeout := timeoutProto.Duration() // .Duration on nil is OK 377 timeout := timeoutProto.Duration() // .Duration on nil is OK
338 if timeoutProto == nil || timeout > maxTimeout { 378 if timeoutProto == nil || timeout > maxTimeout {
339 timeout = maxTimeout 379 timeout = maxTimeout
340 } 380 }
341 c, cncl = clock.WithTimeout(c, timeout) 381 c, cncl = clock.WithTimeout(c, timeout)
342 defer cncl() 382 defer cncl()
343 383
344 // nodeChan recieves attempt nodes to process. If it recieves the 384 // nodeChan recieves attempt nodes to process. If it recieves the
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
411 outstandingJobs-- 451 outstandingJobs--
412 if err == nil { 452 if err == nil {
413 break 453 break
414 } 454 }
415 if !isCtxErr(err) { 455 if !isCtxErr(err) {
416 rsp.HadErrors = true 456 rsp.HadErrors = true
417 } 457 }
418 // assume that contextualized logging already happened 458 // assume that contextualized logging already happened
419 459
420 case n := <-nodeChan: 460 case n := <-nodeChan:
421 logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.dept h}.Infof(c, "got node")
422 qst, ok := rsp.GetQuest(n.aid.Quest) 461 qst, ok := rsp.GetQuest(n.aid.Quest)
423 if !ok { 462 if !ok {
424 if !req.Include.ObjectIds { 463 if !req.Include.ObjectIds {
425 qst.Id = nil 464 qst.Id = nil
426 } 465 }
427 if req.Include.QuestData { 466 if req.Include.QuestData {
428 addJob(questDataLoader(c, n.aid.Quest, q st)) 467 addJob(questDataLoader(c, n.aid.Quest, q st))
429 } 468 }
430 } 469 }
431 if _, ok := qst.Attempts[n.aid.Id]; !ok { 470 if _, ok := qst.Attempts[n.aid.Id]; !ok {
(...skipping 17 matching lines...) Expand all
449 sendNodeAuthed(n.depth+1))) 488 sendNodeAuthed(n.depth+1)))
450 } 489 }
451 } 490 }
452 // otherwise, we've dealt with this attempt before, so i gnore it. 491 // otherwise, we've dealt with this attempt before, so i gnore it.
453 } 492 }
454 } 493 }
455 494
456 if c.Err() != nil { 495 if c.Err() != nil {
457 rsp.HadMore = true 496 rsp.HadMore = true
458 } 497 }
459
460 return 498 return
461 } 499 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/deps/tmp_get_execution.go ('k') | appengine/cmd/dm/deps/walk_graph_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698