Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package deps | 5 package deps |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "golang.org/x/net/context" | |
| 12 "google.golang.org/grpc/codes" | |
| 13 | |
| 11 "github.com/luci/gae/service/datastore" | 14 "github.com/luci/gae/service/datastore" |
| 15 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
| 12 "github.com/luci/luci-go/appengine/cmd/dm/model" | 16 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 13 dm "github.com/luci/luci-go/common/api/dm/service/v1" | 17 dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 14 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/grpcutil" | 19 "github.com/luci/luci-go/common/grpcutil" |
| 16 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/parallel" | 21 "github.com/luci/luci-go/common/parallel" |
| 18 "golang.org/x/net/context" | |
| 19 "google.golang.org/grpc/codes" | |
| 20 ) | 22 ) |
| 21 | 23 |
| 22 const numWorkers = 16 | 24 const numWorkers = 16 |
| 23 | 25 |
| 24 const maxTimeout = 55 * time.Second // GAE limit is 60s | 26 const maxTimeout = 55 * time.Second // GAE limit is 60s |
| 25 | 27 |
| 26 type node struct { | 28 type node struct { |
| 27 aid *dm.Attempt_ID | 29 aid *dm.Attempt_ID |
| 28 depth int64 | 30 depth int64 |
| 29 canSeeAttemptResult bool | 31 canSeeAttemptResult bool |
| 30 } | 32 } |
| 31 | 33 |
| 32 func runSearchQuery(c context.Context, send func(*dm.Attempt_ID) error, s *dm.Gr aphQuery_Search) func() error { | 34 func runSearchQuery(c context.Context, send func(*dm.Attempt_ID) error, s *dm.Gr aphQuery_Search) func() error { |
| 33 return func() error { | 35 return func() error { |
| 34 logging.Errorf(c, "SearchQuery not implemented") | 36 logging.Errorf(c, "SearchQuery not implemented") |
| 35 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented") | 37 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented") |
| 36 } | 38 } |
| 37 } | 39 } |
| 38 | 40 |
| 39 func isCtxErr(err error) bool { | 41 func isCtxErr(err error) bool { |
| 40 return err == context.Canceled || err == context.DeadlineExceeded | 42 return err == context.Canceled || err == context.DeadlineExceeded |
| 41 } | 43 } |
| 42 | 44 |
| 43 func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A ttempt_ID) error, al *dm.AttemptList) func() error { | 45 func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A ttempt_ID) error, al *dm.AttemptList) func() error { |
| 44 return func() error { | 46 return func() error { |
| 45 for qst, anum := range al.To { | 47 for qst, anum := range al.To { |
| 46 if len(anum.Nums) == 0 { | 48 if len(anum.Nums) == 0 { |
| 47 qry := model.QueryAttemptsForQuest(c, qst) | 49 qry := model.QueryAttemptsForQuest(c, qst) |
| 48 if !includeExpired { | 50 if !includeExpired { |
| 49 » » » » » qry = qry.Eq("Expired", false) | 51 » » » » » qry = qry.Eq("ResultExpired", false) |
| 50 } | 52 } |
| 51 err := datastore.Get(c).Run(qry, func(k *datasto re.Key) error { | 53 err := datastore.Get(c).Run(qry, func(k *datasto re.Key) error { |
| 52 aid := &dm.Attempt_ID{} | 54 aid := &dm.Attempt_ID{} |
| 53 if err := aid.SetDMEncoded(k.StringID()) ; err != nil { | 55 if err := aid.SetDMEncoded(k.StringID()) ; err != nil { |
| 54 logging.WithError(err).Errorf(c, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) | 56 logging.WithError(err).Errorf(c, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) |
| 55 panic(fmt.Errorf("in AttemptList Query: %s", err)) | 57 panic(fmt.Errorf("in AttemptList Query: %s", err)) |
| 56 } | 58 } |
| 57 return send(aid) | 59 return send(aid) |
| 58 }) | 60 }) |
| 59 if err != nil { | 61 if err != nil { |
| (...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 118 } | 120 } |
| 119 if doSend { | 121 if doSend { |
| 120 if err := send(aid); err != nil { | 122 if err := send(aid); err != nil { |
| 121 return err | 123 return err |
| 122 } | 124 } |
| 123 } | 125 } |
| 124 return nil | 126 return nil |
| 125 }) | 127 }) |
| 126 } | 128 } |
| 127 | 129 |
| 128 func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake y *datastore.Key, numEx int64, dst *dm.Attempt) error { | 130 func loadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_ Data_DistributorInfo) { |
|
dnj (Google)
2016/06/09 18:00:54
WDYT about actually returning an error here and lo
iannucci
2016/06/15 00:45:58
will rename, since we only call this in one place
| |
| 131 » d, _, err := reg.MakeDistributor(c, dat.ConfigName) | |
| 132 » if err != nil { | |
| 133 » » logging.Fields{ | |
| 134 » » » ek: err, "cfgName": dat.ConfigName, | |
| 135 » » }.Errorf(c, "unable to load distributor") | |
| 136 » } else { | |
| 137 » » dat.Url = d.InfoURL(distributor.Token(dat.Token)) | |
| 138 » } | |
| 139 } | |
| 140 | |
| 141 func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model. Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error { | |
| 129 start := int64(atmpt.CurExecution) - numEx | 142 start := int64(atmpt.CurExecution) - numEx |
| 130 if start <= 0 { | 143 if start <= 0 { |
| 131 start = 1 | 144 start = 1 |
| 132 } | 145 } |
| 133 toLoad := (int64(atmpt.CurExecution) - start) + 1 | 146 toLoad := (int64(atmpt.CurExecution) - start) + 1 |
| 134 if toLoad <= 0 { | 147 if toLoad <= 0 { |
| 135 return nil | 148 return nil |
| 136 } | 149 } |
| 150 reg := distributor.GetRegistry(c) | |
| 137 dst.Executions = make(map[uint32]*dm.Execution, toLoad) | 151 dst.Executions = make(map[uint32]*dm.Execution, toLoad) |
| 138 ds := datastore.Get(c) | 152 ds := datastore.Get(c) |
| 139 q := datastore.NewQuery("Execution").Ancestor(akey).Gte( | 153 q := datastore.NewQuery("Execution").Ancestor(akey).Gte( |
| 140 "__key__", ds.MakeKey("Attempt", akey.StringID(), "Execution", s tart)) | 154 "__key__", ds.MakeKey("Attempt", akey.StringID(), "Execution", s tart)) |
| 141 return ds.Run(q, func(e *model.Execution) error { | 155 return ds.Run(q, func(e *model.Execution) error { |
| 142 if c.Err() != nil { | 156 if c.Err() != nil { |
| 143 return datastore.Stop | 157 return datastore.Stop |
| 144 } | 158 } |
| 145 » » dst.Executions[e.ID] = e.ToProto(includeID) | 159 » » p := e.ToProto(includeID) |
| 160 » » if includeURL { | |
| 161 » » » loadInfoURL(c, reg, p.Data.DistributorInfo) | |
| 162 » » } | |
| 163 » » dst.Executions[e.ID] = p | |
| 146 return nil | 164 return nil |
| 147 }) | 165 }) |
| 148 } | 166 } |
| 149 | 167 |
| 150 func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_A uth, dst *dm.Attempt) func() error { | 168 func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState string, rsltSize uint32, lim *sizeLimit, akey *datastore.K ey, auth *dm.Execution_Auth, dst *dm.Attempt) func() error { |
| 151 return func() error { | 169 return func() error { |
| 152 ds := datastore.Get(c) | 170 ds := datastore.Get(c) |
| 153 if auth != nil && !authedForResult { | 171 if auth != nil && !authedForResult { |
| 154 // we need to prove that the currently authed execution depends on this | 172 // we need to prove that the currently authed execution depends on this |
| 155 // attempt. | 173 // attempt. |
| 156 from := auth.Id.AttemptID().DMEncoded() | 174 from := auth.Id.AttemptID().DMEncoded() |
| 157 to := aid.DMEncoded() | 175 to := aid.DMEncoded() |
| 158 fdepKey := ds.MakeKey("Attempt", from, "FwdDep", to) | 176 fdepKey := ds.MakeKey("Attempt", from, "FwdDep", to) |
| 159 exist, err := ds.Exists(fdepKey) | 177 exist, err := ds.Exists(fdepKey) |
| 160 if err != nil { | 178 if err != nil { |
| 161 logging.Fields{ek: err, "key": fdepKey}.Errorf(c , "failed to determine if FwdDep exists") | 179 logging.Fields{ek: err, "key": fdepKey}.Errorf(c , "failed to determine if FwdDep exists") |
| 162 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED | 180 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED |
| 163 return err | 181 return err |
| 164 } | 182 } |
| 165 » » » if !exist { | 183 » » » if !exist.Any() { |
|
dnj (Google)
2016/06/09 18:00:54
nit: I would use "All" here. Obviously either work
iannucci
2016/06/15 00:45:58
yeah changed w/ rebarse
| |
| 166 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED | 184 dst.Partial.Result = dm.Attempt_Partial_NOT_AUTH ORIZED |
| 167 return nil | 185 return nil |
| 168 } | 186 } |
| 169 } | 187 } |
| 170 | 188 |
| 171 » » if !lim.PossiblyOK(rsltSize) { | 189 » » siz := rsltSize + uint32(len(persistentState)) |
| 190 » » if !lim.PossiblyOK(siz) { | |
| 172 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT | 191 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT |
| 173 logging.Infof(c, "skipping load of AttemptResult %s (siz e limit)", aid) | 192 logging.Infof(c, "skipping load of AttemptResult %s (siz e limit)", aid) |
| 174 return nil | 193 return nil |
| 175 } | 194 } |
| 176 r := &model.AttemptResult{Attempt: akey} | 195 r := &model.AttemptResult{Attempt: akey} |
| 177 if err := ds.Get(r); err != nil { | 196 if err := ds.Get(r); err != nil { |
| 178 logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult") | 197 logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult") |
| 179 return err | 198 return err |
| 180 } | 199 } |
| 181 » » if lim.Add(r.Size) { | 200 » » if lim.Add(siz) { |
| 182 dst.Data.GetFinished().JsonResult = r.Data | 201 dst.Data.GetFinished().JsonResult = r.Data |
| 202 dst.Data.GetFinished().PersistentStateResult = persisten tState | |
| 183 dst.Partial.Result = dm.Attempt_Partial_LOADED | 203 dst.Partial.Result = dm.Attempt_Partial_LOADED |
| 184 } else { | 204 } else { |
| 185 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT | 205 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT |
| 186 logging.Infof(c, "loaded AttemptResult %s, but hit size limit after", aid) | 206 logging.Infof(c, "loaded AttemptResult %s, but hit size limit after", aid) |
| 187 } | 207 } |
| 188 return nil | 208 return nil |
| 189 } | 209 } |
| 190 } | 210 } |
| 191 | 211 |
| 192 func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, authedForResult bool, lim *sizeLimit, dst *dm.Attempt, send func(aid *dm.Attempt _ID, fwd bool) error) func() error { | 212 func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, authedForResult bool, lim *sizeLimit, dst *dm.Attempt, send func(aid *dm.Attempt _ID, fwd bool) error) func() error { |
| 193 return func() error { | 213 return func() error { |
| 194 ds := datastore.Get(c) | 214 ds := datastore.Get(c) |
| 195 | 215 |
| 196 atmpt := &model.Attempt{ID: *aid} | 216 atmpt := &model.Attempt{ID: *aid} |
| 197 akey := ds.KeyForObj(atmpt) | 217 akey := ds.KeyForObj(atmpt) |
| 198 if err := ds.Get(atmpt); err != nil { | 218 if err := ds.Get(atmpt); err != nil { |
| 199 if err == datastore.ErrNoSuchEntity { | 219 if err == datastore.ErrNoSuchEntity { |
| 200 dst.DNE = true | 220 dst.DNE = true |
| 201 dst.Partial = nil | 221 dst.Partial = nil |
| 202 return nil | 222 return nil |
| 203 } | 223 } |
| 204 return err | 224 return err |
| 205 } | 225 } |
| 206 » » if !req.Include.ExpiredAttempts && atmpt.Expired { | 226 » » if !req.Include.ExpiredAttempts && atmpt.ResultExpired { |
| 207 return nil | 227 return nil |
| 208 } | 228 } |
| 229 | |
| 230 persistentState := "" | |
| 209 if req.Include.AttemptData { | 231 if req.Include.AttemptData { |
| 210 dst.Data = atmpt.DataProto() | 232 dst.Data = atmpt.DataProto() |
| 211 dst.Partial.Data = false | 233 dst.Partial.Data = false |
| 234 if fin := dst.Data.GetFinished(); fin != nil { | |
| 235 // if we're including data and finished, we only add the persistentState | |
| 236 // if we could see the attempt result. Save it o ff here, and restore it | |
| 237 // in attemptResultLoader, only if we're able to load the actual result. | |
| 238 // | |
| 239 // This is done because for some jobs the persis tentState is | |
| 240 // almost-as-good as the actual result, and we w ant to prevent | |
| 241 // false/accidental dependencies where a job is able to 'depend' on the | |
| 242 // results without actually emitting a dependenc y on them. | |
| 243 persistentState = fin.PersistentStateResult | |
| 244 fin.PersistentStateResult = "" | |
| 245 } | |
| 212 } | 246 } |
| 213 | 247 |
| 214 errChan := parallel.Run(0, func(ch chan<- func() error) { | 248 errChan := parallel.Run(0, func(ch chan<- func() error) { |
| 215 if req.Include.AttemptResult { | 249 if req.Include.AttemptResult { |
| 216 if atmpt.State == dm.Attempt_FINISHED { | 250 if atmpt.State == dm.Attempt_FINISHED { |
| 217 » » » » » ch <- attemptResultLoader(c, aid, authed ForResult, atmpt.ResultSize, lim, akey, req.Auth, dst) | 251 » » » » » ch <- attemptResultLoader(c, aid, authed ForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst) |
| 218 } else { | 252 } else { |
| 219 dst.Partial.Result = dm.Attempt_Partial_ LOADED | 253 dst.Partial.Result = dm.Attempt_Partial_ LOADED |
| 220 } | 254 } |
| 221 } | 255 } |
| 222 | 256 |
| 223 if req.Include.NumExecutions > 0 { | 257 if req.Include.NumExecutions > 0 { |
| 224 ch <- func() error { | 258 ch <- func() error { |
| 225 » » » » » err := loadExecutions(c, req.Include.Obj ectIds, atmpt, akey, int64(req.Include.NumExecutions), dst) | 259 » » » » » err := loadExecutions(c, req.Include.Obj ectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutio ns), dst) |
| 226 if err != nil { | 260 if err != nil { |
| 227 logging.Fields{ek: err, "aid": a id}.Errorf(c, "error loading executions") | 261 logging.Fields{ek: err, "aid": a id}.Errorf(c, "error loading executions") |
| 228 } else { | 262 } else { |
| 229 dst.Partial.Executions = false | 263 dst.Partial.Executions = false |
| 230 } | 264 } |
| 231 return err | 265 return err |
| 232 } | 266 } |
| 233 } | 267 } |
| 234 | 268 |
| 235 writeFwd := req.Include.FwdDeps | 269 writeFwd := req.Include.FwdDeps |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 325 // a = get_attempt() | 359 // a = get_attempt() |
| 326 // if a exists { | 360 // if a exists { |
| 327 // FanOutIn( | 361 // FanOutIn( |
| 328 // maybeLoadAttemptResult, | 362 // maybeLoadAttemptResult, |
| 329 // maybeLoadExecutions, | 363 // maybeLoadExecutions, |
| 330 // maybeLoadFwdDeps, // sends to nodeChan if walking direction Fwd|Both | 364 // maybeLoadFwdDeps, // sends to nodeChan if walking direction Fwd|Both |
| 331 // maybeLoadBackDeps // sends to nodeChan if walking direction Back|Both | 365 // maybeLoadBackDeps // sends to nodeChan if walking direction Back|Both |
| 332 // ) | 366 // ) |
| 333 // } | 367 // } |
| 334 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph Data, err error) { | 368 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph Data, err error) { |
| 369 if req.Auth != nil { | |
| 370 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of ") | |
| 371 } else { | |
| 372 logging.Debugf(c, "for user") | |
| 373 } | |
| 374 | |
| 335 cncl := (func())(nil) | 375 cncl := (func())(nil) |
| 336 timeoutProto := req.Limit.MaxTime | 376 timeoutProto := req.Limit.MaxTime |
| 337 timeout := timeoutProto.Duration() // .Duration on nil is OK | 377 timeout := timeoutProto.Duration() // .Duration on nil is OK |
| 338 if timeoutProto == nil || timeout > maxTimeout { | 378 if timeoutProto == nil || timeout > maxTimeout { |
| 339 timeout = maxTimeout | 379 timeout = maxTimeout |
| 340 } | 380 } |
| 341 c, cncl = clock.WithTimeout(c, timeout) | 381 c, cncl = clock.WithTimeout(c, timeout) |
| 342 defer cncl() | 382 defer cncl() |
| 343 | 383 |
| 344 // nodeChan recieves attempt nodes to process. If it recieves the | 384 // nodeChan recieves attempt nodes to process. If it recieves the |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 411 outstandingJobs-- | 451 outstandingJobs-- |
| 412 if err == nil { | 452 if err == nil { |
| 413 break | 453 break |
| 414 } | 454 } |
| 415 if !isCtxErr(err) { | 455 if !isCtxErr(err) { |
| 416 rsp.HadErrors = true | 456 rsp.HadErrors = true |
| 417 } | 457 } |
| 418 // assume that contextualized logging already happened | 458 // assume that contextualized logging already happened |
| 419 | 459 |
| 420 case n := <-nodeChan: | 460 case n := <-nodeChan: |
| 421 logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.dept h}.Infof(c, "got node") | |
| 422 qst, ok := rsp.GetQuest(n.aid.Quest) | 461 qst, ok := rsp.GetQuest(n.aid.Quest) |
| 423 if !ok { | 462 if !ok { |
| 424 if !req.Include.ObjectIds { | 463 if !req.Include.ObjectIds { |
| 425 qst.Id = nil | 464 qst.Id = nil |
| 426 } | 465 } |
| 427 if req.Include.QuestData { | 466 if req.Include.QuestData { |
| 428 addJob(questDataLoader(c, n.aid.Quest, q st)) | 467 addJob(questDataLoader(c, n.aid.Quest, q st)) |
| 429 } | 468 } |
| 430 } | 469 } |
| 431 if _, ok := qst.Attempts[n.aid.Id]; !ok { | 470 if _, ok := qst.Attempts[n.aid.Id]; !ok { |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 449 sendNodeAuthed(n.depth+1))) | 488 sendNodeAuthed(n.depth+1))) |
| 450 } | 489 } |
| 451 } | 490 } |
| 452 // otherwise, we've dealt with this attempt before, so i gnore it. | 491 // otherwise, we've dealt with this attempt before, so i gnore it. |
| 453 } | 492 } |
| 454 } | 493 } |
| 455 | 494 |
| 456 if c.Err() != nil { | 495 if c.Err() != nil { |
| 457 rsp.HadMore = true | 496 rsp.HadMore = true |
| 458 } | 497 } |
| 459 | |
| 460 return | 498 return |
| 461 } | 499 } |
| OLD | NEW |