Chromium Code Reviews

Index: appengine/cmd/dm/deps/walk_graph.go
diff --git a/appengine/cmd/dm/deps/walk_graph.go b/appengine/cmd/dm/deps/walk_graph.go
index 223b0ba7a5f22c0150e91b7d36c567863a8a7f19..f3e5bfaef5add75b87bd4787f43c4aa4be800ce0 100644
--- a/appengine/cmd/dm/deps/walk_graph.go
+++ b/appengine/cmd/dm/deps/walk_graph.go
@@ -8,15 +8,17 @@ import (
   "fmt"
   "time"
+  "golang.org/x/net/context"
+  "google.golang.org/grpc/codes"
+
   "github.com/luci/gae/service/datastore"
+  "github.com/luci/luci-go/appengine/cmd/dm/distributor"
   "github.com/luci/luci-go/appengine/cmd/dm/model"
   dm "github.com/luci/luci-go/common/api/dm/service/v1"
   "github.com/luci/luci-go/common/clock"
   "github.com/luci/luci-go/common/grpcutil"
   "github.com/luci/luci-go/common/logging"
   "github.com/luci/luci-go/common/parallel"
-  "golang.org/x/net/context"
-  "google.golang.org/grpc/codes"
 )

 const numWorkers = 16

@@ -46,7 +48,7 @@ func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A
   if len(anum.Nums) == 0 {
     qry := model.QueryAttemptsForQuest(c, qst)
     if !includeExpired {
-      qry = qry.Eq("Expired", false)
+      qry = qry.Eq("ResultExpired", false)
     }
     err := datastore.Get(c).Run(qry, func(k *datastore.Key) error {
       aid := &dm.Attempt_ID{}

@@ -125,7 +127,18 @@ func loadEdges(c context.Context, send func(*dm.Attempt_ID) error, typ string, b
   })
 }

-func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
+func loadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) {

    dnj (Google) 2016/06/09 18:00:54:
    WDYT about actually returning an error here and lo…

    iannucci 2016/06/15 00:45:58:
    will rename, since we only call this in one place

+  d, _, err := reg.MakeDistributor(c, dat.ConfigName)
+  if err != nil {
+    logging.Fields{
+      ek: err, "cfgName": dat.ConfigName,
+    }.Errorf(c, "unable to load distributor")
+  } else {
+    dat.Url = d.InfoURL(distributor.Token(dat.Token))
+  }
+}
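
dnj's question above is about returning the error rather than logging it inside
the helper. A minimal sketch of that variant, reusing the names from the patch;
the signature is an assumption about shape, not what actually landed:

```go
// loadInfoURL, reworked to return its error instead of logging it here. The
// helper's single caller (the query callback in loadExecutions) would then
// log once, with the attempt ID attached.
func loadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) error {
  d, _, err := reg.MakeDistributor(c, dat.ConfigName)
  if err != nil {
    return fmt.Errorf("loading distributor %q: %s", dat.ConfigName, err)
  }
  dat.Url = d.InfoURL(distributor.Token(dat.Token))
  return nil
}
```
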
+
+func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
   start := int64(atmpt.CurExecution) - numEx
   if start <= 0 {
     start = 1
@@ -134,6 +147,7 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake
   if toLoad <= 0 {
     return nil
   }
+  reg := distributor.GetRegistry(c)
   dst.Executions = make(map[uint32]*dm.Execution, toLoad)
   ds := datastore.Get(c)
   q := datastore.NewQuery("Execution").Ancestor(akey).Gte(
@@ -142,12 +156,16 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake
     if c.Err() != nil {
       return datastore.Stop
     }
-    dst.Executions[e.ID] = e.ToProto(includeID)
+    p := e.ToProto(includeID)
+    if includeURL {
+      loadInfoURL(c, reg, p.Data.DistributorInfo)
+    }
+    dst.Executions[e.ID] = p
     return nil
   })
 }

-func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
+func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState string, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
   return func() error {
     ds := datastore.Get(c)
     if auth != nil && !authedForResult {
@@ -162,13 +180,14 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult
         dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED
         return err
       }
-      if !exist {
+      if !exist.Any() {

    dnj (Google) 2016/06/09 18:00:54:
    nit: I would use "All" here. Obviously either work…

    iannucci 2016/06/15 00:45:58:
    yeah, changed w/ rebase

        dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED
        return nil
      }
    }
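
Context for the "Any"/"All" nit above: the exists check yields one boolean per
key it was given, so with a single key Any() and All() agree, which is why
either reads correctly here; with several keys they diverge. A self-contained
toy illustration of those semantics (not luci/gae's actual result type):

```go
package main

import "fmt"

// boolList sketches the aggregate that an exists-check over several keys
// produces: one bool per key.
type boolList []bool

// Any reports whether at least one entry is true.
func (b boolList) Any() bool {
  for _, v := range b {
    if v {
      return true
    }
  }
  return false
}

// All reports whether every entry is true.
func (b boolList) All() bool {
  for _, v := range b {
    if !v {
      return false
    }
  }
  return true
}

func main() {
  one := boolList{true}
  fmt.Println(one.Any(), one.All()) // true true: single key, either works
  mixed := boolList{true, false}
  fmt.Println(mixed.Any(), mixed.All()) // true false: they diverge
}
```
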
-    if !lim.PossiblyOK(rsltSize) {
+    siz := rsltSize + uint32(len(persistentState))
+    if !lim.PossiblyOK(siz) {
       dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
       logging.Infof(c, "skipping load of AttemptResult %s (size limit)", aid)
       return nil
@@ -178,8 +197,9 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult
       logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult")
       return err
     }
-    if lim.Add(r.Size) {
+    if lim.Add(siz) {
       dst.Data.GetFinished().JsonResult = r.Data
+      dst.Data.GetFinished().PersistentStateResult = persistentState
       dst.Partial.Result = dm.Attempt_Partial_LOADED
     } else {
       dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
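
The sizeLimit type is defined elsewhere in the package and isn't shown in this
CL; the two call sites above imply roughly the shape below. Everything here is
inferred rather than copied from DM, and the real type presumably has to be
safe for concurrent use, since attemptResultLoader runs under parallel.Run:

```go
// sizeLimitSketch: a guess at the contract implied by the call sites.
// PossiblyOK is a cheap pre-check made before the datastore load; Add
// actually reserves the bytes and reports whether the payload fit.
type sizeLimitSketch struct {
  current, max uint32
}

// PossiblyOK reports whether size more bytes could still fit under the limit.
func (s *sizeLimitSketch) PossiblyOK(size uint32) bool {
  return s.current+size <= s.max
}

// Add reserves size bytes, returning false (and reserving nothing) if the
// reservation would push the running total past the limit.
func (s *sizeLimitSketch) Add(size uint32) bool {
  if s.current+size > s.max {
    return false
  }
  s.current += size
  return true
}
```
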
@@ -203,18 +223,32 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
     }
     return err
   }
-  if !req.Include.ExpiredAttempts && atmpt.Expired {
+  if !req.Include.ExpiredAttempts && atmpt.ResultExpired {
     return nil
   }
+
+  persistentState := ""
   if req.Include.AttemptData {
     dst.Data = atmpt.DataProto()
     dst.Partial.Data = false
+    if fin := dst.Data.GetFinished(); fin != nil {
+      // If we're including data and finished, we only add the persistentState
+      // if we could see the attempt result. Save it off here, and restore it
+      // in attemptResultLoader, only if we're able to load the actual result.
+      //
+      // This is done because for some jobs the persistentState is
+      // almost-as-good as the actual result, and we want to prevent
+      // false/accidental dependencies where a job is able to 'depend' on the
+      // results without actually emitting a dependency on them.
+      persistentState = fin.PersistentStateResult
+      fin.PersistentStateResult = ""
+    }
   }
   errChan := parallel.Run(0, func(ch chan<- func() error) {
     if req.Include.AttemptResult {
       if atmpt.State == dm.Attempt_FINISHED {
-        ch <- attemptResultLoader(c, aid, authedForResult, atmpt.ResultSize, lim, akey, req.Auth, dst)
+        ch <- attemptResultLoader(c, aid, authedForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst)
       } else {
         dst.Partial.Result = dm.Attempt_Partial_LOADED
       }
@@ -222,7 +256,7 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
     if req.Include.NumExecutions > 0 {
       ch <- func() error {
-        err := loadExecutions(c, req.Include.ObjectIds, atmpt, akey, int64(req.Include.NumExecutions), dst)
+        err := loadExecutions(c, req.Include.ObjectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutions), dst)
         if err != nil {
           logging.Fields{ek: err, "aid": aid}.Errorf(c, "error loading executions")
         } else {
@@ -332,6 +366,12 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
 // )
 // }
 func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, err error) {
+  if req.Auth != nil {
+    logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of")
+  } else {
+    logging.Debugf(c, "for user")
+  }
+
   cncl := (func())(nil)
   timeoutProto := req.Limit.MaxTime
   timeout := timeoutProto.Duration() // .Duration on nil is OK
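
The ".Duration on nil is OK" comment leans on Go's nil-receiver methods:
calling a pointer method on a nil receiver is legal as long as the method never
dereferences it. A sketch of the pattern with assumed field names (not the
actual dm proto type):

```go
package main

import (
  "fmt"
  "time"
)

// Duration mimics the shape of a proto duration message; the field names are
// assumptions for illustration.
type Duration struct {
  Seconds int64
  Nanos   int32
}

// Duration returns zero when the receiver is nil instead of dereferencing it,
// which is what lets callers chain req.Limit.MaxTime.Duration() without a
// nil check.
func (d *Duration) Duration() time.Duration {
  if d == nil {
    return 0
  }
  return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)
}

func main() {
  var d *Duration // nil
  fmt.Println(d.Duration()) // 0s, no panic
}
```
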
@@ -418,7 +458,6 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
       // assume that contextualized logging already happened
     case n := <-nodeChan:
-      logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.depth}.Infof(c, "got node")
       qst, ok := rsp.GetQuest(n.aid.Quest)
       if !ok {
         if !req.Include.ObjectIds {
@@ -456,6 +495,5 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
   if c.Err() != nil {
     rsp.HadMore = true
   }
-
   return
 }
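
For reference, a hypothetical caller exercising the ExecutionInfoUrl flag this
CL adds; the Include field names come from the patch, while the nested Go type
name and the surrounding setup are assumptions:

```go
// Ask WalkGraph for attempt data and results, plus the most recent execution
// of each attempt with its distributor info URL resolved (the new behavior
// wired through loadExecutions -> loadInfoURL in this CL).
req := &dm.WalkGraphReq{
  Include: &dm.WalkGraphReq_Include{
    AttemptData:      true,
    AttemptResult:    true,
    NumExecutions:    1,
    ExecutionInfoUrl: true,
  },
}
```
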