| Index: appengine/cmd/dm/deps/walk_graph.go
|
| diff --git a/appengine/cmd/dm/deps/walk_graph.go b/appengine/cmd/dm/deps/walk_graph.go
|
| index ca581c1c3455148932993d2cb504a3ebc54abd6d..19666801c1232ede5631b34817fc2ed25177924a 100644
|
| --- a/appengine/cmd/dm/deps/walk_graph.go
|
| +++ b/appengine/cmd/dm/deps/walk_graph.go
|
| @@ -8,15 +8,17 @@ import (
|
| "fmt"
|
| "time"
|
|
|
| + "golang.org/x/net/context"
|
| + "google.golang.org/grpc/codes"
|
| +
|
| "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor"
|
| "github.com/luci/luci-go/appengine/cmd/dm/model"
|
| dm "github.com/luci/luci-go/common/api/dm/service/v1"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/parallel"
|
| - "golang.org/x/net/context"
|
| - "google.golang.org/grpc/codes"
|
| )
|
|
|
| const numWorkers = 16
|
| @@ -46,7 +48,7 @@ func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A
|
| if len(anum.Nums) == 0 {
|
| qry := model.QueryAttemptsForQuest(c, qst)
|
| if !includeExpired {
|
| - qry = qry.Eq("Expired", false)
|
| + qry = qry.Eq("ResultExpired", false)
|
| }
|
| err := datastore.Get(c).Run(qry, func(k *datastore.Key) error {
|
| aid := &dm.Attempt_ID{}
|
| @@ -125,7 +127,18 @@ func loadEdges(c context.Context, send func(*dm.Attempt_ID) error, typ string, b
|
| })
|
| }
|
|
|
| -func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
|
| +func maybeLoadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) {
|
| + d, _, err := reg.MakeDistributor(c, dat.ConfigName)
|
| + if err != nil {
|
| + logging.Fields{
|
| + ek: err, "cfgName": dat.ConfigName,
|
| + }.Errorf(c, "unable to load distributor")
|
| + } else {
|
| + dat.Url = d.InfoURL(distributor.Token(dat.Token))
|
| + }
|
| +}
|
| +
|
| +func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
|
| start := int64(atmpt.CurExecution) - numEx
|
| if start <= 0 {
|
| start = 1
|
| @@ -134,6 +147,7 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake
|
| if toLoad <= 0 {
|
| return nil
|
| }
|
| + reg := distributor.GetRegistry(c)
|
| dst.Executions = make(map[uint32]*dm.Execution, toLoad)
|
| ds := datastore.Get(c)
|
| q := datastore.NewQuery("Execution").Ancestor(akey).Gte(
|
| @@ -142,12 +156,16 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake
|
| if c.Err() != nil {
|
| return datastore.Stop
|
| }
|
| - dst.Executions[e.ID] = e.ToProto(includeID)
|
| + p := e.ToProto(includeID)
|
| + if includeURL {
|
| + maybeLoadInfoURL(c, reg, p.Data.DistributorInfo)
|
| + }
|
| + dst.Executions[e.ID] = p
|
| return nil
|
| })
|
| }
|
|
|
| -func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
|
| +func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState []byte, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
|
| return func() error {
|
| ds := datastore.Get(c)
|
| if auth != nil && !authedForResult {
|
| @@ -168,7 +186,8 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult
|
| }
|
| }
|
|
|
| - if !lim.PossiblyOK(rsltSize) {
|
| + siz := rsltSize + uint32(len(persistentState))
|
| + if !lim.PossiblyOK(siz) {
|
| dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
|
| logging.Infof(c, "skipping load of AttemptResult %s (size limit)", aid)
|
| return nil
|
| @@ -178,8 +197,9 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult
|
| logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult")
|
| return err
|
| }
|
| - if lim.Add(r.Size) {
|
| + if lim.Add(siz) {
|
| dst.Data.GetFinished().JsonResult = r.Data
|
| + dst.Data.GetFinished().PersistentStateResult = persistentState
|
| dst.Partial.Result = dm.Attempt_Partial_LOADED
|
| } else {
|
| dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
|
| @@ -203,18 +223,32 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
|
| }
|
| return err
|
| }
|
| - if !req.Include.ExpiredAttempts && atmpt.Expired {
|
| + if !req.Include.ExpiredAttempts && atmpt.ResultExpired {
|
| return nil
|
| }
|
| +
|
| + persistentState := []byte(nil)
|
| if req.Include.AttemptData {
|
| dst.Data = atmpt.DataProto()
|
| dst.Partial.Data = false
|
| + if fin := dst.Data.GetFinished(); fin != nil {
|
| + // if we're including data and finished, we only add the persistentState
|
| + // if we could see the attempt result. Save it off here, and restore it
|
| + // in attemptResultLoader, only if we're able to load the actual result.
|
| + //
|
| + // This is done because for some jobs the persistentState is
|
| + // almost-as-good as the actual result, and we want to prevent
|
| + // false/accidental dependencies where a job is able to 'depend' on the
|
| + // results without actually emitting a dependency on them.
|
| + persistentState = fin.PersistentStateResult
|
| + fin.PersistentStateResult = nil
|
| + }
|
| }
|
|
|
| errChan := parallel.Run(0, func(ch chan<- func() error) {
|
| if req.Include.AttemptResult {
|
| if atmpt.State == dm.Attempt_FINISHED {
|
| - ch <- attemptResultLoader(c, aid, authedForResult, atmpt.ResultSize, lim, akey, req.Auth, dst)
|
| + ch <- attemptResultLoader(c, aid, authedForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst)
|
| } else {
|
| dst.Partial.Result = dm.Attempt_Partial_LOADED
|
| }
|
| @@ -222,7 +256,7 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
|
|
|
| if req.Include.NumExecutions > 0 {
|
| ch <- func() error {
|
| - err := loadExecutions(c, req.Include.ObjectIds, atmpt, akey, int64(req.Include.NumExecutions), dst)
|
| + err := loadExecutions(c, req.Include.ObjectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutions), dst)
|
| if err != nil {
|
| logging.Fields{ek: err, "aid": aid}.Errorf(c, "error loading executions")
|
| } else {
|
| @@ -332,6 +366,12 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
|
| // )
|
| // }
|
| func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, err error) {
|
| + if req.Auth != nil {
|
| + logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of")
|
| + } else {
|
| + logging.Debugf(c, "for user")
|
| + }
|
| +
|
| cncl := (func())(nil)
|
| timeoutProto := req.Limit.MaxTime
|
| timeout := timeoutProto.Duration() // .Duration on nil is OK
|
| @@ -418,7 +458,6 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
|
| // assume that contextualized logging already happened
|
|
|
| case n := <-nodeChan:
|
| - logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.depth}.Infof(c, "got node")
|
| qst, ok := rsp.GetQuest(n.aid.Quest)
|
| if !ok {
|
| if !req.Include.ObjectIds {
|
| @@ -456,6 +495,5 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
|
| if c.Err() != nil {
|
| rsp.HadMore = true
|
| }
|
| -
|
| return
|
| }
|
|
|