OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package mutate | |
6 | |
7 import ( | |
8 "fmt" | |
9 "time" | |
10 | |
11 "golang.org/x/net/context" | |
12 | |
13 "github.com/luci/gae/service/datastore" | |
14 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
15 "github.com/luci/luci-go/appengine/cmd/dm/model" | |
16 "github.com/luci/luci-go/appengine/tumble" | |
17 dm "github.com/luci/luci-go/common/api/dm/service/v1" | |
18 "github.com/luci/luci-go/common/clock" | |
19 "github.com/luci/luci-go/common/logging" | |
20 ) | |
21 | |
22 // TimeoutExecution is a named mutation which triggers on a delay. If the | |
23 // execution is in the noted state when the trigger hits, this sets the | |
24 // Execution to have an AbnormalFinish status of TIMED_OUT. | |
25 type TimeoutExecution struct { | |
26 For *dm.Execution_ID | |
27 State dm.Execution_State | |
28 // TimeoutAttempt is the number of attempts to stop a STOPPING execution , | |
29 // since this potentially requires an RPC to the distributor to enact. | |
30 TimeoutAttempt uint | |
31 Deadline time.Time | |
32 } | |
33 | |
34 const maxTimeoutAttempts = 3 | |
35 | |
36 var _ tumble.DelayedMutation = (*TimeoutExecution)(nil) | |
37 | |
38 // Root implements tumble.Mutation | |
39 func (t *TimeoutExecution) Root(c context.Context) *datastore.Key { | |
40 return model.AttemptKeyFromID(c, t.For.AttemptID()) | |
41 } | |
42 | |
43 // RollForward implements tumble.Mutation | |
44 func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutatio n, err error) { | |
45 e := model.ExecutionFromID(c, t.For) | |
46 | |
47 ds := datastore.Get(c) | |
48 if err = ds.Get(e); err != nil { | |
49 return | |
50 } | |
51 if e.State != t.State { | |
52 return | |
53 } | |
54 | |
55 // will be overwritten if this execution is STOPPING and the timeout is not | |
56 // abnormal | |
57 rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{ | |
58 Reason: fmt.Sprintf("DM timeout (%s)", e.State), | |
59 Status: dm.AbnormalFinish_TIMED_OUT}} | |
60 | |
61 if e.State == dm.Execution_STOPPING { | |
62 // if it's supposed to be STOPPING, maybe we just missed a notif ication from | |
63 // the distributor (or the distributor is not using pubsub). | |
64 reg := distributor.GetRegistry(c) | |
65 var dist distributor.D | |
66 var vers string | |
67 dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName ) | |
68 | |
69 if vers != "" && vers != e.DistributorConfigVersion { | |
70 logging.Fields{ | |
71 "cfg_name": e.DistributorConfigName, | |
72 "orig_cfg_vers": e.DistributorConfigVersion, | |
73 "cur_cfg_vers": vers, | |
74 }.Warningf(c, "mismatched distributor config versions") | |
75 } | |
76 | |
77 if err != nil { | |
78 logging.Fields{ | |
79 logging.ErrorKey: err, | |
80 "cfgName": e.DistributorConfigName, | |
81 }.Errorf(c, "Could not MakeDistributor") | |
dnj (Google)
2016/06/09 18:00:57
Does this mean that if we ever unregister a Distri
iannucci
2016/06/15 00:46:01
Added todo: need to distinguish luci-config flake
| |
82 return | |
83 } | |
84 var realRslt *distributor.TaskResult | |
85 realRslt, err = dist.GetStatus(distributor.Token(e.DistributorTo ken)) | |
86 if err != nil || rslt == nil && t.TimeoutAttempt < maxTimeoutAtt empts { | |
dnj (Google)
2016/06/09 18:00:57
rslt will never be nil here. Do you mean realRslt?
iannucci
2016/06/15 00:46:01
Done.
| |
87 logging.Fields{ | |
88 logging.ErrorKey: err, | |
89 "task_result": rslt, | |
dnj (Google)
2016/06/09 18:00:57
(same here)
iannucci
2016/06/15 00:46:01
Done.
| |
90 "timeout_attempt": t.TimeoutAttempt, | |
91 }.Infof(c, "GetStatus failed/nop'd while timing out STOP PING execution") | |
92 // TODO(riannucci): do randomized exponential backoff in stead of constant | |
93 // backoff? Kinda don't really want to spend more than 1 .5m waiting | |
94 // anyway, and the actual GetStatus call does local retr ies already, so | |
95 // hopefully this is fine. If this is wrong, the distrib utor should adjust | |
96 // its timeToStop value to be better. | |
97 t.Deadline = t.Deadline.Add(time.Second * 30) | |
98 t.TimeoutAttempt++ | |
99 err = nil | |
100 muts = append(muts, t) | |
dnj (Google)
2016/06/09 18:00:56
I think you'll like new Tumble API for this one :)
iannucci
2016/06/15 00:46:01
Acknowledged.
| |
101 return | |
102 } | |
103 | |
104 if err != nil { | |
dnj (Google)
2016/06/09 18:00:57
Can this ever be hit? You handle "err != nil" abov
iannucci
2016/06/15 00:46:01
I was missing some parens above.
| |
105 rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s ) w/ error: %s", e.State, err) | |
106 err = nil | |
107 } else if realRslt != nil { | |
108 rslt = realRslt | |
109 } | |
110 } | |
111 | |
112 muts = append(muts, &FinishExecution{t.For, rslt}) | |
113 return | |
114 } | |
115 | |
116 // ProcessAfter implements tumble.DelayedMutation | |
117 func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline } | |
118 | |
119 // HighPriority implements tumble.DelayedMutation | |
120 func (t *TimeoutExecution) HighPriority() bool { return false } | |
121 | |
122 // ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the | |
123 // Execution's State to determine which timeout should be set, if any. If no | |
124 // timeout should be active, this will cancel any existing timeouts for this | |
125 // Execution. | |
126 func ResetExecutionTimeout(c context.Context, e *model.Execution) error { | |
127 howLong := time.Duration(0) | |
128 switch e.State { | |
129 case dm.Execution_SCHEDULING: | |
130 howLong = e.TimeToStart | |
131 case dm.Execution_RUNNING: | |
132 howLong = e.TimeToRun | |
133 case dm.Execution_STOPPING: | |
134 howLong = e.TimeToStop | |
135 } | |
136 eid := e.GetEID() | |
137 key := model.ExecutionKeyFromID(c, eid) | |
138 if howLong == 0 { | |
139 return tumble.CancelNamedMutations(c, key, "timeout") | |
140 } | |
141 return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{ | |
142 "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC() .Add(howLong)}, | |
143 }) | |
144 } | |
145 | |
146 func init() { | |
147 tumble.Register((*TimeoutExecution)(nil)) | |
148 } | |
OLD | NEW |