Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3208)

Unified Diff: appengine/cmd/dm/deps/walk_graph.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/dm/deps/tmp_get_execution.go ('k') | appengine/cmd/dm/deps/walk_graph_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/dm/deps/walk_graph.go
diff --git a/appengine/cmd/dm/deps/walk_graph.go b/appengine/cmd/dm/deps/walk_graph.go
index ca581c1c3455148932993d2cb504a3ebc54abd6d..19666801c1232ede5631b34817fc2ed25177924a 100644
--- a/appengine/cmd/dm/deps/walk_graph.go
+++ b/appengine/cmd/dm/deps/walk_graph.go
@@ -8,15 +8,17 @@ import (
"fmt"
"time"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+
"github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor"
"github.com/luci/luci-go/appengine/cmd/dm/model"
dm "github.com/luci/luci-go/common/api/dm/service/v1"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/grpcutil"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/parallel"
- "golang.org/x/net/context"
- "google.golang.org/grpc/codes"
)
const numWorkers = 16
@@ -46,7 +48,7 @@ func runAttemptListQuery(c context.Context, includeExpired bool, send func(*dm.A
if len(anum.Nums) == 0 {
qry := model.QueryAttemptsForQuest(c, qst)
if !includeExpired {
- qry = qry.Eq("Expired", false)
+ qry = qry.Eq("ResultExpired", false)
}
err := datastore.Get(c).Run(qry, func(k *datastore.Key) error {
aid := &dm.Attempt_ID{}
@@ -125,7 +127,18 @@ func loadEdges(c context.Context, send func(*dm.Attempt_ID) error, typ string, b
})
}
-func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
+func maybeLoadInfoURL(c context.Context, reg distributor.Registry, dat *dm.Execution_Data_DistributorInfo) {
+ d, _, err := reg.MakeDistributor(c, dat.ConfigName)
+ if err != nil {
+ logging.Fields{
+ ek: err, "cfgName": dat.ConfigName,
+ }.Errorf(c, "unable to load distributor")
+ } else {
+ dat.Url = d.InfoURL(distributor.Token(dat.Token))
+ }
+}
+
+func loadExecutions(c context.Context, includeID, includeURL bool, atmpt *model.Attempt, akey *datastore.Key, numEx int64, dst *dm.Attempt) error {
start := int64(atmpt.CurExecution) - numEx
if start <= 0 {
start = 1
@@ -134,6 +147,7 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake
if toLoad <= 0 {
return nil
}
+ reg := distributor.GetRegistry(c)
dst.Executions = make(map[uint32]*dm.Execution, toLoad)
ds := datastore.Get(c)
q := datastore.NewQuery("Execution").Ancestor(akey).Gte(
@@ -142,12 +156,16 @@ func loadExecutions(c context.Context, includeID bool, atmpt *model.Attempt, ake
if c.Err() != nil {
return datastore.Stop
}
- dst.Executions[e.ID] = e.ToProto(includeID)
+ p := e.ToProto(includeID)
+ if includeURL {
+ maybeLoadInfoURL(c, reg, p.Data.DistributorInfo)
+ }
+ dst.Executions[e.ID] = p
return nil
})
}
-func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
+func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult bool, persistentState []byte, rsltSize uint32, lim *sizeLimit, akey *datastore.Key, auth *dm.Execution_Auth, dst *dm.Attempt) func() error {
return func() error {
ds := datastore.Get(c)
if auth != nil && !authedForResult {
@@ -168,7 +186,8 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult
}
}
- if !lim.PossiblyOK(rsltSize) {
+ siz := rsltSize + uint32(len(persistentState))
+ if !lim.PossiblyOK(siz) {
dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
logging.Infof(c, "skipping load of AttemptResult %s (size limit)", aid)
return nil
@@ -178,8 +197,9 @@ func attemptResultLoader(c context.Context, aid *dm.Attempt_ID, authedForResult
logging.Fields{ek: err, "aid": aid}.Errorf(c, "failed to load AttemptResult")
return err
}
- if lim.Add(r.Size) {
+ if lim.Add(siz) {
dst.Data.GetFinished().JsonResult = r.Data
+ dst.Data.GetFinished().PersistentStateResult = persistentState
dst.Partial.Result = dm.Attempt_Partial_LOADED
} else {
dst.Partial.Result = dm.Attempt_Partial_DATA_SIZE_LIMIT
@@ -203,18 +223,32 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
}
return err
}
- if !req.Include.ExpiredAttempts && atmpt.Expired {
+ if !req.Include.ExpiredAttempts && atmpt.ResultExpired {
return nil
}
+
+ persistentState := []byte(nil)
if req.Include.AttemptData {
dst.Data = atmpt.DataProto()
dst.Partial.Data = false
+ if fin := dst.Data.GetFinished(); fin != nil {
+ // if we're including data and finished, we only add the persistentState
+ // if we could see the attempt result. Save it off here, and restore it
+ // in attemptResultLoader, only if we're able to load the actual result.
+ //
+ // This is done because for some jobs the persistentState is
+ // almost-as-good as the actual result, and we want to prevent
+ // false/accidental dependencies where a job is able to 'depend' on the
+ // results without actually emitting a dependency on them.
+ persistentState = fin.PersistentStateResult
+ fin.PersistentStateResult = nil
+ }
}
errChan := parallel.Run(0, func(ch chan<- func() error) {
if req.Include.AttemptResult {
if atmpt.State == dm.Attempt_FINISHED {
- ch <- attemptResultLoader(c, aid, authedForResult, atmpt.ResultSize, lim, akey, req.Auth, dst)
+ ch <- attemptResultLoader(c, aid, authedForResult, persistentState, atmpt.ResultSize, lim, akey, req.Auth, dst)
} else {
dst.Partial.Result = dm.Attempt_Partial_LOADED
}
@@ -222,7 +256,7 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
if req.Include.NumExecutions > 0 {
ch <- func() error {
- err := loadExecutions(c, req.Include.ObjectIds, atmpt, akey, int64(req.Include.NumExecutions), dst)
+ err := loadExecutions(c, req.Include.ObjectIds, req.Include.ExecutionInfoUrl, atmpt, akey, int64(req.Include.NumExecutions), dst)
if err != nil {
logging.Fields{ek: err, "aid": aid}.Errorf(c, "error loading executions")
} else {
@@ -332,6 +366,12 @@ func attemptLoader(c context.Context, req *dm.WalkGraphReq, aid *dm.Attempt_ID,
// )
// }
func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.GraphData, err error) {
+ if req.Auth != nil {
+ logging.Fields{"execution": req.Auth.Id}.Debugf(c, "on behalf of")
+ } else {
+ logging.Debugf(c, "for user")
+ }
+
cncl := (func())(nil)
timeoutProto := req.Limit.MaxTime
timeout := timeoutProto.Duration() // .Duration on nil is OK
@@ -418,7 +458,6 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
// assume that contextualized logging already happened
case n := <-nodeChan:
- logging.Fields{"aid": n.aid.DMEncoded(), "depth": n.depth}.Infof(c, "got node")
qst, ok := rsp.GetQuest(n.aid.Quest)
if !ok {
if !req.Include.ObjectIds {
@@ -456,6 +495,5 @@ func (d *deps) WalkGraph(c context.Context, req *dm.WalkGraphReq) (rsp *dm.Graph
if c.Err() != nil {
rsp.HadMore = true
}
-
return
}
« no previous file with comments | « appengine/cmd/dm/deps/tmp_get_execution.go ('k') | appengine/cmd/dm/deps/walk_graph_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698