Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package mutate | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "time" | |
| 10 | |
| 11 "golang.org/x/net/context" | |
| 12 | |
| 13 "github.com/luci/gae/service/datastore" | |
| 14 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
| 15 "github.com/luci/luci-go/appengine/cmd/dm/model" | |
| 16 "github.com/luci/luci-go/appengine/tumble" | |
| 17 dm "github.com/luci/luci-go/common/api/dm/service/v1" | |
| 18 "github.com/luci/luci-go/common/clock" | |
| 19 "github.com/luci/luci-go/common/logging" | |
| 20 ) | |
| 21 | |
| 22 // TimeoutExecution is a named mutation which triggers on a delay. If the | |
| 23 // execution is in the noted state when the trigger hits, this sets the | |
| 24 // Execution to have an AbnormalFinish status of TIMED_OUT. | |
| 25 type TimeoutExecution struct { | |
| 26 For *dm.Execution_ID | |
| 27 State dm.Execution_State | |
| 28 // TimeoutAttempt is the number of attempts to stop a STOPPING execution , | |
| 29 // since this potentially requires an RPC to the distributor to enact. | |
| 30 TimeoutAttempt uint | |
| 31 Deadline time.Time | |
| 32 } | |
| 33 | |
| 34 const maxTimeoutAttempts = 3 | |
| 35 | |
| 36 var _ tumble.DelayedMutation = (*TimeoutExecution)(nil) | |
| 37 | |
| 38 // Root implements tumble.Mutation | |
| 39 func (t *TimeoutExecution) Root(c context.Context) *datastore.Key { | |
| 40 return model.AttemptKeyFromID(c, t.For.AttemptID()) | |
| 41 } | |
| 42 | |
| 43 // RollForward implements tumble.Mutation | |
| 44 func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutatio n, err error) { | |
| 45 e := model.ExecutionFromID(c, t.For) | |
| 46 | |
| 47 ds := datastore.Get(c) | |
| 48 if err = ds.Get(e); err != nil { | |
| 49 return | |
| 50 } | |
| 51 if e.State != t.State { | |
| 52 return | |
| 53 } | |
| 54 | |
| 55 // will be overwritten if this execution is STOPPING and the timeout is not | |
| 56 // abnormal | |
| 57 rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{ | |
| 58 Reason: fmt.Sprintf("DM timeout (%s)", e.State), | |
| 59 Status: dm.AbnormalFinish_TIMED_OUT}} | |
| 60 | |
| 61 if e.State == dm.Execution_STOPPING { | |
| 62 // if it's supposed to be STOPPING, maybe we just missed a notif ication from | |
| 63 // the distributor (or the distributor is not using pubsub). | |
| 64 reg := distributor.GetRegistry(c) | |
| 65 var dist distributor.D | |
| 66 var vers string | |
| 67 dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName ) | |
| 68 | |
| 69 if vers != "" && vers != e.DistributorConfigVersion { | |
| 70 logging.Fields{ | |
| 71 "cfg_name": e.DistributorConfigName, | |
| 72 "orig_cfg_vers": e.DistributorConfigVersion, | |
| 73 "cur_cfg_vers": vers, | |
| 74 }.Warningf(c, "mismatched distributor config versions") | |
| 75 } | |
| 76 | |
| 77 if err != nil { | |
| 78 logging.Fields{ | |
| 79 logging.ErrorKey: err, | |
| 80 "cfgName": e.DistributorConfigName, | |
| 81 }.Errorf(c, "Could not MakeDistributor") | |
|
dnj (Google)
2016/06/09 18:00:57
Does this mean that if we ever unregister a Distri
iannucci
2016/06/15 00:46:01
Added todo: need to distinguish luci-config flake
| |
| 82 return | |
| 83 } | |
| 84 var realRslt *distributor.TaskResult | |
| 85 realRslt, err = dist.GetStatus(distributor.Token(e.DistributorTo ken)) | |
| 86 if err != nil || rslt == nil && t.TimeoutAttempt < maxTimeoutAtt empts { | |
|
dnj (Google)
2016/06/09 18:00:57
rslt will never be nil here. Do you mean realRslt?
iannucci
2016/06/15 00:46:01
Done.
| |
| 87 logging.Fields{ | |
| 88 logging.ErrorKey: err, | |
| 89 "task_result": rslt, | |
|
dnj (Google)
2016/06/09 18:00:57
(same here)
iannucci
2016/06/15 00:46:01
Done.
| |
| 90 "timeout_attempt": t.TimeoutAttempt, | |
| 91 }.Infof(c, "GetStatus failed/nop'd while timing out STOP PING execution") | |
| 92 // TODO(riannucci): do randomized exponential backoff in stead of constant | |
| 93 // backoff? Kinda don't really want to spend more than 1 .5m waiting | |
| 94 // anyway, and the actual GetStatus call does local retr ies already, so | |
| 95 // hopefully this is fine. If this is wrong, the distrib utor should adjust | |
| 96 // its timeToStop value to be better. | |
| 97 t.Deadline = t.Deadline.Add(time.Second * 30) | |
| 98 t.TimeoutAttempt++ | |
| 99 err = nil | |
| 100 muts = append(muts, t) | |
|
dnj (Google)
2016/06/09 18:00:56
I think you'll like new Tumble API for this one :)
iannucci
2016/06/15 00:46:01
Acknowledged.
| |
| 101 return | |
| 102 } | |
| 103 | |
| 104 if err != nil { | |
|
dnj (Google)
2016/06/09 18:00:57
Can this ever be hit? You handle "err != nil" abov
iannucci
2016/06/15 00:46:01
I was missing some parens above.
| |
| 105 rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s ) w/ error: %s", e.State, err) | |
| 106 err = nil | |
| 107 } else if realRslt != nil { | |
| 108 rslt = realRslt | |
| 109 } | |
| 110 } | |
| 111 | |
| 112 muts = append(muts, &FinishExecution{t.For, rslt}) | |
| 113 return | |
| 114 } | |
| 115 | |
| 116 // ProcessAfter implements tumble.DelayedMutation | |
| 117 func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline } | |
| 118 | |
| 119 // HighPriority implements tumble.DelayedMutation | |
| 120 func (t *TimeoutExecution) HighPriority() bool { return false } | |
| 121 | |
| 122 // ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the | |
| 123 // Execution's State to determine which timeout should be set, if any. If no | |
| 124 // timeout should be active, this will cancel any existing timeouts for this | |
| 125 // Execution. | |
| 126 func ResetExecutionTimeout(c context.Context, e *model.Execution) error { | |
| 127 howLong := time.Duration(0) | |
| 128 switch e.State { | |
| 129 case dm.Execution_SCHEDULING: | |
| 130 howLong = e.TimeToStart | |
| 131 case dm.Execution_RUNNING: | |
| 132 howLong = e.TimeToRun | |
| 133 case dm.Execution_STOPPING: | |
| 134 howLong = e.TimeToStop | |
| 135 } | |
| 136 eid := e.GetEID() | |
| 137 key := model.ExecutionKeyFromID(c, eid) | |
| 138 if howLong == 0 { | |
| 139 return tumble.CancelNamedMutations(c, key, "timeout") | |
| 140 } | |
| 141 return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{ | |
| 142 "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC() .Add(howLong)}, | |
| 143 }) | |
| 144 } | |
| 145 | |
| 146 func init() { | |
| 147 tumble.Register((*TimeoutExecution)(nil)) | |
| 148 } | |
| OLD | NEW |