Index: appengine/cmd/dm/mutate/schedule_execution.go |
diff --git a/appengine/cmd/dm/mutate/schedule_execution.go b/appengine/cmd/dm/mutate/schedule_execution.go |
index 8b4e4d3f435bca5fa0c52933af1d2b2c81083f6e..52b3bd549f6e6e446bb2bf2f386c8cb3c681318a 100644 |
--- a/appengine/cmd/dm/mutate/schedule_execution.go |
+++ b/appengine/cmd/dm/mutate/schedule_execution.go |
@@ -5,10 +5,16 @@ |
package mutate |
import ( |
+ "fmt" |
+ |
+ "github.com/luci/gae/filter/txnBuf" |
"github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
"github.com/luci/luci-go/appengine/cmd/dm/model" |
"github.com/luci/luci-go/appengine/tumble" |
"github.com/luci/luci-go/common/api/dm/service/v1" |
+ "github.com/luci/luci-go/common/errors" |
+ "github.com/luci/luci-go/common/logging" |
"golang.org/x/net/context" |
) |
@@ -20,11 +26,71 @@ type ScheduleExecution struct { |
// Root implements tumble.Mutation |
func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { |
- return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For}) |
+ return model.AttemptKeyFromID(c, s.For) |
} |
// RollForward implements tumble.Mutation |
func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) { |
iannucci
2016/06/08 02:54:24
this is the other beefish change: now when a Sched
|
+ ds := datastore.Get(c) |
+ a := model.AttemptFromID(s.For) |
+ if err = ds.Get(a); err != nil { |
+ return |
+ } |
+ |
+ if a.State != dm.Attempt_SCHEDULING { |
+ return |
+ } |
+ |
+ q := model.QuestFromID(s.For.Quest) |
+ if err = txnBuf.GetNoTxn(c).Get(q); err != nil { |
+ return |
+ } |
+ |
+ reg := distributor.GetRegistry(c) |
+ dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName) |
+ if err != nil { |
+ return |
+ } |
+ |
+ a.CurExecution++ |
+ if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil { |
+ return |
+ } |
+ |
+ eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution) |
+ e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver) |
+ |
+ exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token} |
+ |
+ var distTok distributor.Token |
+ distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run( |
dnj (Google)
2016/06/09 18:00:56
Since it's possible for the transaction to fail, i
iannucci
2016/06/15 00:46:01
This is documented in the distributor package, but
|
+ distributor.NewTaskDescription(c, &q.Desc, exAuth, |
+ distributor.PersistentState(a.PersistentState))) |
+ e.DistributorToken = string(distTok) |
+ if err != nil { |
+ if errors.IsTransient(err) { |
+ // tumble will retry us later |
+ logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution") |
+ return |
+ } |
+ origErr := err |
+ |
+ // put a and e to the transaction buffer, so that |
+ // FinishExecution.RollForward can see them. |
+ if err = ds.PutMulti([]interface{}{a, e}); err != nil { |
+ return |
+ } |
+ return NewFinishExecutionAbnormal( |
+ eid, dm.AbnormalFinish_REJECTED, |
+ fmt.Sprintf("rejected during scheduling with non-transient error: %s", origErr), |
+ ).RollForward(c) |
+ } |
+ |
+ if err = ResetExecutionTimeout(c, e); err != nil { |
+ return |
+ } |
+ |
+ err = ds.PutMulti([]interface{}{a, e}) |
return |
} |