Index: appengine/cmd/dm/mutate/finish_execution.go |
diff --git a/appengine/cmd/dm/mutate/finish_execution.go b/appengine/cmd/dm/mutate/finish_execution.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ecea9d59962d3a4f50719cf2b801f78c55854edb |
--- /dev/null |
+++ b/appengine/cmd/dm/mutate/finish_execution.go |
@@ -0,0 +1,154 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package mutate |
+ |
+import ( |
+ "fmt" |
+ "golang.org/x/net/context" |
+ |
+ "github.com/luci/gae/filter/txnBuf" |
+ "github.com/luci/gae/service/datastore" |
+ |
+ "github.com/luci/luci-go/appengine/tumble" |
+ dm "github.com/luci/luci-go/common/api/dm/service/v1" |
+ |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
+ "github.com/luci/luci-go/appengine/cmd/dm/model" |
+) |
+ |
+// FinishExecution records the final state of the Execution, and advances the |
+// Attempt state machine. |
+type FinishExecution struct { |
iannucci
2016/06/08 02:54:24
this is the major state machine change. Previously
|
+ EID *dm.Execution_ID |
+ Result *distributor.TaskResult |
+} |
+ |
+// Root implements tumble.Mutation |
+func (f *FinishExecution) Root(c context.Context) *datastore.Key { |
+ return model.ExecutionKeyFromID(c, f.EID) |
+} |
+ |
+// shouldRetry loads the quest for this attempt, to determine if the attempt can |
+// be retried. As a side-effect, it increments the RetryState counter for the |
+// indicated failure type. |
+// |
+// If stat is not a retryable AbnormalFinish_Status, this will panic. |
+func shouldRetry(c context.Context, a *model.Attempt, stat dm.AbnormalFinish_Status) (retry bool, err error) { |
+ if !stat.CouldRetry() { |
+ return |
+ } |
+ q := model.QuestFromID(a.ID.Quest) |
+ dsNoTxn := txnBuf.GetNoTxn(c) |
+ if err = dsNoTxn.Get(q); err != nil { |
+ return |
+ } |
+ var cur, max uint32 |
+ switch stat { |
+ case dm.AbnormalFinish_FAILED: |
+ cur, max = a.RetryState.Failed, q.Desc.Meta.Retry.Failed |
+ a.RetryState.Failed++ |
+ case dm.AbnormalFinish_CRASHED: |
+ cur, max = a.RetryState.Crashed, q.Desc.Meta.Retry.Crashed |
+ a.RetryState.Crashed++ |
+ case dm.AbnormalFinish_EXPIRED: |
+ cur, max = a.RetryState.Expired, q.Desc.Meta.Retry.Expired |
+ a.RetryState.Expired++ |
+ case dm.AbnormalFinish_TIMED_OUT: |
+ cur, max = a.RetryState.TimedOut, q.Desc.Meta.Retry.TimedOut |
+ a.RetryState.TimedOut++ |
+ default: |
+ panic(fmt.Errorf("do not know how to retry %q", stat)) |
+ } |
+ retry = cur < max |
+ return |
+} |
+ |
+// RollForward implements tumble.Mutation |
+func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) { |
+ a := model.AttemptFromID(f.EID.AttemptID()) |
+ e := model.ExecutionFromID(c, f.EID) |
+ |
+ ds := datastore.Get(c) |
+ if err = ds.GetMulti([]interface{}{a, e}); err != nil { |
+ return |
+ } |
+ |
+ if a.State == dm.Attempt_EXECUTING && a.CurExecution == f.EID.Id && !e.State.Terminal() { |
dnj (Google)
2016/06/09 18:00:56
For indentation purposes, maybe invert this and re
iannucci
2016/06/15 00:46:01
Done.
|
+ if f.Result.AbnormalFinish == nil && e.State != dm.Execution_STOPPING { |
+ f.Result.AbnormalFinish = &dm.AbnormalFinish{ |
+ Status: dm.AbnormalFinish_FAILED, |
+ Reason: fmt.Sprintf("distributor finished execution while it was in the %s state.", e.State), |
+ } |
+ } |
+ |
+ if ab := f.Result.AbnormalFinish; ab != nil { |
+ if err = e.ModifyState(c, dm.Execution_ABNORMAL_FINISHED); err != nil { |
+ return |
+ } |
+ e.AbnormalFinish = *ab |
+ |
+ var retry bool |
+ if retry, err = shouldRetry(c, a, ab.Status); err != nil { |
+ return |
+ } else if retry { |
+ if err = a.ModifyState(c, dm.Attempt_SCHEDULING); err != nil { |
+ return |
+ } |
+ a.DepMap.Reset() |
+ muts = append(muts, &ScheduleExecution{&a.ID}) |
+ } else { |
+ // ran out of retries, or non-retriable error type |
+ if err = a.ModifyState(c, dm.Attempt_ABNORMAL_FINISHED); err != nil { |
+ return |
+ } |
+ a.AbnormalFinish = *ab |
+ } |
+ } else { |
+ if err = e.ModifyState(c, dm.Execution_FINISHED); err != nil { |
+ return |
+ } |
+ e.ResultPersistentState = string(f.Result.PersistentState) |
+ |
+ a.PersistentState = string(f.Result.PersistentState) |
+ a.RetryState.Reset() |
+ |
+ if a.DepMap.Size() > 0 { |
+ if err = a.ModifyState(c, dm.Attempt_WAITING); err != nil { |
+ return |
+ } |
+ } else { |
+ if err = a.ModifyState(c, dm.Attempt_FINISHED); err != nil { |
+ return |
+ } |
+ muts = append(muts, &RecordCompletion{f.EID.AttemptID()}) |
+ } |
+ } |
+ |
+ // best-effort reset execution timeout |
+ _ = ResetExecutionTimeout(c, e) |
+ |
+ err = ds.PutMulti([]interface{}{a, e}) |
+ } |
+ return |
+} |
+ |
+// FinishExecutionFn is the implementation of distributor.FinishExecutionFn. |
+// It's defined here to avoid a circular dependency. |
+func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *distributor.TaskResult) ([]tumble.Mutation, error) { |
+ return []tumble.Mutation{&FinishExecution{EID: eid, Result: rslt}}, nil |
+} |
+ |
+// NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation |
+// with some abnomal result. |
+func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_Status, reason string) *FinishExecution { |
+ return &FinishExecution{ |
+ eid, &distributor.TaskResult{ |
+ AbnormalFinish: &dm.AbnormalFinish{ |
+ Status: status, Reason: reason}}} |
+} |
+ |
+func init() { |
+ tumble.Register((*FinishExecution)(nil)) |
+} |