OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package deps | 5 package deps |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "time" | 9 "time" |
10 | 10 |
11 "golang.org/x/net/context" | |
12 "google.golang.org/grpc/codes" | |
13 | |
11 "github.com/luci/gae/service/datastore" | 14 "github.com/luci/gae/service/datastore" |
15 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
12 "github.com/luci/luci-go/appengine/cmd/dm/model" | 16 "github.com/luci/luci-go/appengine/cmd/dm/model" |
13 dm "github.com/luci/luci-go/common/api/dm/service/v1" | 17 dm "github.com/luci/luci-go/common/api/dm/service/v1" |
14 "github.com/luci/luci-go/common/clock" | 18 "github.com/luci/luci-go/common/clock" |
15 "github.com/luci/luci-go/common/grpcutil" | 19 "github.com/luci/luci-go/common/grpcutil" |
16 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
17 "github.com/luci/luci-go/common/parallel" | 21 "github.com/luci/luci-go/common/parallel" |
18 "golang.org/x/net/context" | |
19 "google.golang.org/grpc/codes" | |
20 ) | 22 ) |
21 | 23 |
22 const numWorkers = 16 | 24 const numWorkers = 16 |
23 | 25 |
24 const maxTimeout = 55 * time.Second // GAE limit is 60s | 26 const maxTimeout = 55 * time.Second // GAE limit is 60s |
25 | 27 |
26 type node struct { | 28 type node struct { |
27 aid *dm.Attempt_ID | 29 aid *dm.Attempt_ID |
28 depth int64 | 30 depth int64 |
29 canSeeAttemptResult bool | 31 canSeeAttemptResult bool |
30 } | 32 } |
31 | 33 |
32 func runSearchQuery(c context.Context, send func(*dm.Attempt_ID) error, s *dm.GraphQuery_Search) func() error { | 34 func runSearchQuery(c context.Context, send func(*dm.Attempt_ID) error, s *dm.GraphQuery_Search) func() error { |
33 return func() error { | 35 return func() error { |
34 logging.Errorf(c, "SearchQuery not implemented") | 36 logging.Errorf(c, "SearchQuery not implemented") |
35 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented") | 37 return grpcutil.Errf(codes.Unimplemented, "GraphQuery.Search is not implemented") |
36 } | 38 } |
37 } | 39 } |
38 | 40 |
39 func isCtxErr(err error) bool { | 41 func isCtxErr(err error) bool { |
40 return err == context.Canceled || err == context.DeadlineExceeded | 42 return err == context.Canceled || err == context.DeadlineExceeded |
41 } | 43 } |
42 | 44 |
43 func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.Attempt_ID) error, al *dm.AttemptList) func() error { | 45 func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.Attempt_ID) error, al *dm.AttemptList) func() error { |
44 return func() error { | 46 return func() error { |
45 for qst, anum := range al.To { | 47 for qst, anum := range al.To { |
46 if len(anum.Nums) == 0 { | 48 if len(anum.Nums) == 0 { |
47 qry := model.QueryAttemptsForQuest(c, qst) | 49 qry := model.QueryAttemptsForQuest(c, qst) |
48 if !includeExpired { | 50 if !includeExpired { |
49 » » » » » qry = qry.Eq("Expired", false) | 51 » » » » » qry = qry.Eq("ResultExpired", false) |
50 } | 52 } |
51 			err := datastore.Get(c).Run(qry, func(k *datastore.Key) error { | 53 			err := datastore.Get(c).Run(qry, func(k *datastore.Key) error { |
52 aid := &dm.Attempt_ID{} | 54 aid := &dm.Attempt_ID{} |
53 				if err := aid.SetDMEncoded(k.StringID()); err != nil { | 55 				if err := aid.SetDMEncoded(k.StringID()); err != nil { |
54 logging.WithError(err).Errorf(c, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) | 56 logging.WithError(err).Errorf(c, "Attempt_ID.SetDMEncoded returned an error with input: %q", k.StringID()) |
55 panic(fmt.Errorf("in AttemptList Query: %s", err)) | 57 panic(fmt.Errorf("in AttemptList Query: %s", err)) |
56 } | 58 } |
57 return send(aid) | 59 return send(aid) |
58 }) | 60 }) |
59 if err != nil { | 61 if err != nil { |
(...skipping 58 matching lines...) | |
118 } | 120 } |
119 if doSend { | 121 if doSend { |
120 if err := send(aid); err != nil { | 122 if err := send(aid); err != nil { |
121 return err | 123 return err |
122 } | 124 } |
123 } | 125 } |
124 return nil | 126 return nil |
125 }) | 127 }) |
126 } | 128 } |
127 | 129 |
128 func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error { | 130 func loadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) {
dnj (Google)
2016/06/09 18:00:54
WDYT about actually returning an error here and lo
iannucci
2016/06/15 00:45:58
will rename, since we only call this in one place
| |
131 » d, _, err := reg.MakeDistributor(c, dat.ConfigName) | |
132 » if err != nil { | |
133 » » logging.Fields{ | |
134 » » » ek: err, "cfgName": dat.ConfigName, | |
135 » » }.Errorf(c, "unable to load distributor") | |
136 » } else { | |
137 » » dat.Url = d.InfoURL(distributor.Token(dat.Token)) | |
138 » } | |
139 } | |
140 | |
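For context on dnj's question above: a minimal sketch of the error-returning variant, reusing only the MakeDistributor/InfoURL calls already present in this CL (hypothetical; as written, the helper logs the failure and simply leaves dat.Url unset):

    // infoURL is a hypothetical variant of loadInfoURL that surfaces the
    // MakeDistributor failure to the caller instead of logging it here.
    func infoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) error {
    	d, _, err := reg.MakeDistributor(c, dat.ConfigName)
    	if err != nil {
    		return err
    	}
    	dat.Url = d.InfoURL(distributor.Token(dat.Token))
    	return nil
    }

The caller (loadExecutions below) would then decide whether a bad distributor config fails the whole load or just skips the URL.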
141 func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error { | |
129 start := int64(atmpt.CurExecution) - numEx | 142 start := int64(atmpt.CurExecution) - numEx |
130 if start <= 0 { | 143 if start <= 0 { |
131 start = 1 | 144 start = 1 |
132 } | 145 } |
133 toLoad := (int64(atmpt.CurExecution) - start) + 1 | 146 toLoad := (int64(atmpt.CurExecution) - start) + 1 |
134 if toLoad <= 0 { | 147 if toLoad <= 0 { |
135 return nil | 148 return nil |
136 } | 149 } |
150 reg := distributor.GetRegistry(c) | |
137 dst.Executions = make(map[uint32]*dm.Execution, toLoad) | 151 dst.Executions = make(map[uint32]*dm.Execution, toLoad) |
138 ds := datastore.Get(c) | 152 ds := datastore.Get(c) |
139 q := datastore.NewQuery("Execution").Ancestor(akey).Gte( | 153 q := datastore.NewQuery("Execution").Ancestor(akey).Gte( |
140 		"__key__", ds.MakeKey("Attempt", akey.StringID(), "Execution", start)) | 154 		"__key__", ds.MakeKey("Attempt", akey.StringID(), "Execution", start)) |
141 return ds.Run(q, func(e *model.Execution) error { | 155 return ds.Run(q, func(e *model.Execution) error { |
142 if c.Err() != nil { | 156 if c.Err() != nil { |
143 return datastore.Stop | 157 return datastore.Stop |
144 } | 158 } |
145 » » dst.Executions[e.ID] = e.ToProto(includeID) | 159 » » p := e.ToProto(includeID) |
160 » » if includeURL { | |
161 » » » loadInfoURL(c, reg, p.Data.DistributorInfo) | |
162 » » } | |
163 » » dst.Executions[e.ID] = p | |
146 return nil | 164 return nil |
147 }) | 165 }) |
148 } | 166 } |
149 | 167 |
150 func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error { | 168 func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState string, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error { |
151 return func() error { | 169 return func() error { |
152 ds := datastore.Get(c) | 170 ds := datastore.Get(c) |
153 if auth != nil && !authedForResult { | 171 if auth != nil && !authedForResult { |
154 // we need to prove that the currently authed execution depends on this | 172 // we need to prove that the currently authed execution depends on this |
155 // attempt. | 173 // attempt. |
156 from := auth.Id.AttemptID().DMEncoded() | 174 from := auth.Id.AttemptID().DMEncoded() |
157 to := aid.DMEncoded() | 175 to := aid.DMEncoded() |
158 fdepKey := ds.MakeKey("Attempt", from, "FwdDep", to) | 176 fdepKey := ds.MakeKey("Attempt", from, "FwdDep", to) |
159 exist, err := ds.Exists(fdepKey) | 177 exist, err := ds.Exists(fdepKey) |
160 if err != nil { | 178 if err != nil { |
161 				logging.Fields{ek: err, "key": fdepKey}.Errorf(c, "failed to determine if FwdDep exists") | 179 				logging.Fields{ek: err, "key": fdepKey}.Errorf(c, "failed to determine if FwdDep exists") |
162 				dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED | 180 				dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED |
163 return err | 181 return err |
164 } | 182 } |
165 » » » if !exist { | 183 » » » if !exist.Any() { |
dnj (Google)
2016/06/09 18:00:54
nit: I would use "All" here. Obviously either work
iannucci
2016/06/15 00:45:58
yeah changed w/ rebase
| |
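Context for the nit: the Exists call above checks a single FwdDep key, so Any() and All() agree and either is correct; All() just states the intent ("this specific dep must exist") more directly. A rough sketch of the distinction, assuming Exists is given several keys at once (hypothetical keys shown):

    // exist, err := ds.Exists(keyA, keyB)
    // exist.Any() // true if at least one of keyA/keyB exists
    // exist.All() // true only if every queried key exists; with a single key the two are identical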
166 				dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED | 184 				dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED |
167 return nil | 185 return nil |
168 } | 186 } |
169 } | 187 } |
170 | 188 |
171 » » if !lim.PossiblyOK(rsltSize) { | 189 » » siz := rsltSize + uint32(len(persistentState)) |
190 » » if !lim.PossiblyOK(siz) { | |
172 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT | 191 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT |
173 			logging.Infof(c, "skipping load of AttemptResult %s (size limit)", aid) | 192 			logging.Infof(c, "skipping load of AttemptResult %s (size limit)", aid) |
174 return nil | 193 return nil |
175 } | 194 } |
176 r := &model.AttemptResult{Attempt: akey} | 195 r := &model.AttemptResult{Attempt: akey} |
177 if err := ds.Get(r); err != nil { | 196 if err := ds.Get(r); err != nil { |
178 logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult") | 197 logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult") |
179 return err | 198 return err |
180 } | 199 } |
181 » » if lim.Add(r.Size) { | 200 » » if lim.Add(siz) { |
182 dst.Data.GetFinished().JsonResult = r.Data | 201 dst.Data.GetFinished().JsonResult = r.Data |
202 			dst.Data.GetFinished().PersistentStateResult = persistentState | |
183 dst.Partial.Result = dm.Attempt_Partial_LOADED | 203 dst.Partial.Result = dm.Attempt_Partial_LOADED |
184 } else { | 204 } else { |
185 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT | 205 dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT |
186 logging.Infof(c, "loaded AttemptResult %s, but hit size limit after", aid) | 206 logging.Infof(c, "loaded AttemptResult %s, but hit size limit after", aid) |
187 } | 207 } |
188 return nil | 208 return nil |
189 } | 209 } |
190 } | 210 } |
191 | 211 |
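The lim handling in attemptResultLoader is a two-phase size budget: PossiblyOK is an optimistic pre-check against the declared size before paying for the datastore read, and Add claims the bytes afterwards, failing if the budget has meanwhile been used up by a concurrently loaded attempt. The real sizeLimit is defined elsewhere in this package; below is only a minimal sketch of a limiter with that shape, assuming a sync.Mutex-guarded counter:

    // sizeLimitSketch is a hypothetical stand-in for this package's sizeLimit.
    type sizeLimitSketch struct {
    	mu      sync.Mutex
    	current uint32
    	max     uint32
    }

    // PossiblyOK reports whether size more bytes could still fit. It reserves
    // nothing, so a later Add may still fail.
    func (s *sizeLimitSketch) PossiblyOK(size uint32) bool {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	return s.current+size <= s.max
    }

    // Add reserves size bytes, returning false (and reserving nothing) if the
    // reservation would exceed the budget.
    func (s *sizeLimitSketch) Add(size uint32) bool {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	if s.current+size > s.max {
    		return false
    	}
    	s.current += size
    	return true
    }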
192 func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, authedForResult bool, lim *sizeLimit, dst *dm.Attempt, send func(aid *dm.Attempt_ID, fwd bool) error) func() error { | 212 func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, authedForResult bool, lim *sizeLimit, dst *dm.Attempt, send func(aid *dm.Attempt_ID, fwd bool) error) func() error { |
193 return func() error { | 213 return func() error { |
194 ds := datastore.Get(c) | 214 ds := datastore.Get(c) |
195 | 215 |
196 atmpt := &model.Attempt{ID: *aid} | 216 atmpt := &model.Attempt{ID: *aid} |
197 akey := ds.KeyForObj(atmpt) | 217 akey := ds.KeyForObj(atmpt) |
198 if err := ds.Get(atmpt); err != nil { | 218 if err := ds.Get(atmpt); err != nil { |
199 if err == datastore.ErrNoSuchEntity { | 219 if err == datastore.ErrNoSuchEntity { |
200 dst.DNE = true | 220 dst.DNE = true |
201 dst.Partial = nil | 221 dst.Partial = nil |
202 return nil | 222 return nil |
203 } | 223 } |
204 return err | 224 return err |
205 } | 225 } |
206 » » if !req.Include.ExpiredAttempts && atmpt.Expired { | 226 » » if !req.Include.ExpiredAttempts && atmpt.ResultExpired { |
207 return nil | 227 return nil |
208 } | 228 } |
229 | |
230 persistentState := "" | |
209 if req.Include.AttemptData { | 231 if req.Include.AttemptData { |
210 dst.Data = atmpt.DataProto() | 232 dst.Data = atmpt.DataProto() |
211 dst.Partial.Data = false | 233 dst.Partial.Data = false |
234 if fin := dst.Data.GetFinished(); fin != nil { | |
235 // if we're including data and finished, we only add the persistentState | |
236 // if we could see the attempt result. Save it o ff here, and restore it | |
237 				// if we could see the attempt result. Save it off here, and restore it | |
238 				// in attemptResultLoader, only if we're able to load the actual result. | |
239 				// | |
240 				// This is done because for some jobs the persistentState is | |
241 				// almost-as-good as the actual result, and we want to prevent | |
242 				// false/accidental dependencies where a job is able to 'depend' on the | |
243 				// results without actually emitting a dependency on them. | |
244 fin.PersistentStateResult = "" | |
245 } | |
212 } | 246 } |
213 | 247 |
214 errChan := parallel.Run(0, func(ch chan<- func() error) { | 248 errChan := parallel.Run(0, func(ch chan<- func() error) { |
215 if req.Include.AttemptResult { | 249 if req.Include.AttemptResult { |
216 if atmpt.State == dm.Attempt_FINISHED { | 250 if atmpt.State == dm.Attempt_FINISHED { |
217 					ch <- attemptResultLoader(c, aid, authedForResult, atmpt.ResultSize, lim, akey, req.Auth, dst) | 251 					ch <- attemptResultLoader(c, aid, authedForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst) |
218 } else { | 252 } else { |
219 					dst.Partial.Result = dm.Attempt_Partial_LOADED | 253 					dst.Partial.Result = dm.Attempt_Partial_LOADED |
220 } | 254 } |
221 } | 255 } |
222 | 256 |
223 if req.Include.NumExecutions > 0 { | 257 if req.Include.NumExecutions > 0 { |
224 ch <- func() error { | 258 ch <- func() error { |
225 					err := loadExecutions(c, req.Include.ObjectIds, atmpt, akey, int64(req.Include.NumExecutions), dst) | 259 					err := loadExecutions(c, req.Include.ObjectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutions), dst) |
226 if err != nil { | 260 if err != nil { |
227 						logging.Fields{ek: err, "aid": aid}.Errorf(c, "error loading executions") | 261 						logging.Fields{ek: err, "aid": aid}.Errorf(c, "error loading executions") |
228 } else { | 262 } else { |
229 dst.Partial.Executions = false | 263 dst.Partial.Executions = false |
230 } | 264 } |
231 return err | 265 return err |
232 } | 266 } |
233 } | 267 } |
234 | 268 |
235 writeFwd := req.Include.FwdDeps | 269 writeFwd := req.Include.FwdDeps |
(...skipping 89 matching lines...) | |
325 // a = get_attempt() | 359 // a = get_attempt() |
326 // if a exists { | 360 // if a exists { |
327 // FanOutIn( | 361 // FanOutIn( |
328 // maybeLoadAttemptResult, | 362 // maybeLoadAttemptResult, |
329 // maybeLoadExecutions, | 363 // maybeLoadExecutions, |
330 // maybeLoadFwdDeps, // sends to nodeChan if walking direction Fwd|Both | 364 // maybeLoadFwdDeps, // sends to nodeChan if walking direction Fwd|Both |
331 // maybeLoadBackDeps // sends to nodeChan if walking direction Back|Both | 365 // maybeLoadBackDeps // sends to nodeChan if walking direction Back|Both |
332 // ) | 366 // ) |
333 // } | 367 // } |
334 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, err error) { | 368 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, err error) { |
369 if req.Auth != nil { | |
370 logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of ") | |
371 } else { | |
372 logging.Debugf(c, "for user") | |
373 } | |
374 | |
335 cncl := (func())(nil) | 375 cncl := (func())(nil) |
336 timeoutProto := req.Limit.MaxTime | 376 timeoutProto := req.Limit.MaxTime |
337 timeout := timeoutProto.Duration() // .Duration on nil is OK | 377 timeout := timeoutProto.Duration() // .Duration on nil is OK |
338 if timeoutProto == nil || timeout > maxTimeout { | 378 if timeoutProto == nil || timeout > maxTimeout { |
339 timeout = maxTimeout | 379 timeout = maxTimeout |
340 } | 380 } |
341 c, cncl = clock.WithTimeout(c, timeout) | 381 c, cncl = clock.WithTimeout(c, timeout) |
342 defer cncl() | 382 defer cncl() |
343 | 383 |
344 	// nodeChan receives attempt nodes to process. If it receives the | 384 	// nodeChan receives attempt nodes to process. If it receives the |
(...skipping 66 matching lines...) | |
411 outstandingJobs-- | 451 outstandingJobs-- |
412 if err == nil { | 452 if err == nil { |
413 break | 453 break |
414 } | 454 } |
415 if !isCtxErr(err) { | 455 if !isCtxErr(err) { |
416 rsp.HadErrors = true | 456 rsp.HadErrors = true |
417 } | 457 } |
418 // assume that contextualized logging already happened | 458 // assume that contextualized logging already happened |
419 | 459 |
420 case n := <-nodeChan: | 460 case n := <-nodeChan: |
421 			logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.depth}.Infof(c, "got node") | |
422 qst, ok := rsp.GetQuest(n.aid.Quest) | 461 qst, ok := rsp.GetQuest(n.aid.Quest) |
423 if !ok { | 462 if !ok { |
424 if !req.Include.ObjectIds { | 463 if !req.Include.ObjectIds { |
425 qst.Id = nil | 464 qst.Id = nil |
426 } | 465 } |
427 if req.Include.QuestData { | 466 if req.Include.QuestData { |
428 					addJob(questDataLoader(c, n.aid.Quest, qst)) | 467 					addJob(questDataLoader(c, n.aid.Quest, qst)) |
429 } | 468 } |
430 } | 469 } |
431 if _, ok := qst.Attempts[n.aid.Id]; !ok { | 470 if _, ok := qst.Attempts[n.aid.Id]; !ok { |
(...skipping 17 matching lines...) | |
449 sendNodeAuthed(n.depth+1))) | 488 sendNodeAuthed(n.depth+1))) |
450 } | 489 } |
451 } | 490 } |
452 			// otherwise, we've dealt with this attempt before, so ignore it. | 491 			// otherwise, we've dealt with this attempt before, so ignore it. |
453 } | 492 } |
454 } | 493 } |
455 | 494 |
456 if c.Err() != nil { | 495 if c.Err() != nil { |
457 rsp.HadMore = true | 496 rsp.HadMore = true |
458 } | 497 } |
459 | |
460 return | 498 return |
461 } | 499 } |