| Index: appengine/cmd/dm/mutate/schedule_execution.go
|
| diff --git a/appengine/cmd/dm/mutate/schedule_execution.go b/appengine/cmd/dm/mutate/schedule_execution.go
|
| index 8b4e4d3f435bca5fa0c52933af1d2b2c81083f6e..11a24003f8f6c06adb1b52c8e2d472c0ec89ef12 100644
|
| --- a/appengine/cmd/dm/mutate/schedule_execution.go
|
| +++ b/appengine/cmd/dm/mutate/schedule_execution.go
|
| @@ -5,10 +5,16 @@
|
| package mutate
|
|
|
| import (
|
| + "fmt"
|
| +
|
| + "github.com/luci/gae/filter/txnBuf"
|
| "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor"
|
| "github.com/luci/luci-go/appengine/cmd/dm/model"
|
| "github.com/luci/luci-go/appengine/tumble"
|
| "github.com/luci/luci-go/common/api/dm/service/v1"
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/logging"
|
| "golang.org/x/net/context"
|
| )
|
|
|
| @@ -20,11 +26,71 @@ type ScheduleExecution struct {
|
|
|
| // Root implements tumble.Mutation
|
| func (s *ScheduleExecution) Root(c context.Context) *datastore.Key {
|
| - return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For})
|
| + return model.AttemptKeyFromID(c, s.For)
|
| }
|
|
|
| // RollForward implements tumble.Mutation
|
| func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
|
| + ds := datastore.Get(c)
|
| + a := model.AttemptFromID(s.For)
|
| + if err = ds.Get(a); err != nil {
|
| + return
|
| + }
|
| +
|
| + if a.State != dm.Attempt_SCHEDULING {
|
| + return
|
| + }
|
| +
|
| + q := model.QuestFromID(s.For.Quest)
|
| + if err = txnBuf.GetNoTxn(c).Get(q); err != nil {
|
| + return
|
| + }
|
| +
|
| + reg := distributor.GetRegistry(c)
|
| + dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName)
|
| + if err != nil {
|
| + return
|
| + }
|
| +
|
| + a.CurExecution++
|
| + if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil {
|
| + return
|
| + }
|
| +
|
| + eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution)
|
| + e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver)
|
| +
|
| + exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token}
|
| +
|
| + var distTok distributor.Token
|
| + distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run(
|
| + distributor.NewTaskDescription(c, &q.Desc, exAuth,
|
| + distributor.PersistentState(a.PersistentState)))
|
| + e.DistributorToken = string(distTok)
|
| + if err != nil {
|
| + if errors.IsTransient(err) {
|
| + // tumble will retry us later
|
| + logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution")
|
| + return
|
| + }
|
| + origErr := err
|
| +
|
| + // put a and e to the transaction buffer, so that
|
| + // FinishExecution.RollForward can see them.
|
| + if err = ds.Put(a, e); err != nil {
|
| + return
|
| + }
|
| + return NewFinishExecutionAbnormal(
|
| + eid, dm.AbnormalFinish_REJECTED,
|
| + fmt.Sprintf("rejected during scheduling with non-transient error: %s", origErr),
|
| + ).RollForward(c)
|
| + }
|
| +
|
| + if err = ResetExecutionTimeout(c, e); err != nil {
|
| + return
|
| + }
|
| +
|
| + err = ds.Put(a, e)
|
| return
|
| }
|
|
|
|
|