Index: appengine/cmd/dm/deps/walk_graph.go |
diff --git a/appengine/cmd/dm/deps/walk_graph.go b/appengine/cmd/dm/deps/walk_graph.go |
index 223b0ba7a5f22c0150e91b7d36c567863a8a7f19..f3e5bfaef5add75b87bd4787f43c4aa4be800ce0 100644 |
--- a/appengine/cmd/dm/deps/walk_graph.go |
+++ b/appengine/cmd/dm/deps/walk_graph.go |
@@ -8,15 +8,17 @@ import ( |
"fmt" |
"time" |
+ "golang.org/x/net/context" |
+ "google.golang.org/grpc/codes" |
+ |
"github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
"github.com/luci/luci-go/appengine/cmd/dm/model" |
dm "github.com/luci/luci-go/common/api/dm/service/v1" |
"github.com/luci/luci-go/common/clock" |
"github.com/luci/luci-go/common/grpcutil" |
"github.com/luci/luci-go/common/logging" |
"github.com/luci/luci-go/common/parallel" |
- "golang.org/x/net/context" |
- "google.golang.org/grpc/codes" |
) |
const numWorkers = 16 |
@@ -46,7 +48,7 @@ func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A |
if len(anum.Nums) == 0 { |
qry := model.QueryAttemptsForQuest(c, qst) |
if !includeExpired { |
- qry = qry.Eq("Expired", false) |
+ qry = qry.Eq("ResultExpired", false) |
} |
err := datastore.Get(c).Run(qry, func(k *datastore.Key) error { |
aid := &dm.Attempt_ID{} |
@@ -125,7 +127,18 @@ func loadEdges(c context.Context, send func(*dm.Attempt_ID) error, typ string, b |
}) |
} |
-func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error { |
+func loadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) { |
dnj (Google)
2016/06/09 18:00:54
WDYT about actually returning an error here and lo
iannucci
2016/06/15 00:45:58
will rename, since we only call this in one place
|
+ d, _, err := reg.MakeDistributor(c, dat.ConfigName) |
+ if err != nil { |
+ logging.Fields{ |
+ ek: err, "cfgName": dat.ConfigName, |
+ }.Errorf(c, "unable to load distributor") |
+ } else { |
+ dat.Url = d.InfoURL(distributor.Token(dat.Token)) |
+ } |
+} |
+ |
+func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error { |
start := int64(atmpt.CurExecution) - numEx |
if start <= 0 { |
start = 1 |
@@ -134,6 +147,7 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake |
if toLoad <= 0 { |
return nil |
} |
+ reg := distributor.GetRegistry(c) |
dst.Executions = make(map[uint32]*dm.Execution, toLoad) |
ds := datastore.Get(c) |
q := datastore.NewQuery("Execution").Ancestor(akey).Gte( |
@@ -142,12 +156,16 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake |
if c.Err() != nil { |
return datastore.Stop |
} |
- dst.Executions[e.ID] = e.ToProto(includeID) |
+ p := e.ToProto(includeID) |
+ if includeURL { |
+ loadInfoURL(c, reg, p.Data.DistributorInfo) |
+ } |
+ dst.Executions[e.ID] = p |
return nil |
}) |
} |
-func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error { |
+func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState string, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error { |
return func() error { |
ds := datastore.Get(c) |
if auth != nil && !authedForResult { |
@@ -162,13 +180,14 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult |
dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED |
return err |
} |
- if !exist { |
+ if !exist.Any() { |
dnj (Google)
2016/06/09 18:00:54
nit: I would use "All" here. Obviously either work
iannucci
2016/06/15 00:45:58
yeah changed w/ rebarse
|
dst.Partial.Result = dm.Attempt_Partial_NOT_AUTHORIZED |
return nil |
} |
} |
- if !lim.PossiblyOK(rsltSize) { |
+ siz := rsltSize + uint32(len(persistentState)) |
+ if !lim.PossiblyOK(siz) { |
dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT |
logging.Infof(c, "skipping load of AttemptResult %s (size limit)", aid) |
return nil |
@@ -178,8 +197,9 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult |
logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult") |
return err |
} |
- if lim.Add(r.Size) { |
+ if lim.Add(siz) { |
dst.Data.GetFinished().JsonResult = r.Data |
+ dst.Data.GetFinished().PersistentStateResult = persistentState |
dst.Partial.Result = dm.Attempt_Partial_LOADED |
} else { |
dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT |
@@ -203,18 +223,32 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, |
} |
return err |
} |
- if !req.Include.ExpiredAttempts && atmpt.Expired { |
+ if !req.Include.ExpiredAttempts && atmpt.ResultExpired { |
return nil |
} |
+ |
+ persistentState := "" |
if req.Include.AttemptData { |
dst.Data = atmpt.DataProto() |
dst.Partial.Data = false |
+ if fin := dst.Data.GetFinished(); fin != nil { |
+ // if we're including data and finished, we only add the persistentState |
+ // if we could see the attempt result. Save it off here, and restore it |
+ // in attemptResultLoader, only if we're able to load the actual result. |
+ // |
+ // This is done because for some jobs the persistentState is |
+ // almost-as-good as the actual result, and we want to prevent |
+ // false/accidental dependencies where a job is able to 'depend' on the |
+ // results without actually emitting a dependency on them. |
+ persistentState = fin.PersistentStateResult |
+ fin.PersistentStateResult = "" |
+ } |
} |
errChan := parallel.Run(0, func(ch chan<- func() error) { |
if req.Include.AttemptResult { |
if atmpt.State == dm.Attempt_FINISHED { |
- ch <- attemptResultLoader(c, aid, authedForResult, atmpt.ResultSize, lim, akey, req.Auth, dst) |
+ ch <- attemptResultLoader(c, aid, authedForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst) |
} else { |
dst.Partial.Result = dm.Attempt_Partial_LOADED |
} |
@@ -222,7 +256,7 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, |
if req.Include.NumExecutions > 0 { |
ch <- func() error { |
- err := loadExecutions(c, req.Include.ObjectIds, atmpt, akey, int64(req.Include.NumExecutions), dst) |
+ err := loadExecutions(c, req.Include.ObjectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutions), dst) |
if err != nil { |
logging.Fields{ek: err, "aid": aid}.Errorf(c, "error loading executions") |
} else { |
@@ -332,6 +366,12 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID, |
// ) |
// } |
func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, err error) { |
+ if req.Auth != nil { |
+ logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of") |
+ } else { |
+ logging.Debugf(c, "for user") |
+ } |
+ |
cncl := (func())(nil) |
timeoutProto := req.Limit.MaxTime |
timeout := timeoutProto.Duration() // .Duration on nil is OK |
@@ -418,7 +458,6 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph |
// assume that contextualized logging already happened |
case n := <-nodeChan: |
- logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.depth}.Infof(c, "got node") |
qst, ok := rsp.GetQuest(n.aid.Quest) |
if !ok { |
if !req.Include.ObjectIds { |
@@ -456,6 +495,5 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph |
if c.Err() != nil { |
rsp.HadMore = true |
} |
- |
return |
} |