 Chromium Code Reviews
 Chromium Code Reviews Issue 1537883002:
  Initial distributor implementation  (Closed) 
  Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
    
  
    Issue 1537883002:
  Initial distributor implementation  (Closed) 
  Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master| Index: appengine/cmd/dm/mutate/finish_execution.go | 
| diff --git a/appengine/cmd/dm/mutate/finish_execution.go b/appengine/cmd/dm/mutate/finish_execution.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..ecea9d59962d3a4f50719cf2b801f78c55854edb | 
| --- /dev/null | 
| +++ b/appengine/cmd/dm/mutate/finish_execution.go | 
| @@ -0,0 +1,154 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package mutate | 
| + | 
| +import ( | 
| + "fmt" | 
| + "golang.org/x/net/context" | 
| + | 
| + "github.com/luci/gae/filter/txnBuf" | 
| + "github.com/luci/gae/service/datastore" | 
| + | 
| + "github.com/luci/luci-go/appengine/tumble" | 
| + dm "github.com/luci/luci-go/common/api/dm/service/v1" | 
| + | 
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor" | 
| + "github.com/luci/luci-go/appengine/cmd/dm/model" | 
| +) | 
| + | 
| +// FinishExecution records the final state of the Execution, and advances the | 
| +// Attempt state machine. | 
| +type FinishExecution struct { | 
| 
iannucci
2016/06/08 02:54:24
this is the major state machine change. Previously
 | 
| + EID *dm.Execution_ID | 
| + Result *distributor.TaskResult | 
| +} | 
| + | 
| +// Root implements tumble.Mutation | 
| +func (f *FinishExecution) Root(c context.Context) *datastore.Key { | 
| + return model.ExecutionKeyFromID(c, f.EID) | 
| +} | 
| + | 
| +// shouldRetry loads the quest for this attempt, to determine if the attempt can | 
| +// be retried. As a side-effect, it increments the RetryState counter for the | 
| +// indicated failure type. | 
| +// | 
| +// If stat is not a retryable AbnormalFinish_Status, this will panic. | 
| +func shouldRetry(c context.Context, a *model.Attempt, stat dm.AbnormalFinish_Status) (retry bool, err error) { | 
| + if !stat.CouldRetry() { | 
| + return | 
| + } | 
| + q := model.QuestFromID(a.ID.Quest) | 
| + dsNoTxn := txnBuf.GetNoTxn(c) | 
| + if err = dsNoTxn.Get(q); err != nil { | 
| + return | 
| + } | 
| + var cur, max uint32 | 
| + switch stat { | 
| + case dm.AbnormalFinish_FAILED: | 
| + cur, max = a.RetryState.Failed, q.Desc.Meta.Retry.Failed | 
| + a.RetryState.Failed++ | 
| + case dm.AbnormalFinish_CRASHED: | 
| + cur, max = a.RetryState.Crashed, q.Desc.Meta.Retry.Crashed | 
| + a.RetryState.Crashed++ | 
| + case dm.AbnormalFinish_EXPIRED: | 
| + cur, max = a.RetryState.Expired, q.Desc.Meta.Retry.Expired | 
| + a.RetryState.Expired++ | 
| + case dm.AbnormalFinish_TIMED_OUT: | 
| + cur, max = a.RetryState.TimedOut, q.Desc.Meta.Retry.TimedOut | 
| + a.RetryState.TimedOut++ | 
| + default: | 
| + panic(fmt.Errorf("do not know how to retry %q", stat)) | 
| + } | 
| + retry = cur < max | 
| + return | 
| +} | 
| + | 
| +// RollForward implements tumble.Mutation | 
| +func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) { | 
| + a := model.AttemptFromID(f.EID.AttemptID()) | 
| + e := model.ExecutionFromID(c, f.EID) | 
| + | 
| + ds := datastore.Get(c) | 
| + if err = ds.GetMulti([]interface{}{a, e}); err != nil { | 
| + return | 
| + } | 
| + | 
| + if a.State == dm.Attempt_EXECUTING && a.CurExecution == f.EID.Id && !e.State.Terminal() { | 
| 
dnj (Google)
2016/06/09 18:00:56
For indentation purposes, maybe invert this and re
 
iannucci
2016/06/15 00:46:01
Done.
 | 
| + if f.Result.AbnormalFinish == nil && e.State != dm.Execution_STOPPING { | 
| + f.Result.AbnormalFinish = &dm.AbnormalFinish{ | 
| + Status: dm.AbnormalFinish_FAILED, | 
| + Reason: fmt.Sprintf("distributor finished execution while it was in the %s state.", e.State), | 
| + } | 
| + } | 
| + | 
| + if ab := f.Result.AbnormalFinish; ab != nil { | 
| + if err = e.ModifyState(c, dm.Execution_ABNORMAL_FINISHED); err != nil { | 
| + return | 
| + } | 
| + e.AbnormalFinish = *ab | 
| + | 
| + var retry bool | 
| + if retry, err = shouldRetry(c, a, ab.Status); err != nil { | 
| + return | 
| + } else if retry { | 
| + if err = a.ModifyState(c, dm.Attempt_SCHEDULING); err != nil { | 
| + return | 
| + } | 
| + a.DepMap.Reset() | 
| + muts = append(muts, &ScheduleExecution{&a.ID}) | 
| + } else { | 
| + // ran out of retries, or non-retriable error type | 
| + if err = a.ModifyState(c, dm.Attempt_ABNORMAL_FINISHED); err != nil { | 
| + return | 
| + } | 
| + a.AbnormalFinish = *ab | 
| + } | 
| + } else { | 
| + if err = e.ModifyState(c, dm.Execution_FINISHED); err != nil { | 
| + return | 
| + } | 
| + e.ResultPersistentState = string(f.Result.PersistentState) | 
| + | 
| + a.PersistentState = string(f.Result.PersistentState) | 
| + a.RetryState.Reset() | 
| + | 
| + if a.DepMap.Size() > 0 { | 
| + if err = a.ModifyState(c, dm.Attempt_WAITING); err != nil { | 
| + return | 
| + } | 
| + } else { | 
| + if err = a.ModifyState(c, dm.Attempt_FINISHED); err != nil { | 
| + return | 
| + } | 
| + muts = append(muts, &RecordCompletion{f.EID.AttemptID()}) | 
| + } | 
| + } | 
| + | 
| + // best-effort reset execution timeout | 
| + _ = ResetExecutionTimeout(c, e) | 
| + | 
| + err = ds.PutMulti([]interface{}{a, e}) | 
| + } | 
| + return | 
| +} | 
| + | 
| +// FinishExecutionFn is the implementation of distributor.FinishExecutionFn. | 
| +// It's defined here to avoid a circular dependency. | 
| +func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *distributor.TaskResult) ([]tumble.Mutation, error) { | 
| + return []tumble.Mutation{&FinishExecution{EID: eid, Result: rslt}}, nil | 
| +} | 
| + | 
| +// NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation | 
| +// with some abnomal result. | 
| +func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_Status, reason string) *FinishExecution { | 
| + return &FinishExecution{ | 
| + eid, &distributor.TaskResult{ | 
| + AbnormalFinish: &dm.AbnormalFinish{ | 
| + Status: status, Reason: reason}}} | 
| +} | 
| + | 
| +func init() { | 
| + tumble.Register((*FinishExecution)(nil)) | 
| +} |