Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2016)

Unified Diff: appengine/cmd/dm/mutate/schedule_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/cmd/dm/mutate/schedule_execution.go
diff --git a/appengine/cmd/dm/mutate/schedule_execution.go b/appengine/cmd/dm/mutate/schedule_execution.go
index 8b4e4d3f435bca5fa0c52933af1d2b2c81083f6e..52b3bd549f6e6e446bb2bf2f386c8cb3c681318a 100644
--- a/appengine/cmd/dm/mutate/schedule_execution.go
+++ b/appengine/cmd/dm/mutate/schedule_execution.go
@@ -5,10 +5,16 @@
package mutate
import (
+ "fmt"
+
+ "github.com/luci/gae/filter/txnBuf"
"github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor"
"github.com/luci/luci-go/appengine/cmd/dm/model"
"github.com/luci/luci-go/appengine/tumble"
"github.com/luci/luci-go/common/api/dm/service/v1"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logging"
"golang.org/x/net/context"
)
@@ -20,11 +26,71 @@ type ScheduleExecution struct {
// Root implements tumble.Mutation
func (s *ScheduleExecution) Root(c context.Context) *datastore.Key {
- return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For})
+ return model.AttemptKeyFromID(c, s.For)
}
// RollForward implements tumble.Mutation
func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
iannucci 2016/06/08 02:54:24 this is the other beefish change: now when a Sched
+ ds := datastore.Get(c)
+ a := model.AttemptFromID(s.For)
+ if err = ds.Get(a); err != nil {
+ return
+ }
+
+ if a.State != dm.Attempt_SCHEDULING {
+ return
+ }
+
+ q := model.QuestFromID(s.For.Quest)
+ if err = txnBuf.GetNoTxn(c).Get(q); err != nil {
+ return
+ }
+
+ reg := distributor.GetRegistry(c)
+ dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName)
+ if err != nil {
+ return
+ }
+
+ a.CurExecution++
+ if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil {
+ return
+ }
+
+ eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution)
+ e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver)
+
+ exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token}
+
+ var distTok distributor.Token
+ distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run(
dnj (Google) 2016/06/09 18:00:56 Since it's possible for the transaction to fail, i
iannucci 2016/06/15 00:46:01 This is documented in the distributor package, but
+ distributor.NewTaskDescription(c, &q.Desc, exAuth,
+ distributor.PersistentState(a.PersistentState)))
+ e.DistributorToken = string(distTok)
+ if err != nil {
+ if errors.IsTransient(err) {
+ // tumble will retry us later
+ logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution")
+ return
+ }
+ origErr := err
+
+ // put a and e to the transaction buffer, so that
+ // FinishExecution.RollForward can see them.
+ if err = ds.PutMulti([]interface{}{a, e}); err != nil {
+ return
+ }
+ return NewFinishExecutionAbnormal(
+ eid, dm.AbnormalFinish_REJECTED,
+ fmt.Sprintf("rejected during scheduling with non-transient error: %s", origErr),
+ ).RollForward(c)
+ }
+
+ if err = ResetExecutionTimeout(c, e); err != nil {
+ return
+ }
+
+ err = ds.PutMulti([]interface{}{a, e})
return
}

Powered by Google App Engine
This is Rietveld 408576698