Index: appengine/cmd/dm/mutate/timeout_execution.go |
diff --git a/appengine/cmd/dm/mutate/timeout_execution.go b/appengine/cmd/dm/mutate/timeout_execution.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..76ee16f8b0406dcfdf0891ecf6ff83b4f162f06e |
--- /dev/null |
+++ b/appengine/cmd/dm/mutate/timeout_execution.go |
@@ -0,0 +1,148 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package mutate |
+ |
+import ( |
+ "fmt" |
+ "time" |
+ |
+ "golang.org/x/net/context" |
+ |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
+ "github.com/luci/luci-go/appengine/cmd/dm/model" |
+ "github.com/luci/luci-go/appengine/tumble" |
+ dm "github.com/luci/luci-go/common/api/dm/service/v1" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/logging" |
+) |
+ |
+// TimeoutExecution is a named mutation which triggers on a delay. If the |
+// execution is in the noted state when the trigger hits, this sets the |
+// Execution to have an AbnormalFinish status of TIMED_OUT. |
+type TimeoutExecution struct { |
+ For *dm.Execution_ID |
+ State dm.Execution_State |
+ // TimeoutAttempt is the number of attempts to stop a STOPPING execution, |
+ // since this potentially requires an RPC to the distributor to enact. |
+ TimeoutAttempt uint |
+ Deadline time.Time |
+} |
+ |
+const maxTimeoutAttempts = 3 |
+ |
+var _ tumble.DelayedMutation = (*TimeoutExecution)(nil) |
+ |
+// Root implements tumble.Mutation |
+func (t *TimeoutExecution) Root(c context.Context) *datastore.Key { |
+ return model.AttemptKeyFromID(c, t.For.AttemptID()) |
+} |
+ |
+// RollForward implements tumble.Mutation |
+func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) { |
+ e := model.ExecutionFromID(c, t.For) |
+ |
+ ds := datastore.Get(c) |
+ if err = ds.Get(e); err != nil { |
+ return |
+ } |
+ if e.State != t.State { |
+ return |
+ } |
+ |
+ // will be overwritten if this execution is STOPPING and the timeout is not |
+ // abnormal |
+ rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{ |
+ Reason: fmt.Sprintf("DM timeout (%s)", e.State), |
+ Status: dm.AbnormalFinish_TIMED_OUT}} |
+ |
+ if e.State == dm.Execution_STOPPING { |
+ // if it's supposed to be STOPPING, maybe we just missed a notification from |
+ // the distributor (or the distributor is not using pubsub). |
+ reg := distributor.GetRegistry(c) |
+ var dist distributor.D |
+ var vers string |
+ dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName) |
+ |
+ if vers != "" && vers != e.DistributorConfigVersion { |
+ logging.Fields{ |
+ "cfg_name": e.DistributorConfigName, |
+ "orig_cfg_vers": e.DistributorConfigVersion, |
+ "cur_cfg_vers": vers, |
+ }.Warningf(c, "mismatched distributor config versions") |
+ } |
+ |
+ if err != nil { |
+ logging.Fields{ |
+ logging.ErrorKey: err, |
+ "cfgName": e.DistributorConfigName, |
+ }.Errorf(c, "Could not MakeDistributor") |
dnj (Google)
2016/06/09 18:00:57
Does this mean that if we ever unregister a Distri
iannucci
2016/06/15 00:46:01
Added todo: need to distinguish luci-config flake
|
+ return |
+ } |
+ var realRslt *distributor.TaskResult |
+ realRslt, err = dist.GetStatus(distributor.Token(e.DistributorToken)) |
+ if err != nil || rslt == nil && t.TimeoutAttempt < maxTimeoutAttempts { |
dnj (Google)
2016/06/09 18:00:57
rslt will never be nil here. Do you mean realRslt?
iannucci
2016/06/15 00:46:01
Done.
|
+ logging.Fields{ |
+ logging.ErrorKey: err, |
+ "task_result": rslt, |
dnj (Google)
2016/06/09 18:00:57
(same here)
iannucci
2016/06/15 00:46:01
Done.
|
+ "timeout_attempt": t.TimeoutAttempt, |
+ }.Infof(c, "GetStatus failed/nop'd while timing out STOPPING execution") |
+ // TODO(riannucci): do randomized exponential backoff instead of constant |
+ // backoff? Kinda don't really want to spend more than 1.5m waiting |
+ // anyway, and the actual GetStatus call does local retries already, so |
+ // hopefully this is fine. If this is wrong, the distributor should adjust |
+ // its timeToStop value to be better. |
+ t.Deadline = t.Deadline.Add(time.Second * 30) |
+ t.TimeoutAttempt++ |
+ err = nil |
+ muts = append(muts, t) |
dnj (Google)
2016/06/09 18:00:56
I think you'll like new Tumble API for this one :)
iannucci
2016/06/15 00:46:01
Acknowledged.
|
+ return |
+ } |
+ |
+ if err != nil { |
dnj (Google)
2016/06/09 18:00:57
Can this ever be hit? You handle "err != nil" abov
iannucci
2016/06/15 00:46:01
I was missing some parens above.
|
+ rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s) w/ error: %s", e.State, err) |
+ err = nil |
+ } else if realRslt != nil { |
+ rslt = realRslt |
+ } |
+ } |
+ |
+ muts = append(muts, &FinishExecution{t.For, rslt}) |
+ return |
+} |
+ |
+// ProcessAfter implements tumble.DelayedMutation |
+func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline } |
+ |
+// HighPriority implements tumble.DelayedMutation |
+func (t *TimeoutExecution) HighPriority() bool { return false } |
+ |
+// ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the |
+// Execution's State to determine which timeout should be set, if any. If no |
+// timeout should be active, this will cancel any existing timeouts for this |
+// Execution. |
+func ResetExecutionTimeout(c context.Context, e *model.Execution) error { |
+ howLong := time.Duration(0) |
+ switch e.State { |
+ case dm.Execution_SCHEDULING: |
+ howLong = e.TimeToStart |
+ case dm.Execution_RUNNING: |
+ howLong = e.TimeToRun |
+ case dm.Execution_STOPPING: |
+ howLong = e.TimeToStop |
+ } |
+ eid := e.GetEID() |
+ key := model.ExecutionKeyFromID(c, eid) |
+ if howLong == 0 { |
+ return tumble.CancelNamedMutations(c, key, "timeout") |
+ } |
+ return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{ |
+ "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC().Add(howLong)}, |
+ }) |
+} |
+ |
+func init() { |
+ tumble.Register((*TimeoutExecution)(nil)) |
+} |