Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(138)

Side by Side Diff: appengine/cmd/dm/mutate/timeout_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/mutate/schedule_execution_test.go ('k') | appengine/cmd/dm/notes » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package mutate
6
7 import (
8 "fmt"
9 "time"
10
11 "golang.org/x/net/context"
12
13 "github.com/luci/gae/service/datastore"
14 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
15 "github.com/luci/luci-go/appengine/cmd/dm/model"
16 "github.com/luci/luci-go/appengine/tumble"
17 dm "github.com/luci/luci-go/common/api/dm/service/v1"
18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/logging"
20 )
21
22 // TimeoutExecution is a named mutation which triggers on a delay. If the
23 // execution is in the noted state when the trigger hits, this sets the
24 // Execution to have an AbnormalFinish status of TIMED_OUT.
25 type TimeoutExecution struct {
26 For *dm.Execution_ID
27 State dm.Execution_State
28 // TimeoutAttempt is the number of attempts to stop a STOPPING execution ,
29 // since this potentially requires an RPC to the distributor to enact.
30 TimeoutAttempt uint
31 Deadline time.Time
32 }
33
34 const maxTimeoutAttempts = 3
35
36 var _ tumble.DelayedMutation = (*TimeoutExecution)(nil)
37
38 // Root implements tumble.Mutation
39 func (t *TimeoutExecution) Root(c context.Context) *datastore.Key {
40 return model.AttemptKeyFromID(c, t.For.AttemptID())
41 }
42
43 // RollForward implements tumble.Mutation
44 func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutatio n, err error) {
45 e := model.ExecutionFromID(c, t.For)
46
47 ds := datastore.Get(c)
48 if err = ds.Get(e); err != nil {
49 return
50 }
51 if e.State != t.State {
52 return
53 }
54
55 // will be overwritten if this execution is STOPPING and the timeout is not
56 // abnormal
57 rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{
58 Reason: fmt.Sprintf("DM timeout (%s)", e.State),
59 Status: dm.AbnormalFinish_TIMED_OUT}}
60
61 if e.State == dm.Execution_STOPPING {
62 // if it's supposed to be STOPPING, maybe we just missed a notif ication from
63 // the distributor (or the distributor is not using pubsub).
64 reg := distributor.GetRegistry(c)
65 var dist distributor.D
66 var vers string
67 dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName )
68
69 if vers != "" && vers != e.DistributorConfigVersion {
70 logging.Fields{
71 "cfg_name": e.DistributorConfigName,
72 "orig_cfg_vers": e.DistributorConfigVersion,
73 "cur_cfg_vers": vers,
74 }.Warningf(c, "mismatched distributor config versions")
75 }
76
77 // TODO(iannucci): make this set the REJECTED state if we loaded the config,
78 // but the distributor no longer exists.
79 if err != nil {
80 logging.Fields{
81 logging.ErrorKey: err,
82 "cfgName": e.DistributorConfigName,
83 }.Errorf(c, "Could not MakeDistributor")
84 return
85 }
86 var realRslt *distributor.TaskResult
87 realRslt, err = dist.GetStatus(distributor.Token(e.DistributorTo ken))
88 if (err != nil || realRslt == nil) && t.TimeoutAttempt < maxTime outAttempts {
89 logging.Fields{
90 logging.ErrorKey: err,
91 "task_result": realRslt,
92 "timeout_attempt": t.TimeoutAttempt,
93 }.Infof(c, "GetStatus failed/nop'd while timing out STOP PING execution")
94 // TODO(riannucci): do randomized exponential backoff in stead of constant
95 // backoff? Kinda don't really want to spend more than 1 .5m waiting
96 // anyway, and the actual GetStatus call does local retr ies already, so
97 // hopefully this is fine. If this is wrong, the distrib utor should adjust
98 // its timeToStop value to be better.
99 t.Deadline = t.Deadline.Add(time.Second * 30)
100 t.TimeoutAttempt++
101 err = nil
102 muts = append(muts, t)
103 return
104 }
105
106 if err != nil {
107 rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s ) w/ error: %s", e.State, err)
108 err = nil
109 } else if realRslt != nil {
110 rslt = realRslt
111 }
112 }
113
114 muts = append(muts, &FinishExecution{t.For, rslt})
115 return
116 }
117
118 // ProcessAfter implements tumble.DelayedMutation
119 func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline }
120
121 // HighPriority implements tumble.DelayedMutation
122 func (t *TimeoutExecution) HighPriority() bool { return false }
123
124 // ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the
125 // Execution's State to determine which timeout should be set, if any. If no
126 // timeout should be active, this will cancel any existing timeouts for this
127 // Execution.
128 func ResetExecutionTimeout(c context.Context, e *model.Execution) error {
129 howLong := time.Duration(0)
130 switch e.State {
131 case dm.Execution_SCHEDULING:
132 howLong = e.TimeToStart
133 case dm.Execution_RUNNING:
134 howLong = e.TimeToRun
135 case dm.Execution_STOPPING:
136 howLong = e.TimeToStop
137 }
138 eid := e.GetEID()
139 key := model.ExecutionKeyFromID(c, eid)
140 if howLong == 0 {
141 return tumble.CancelNamedMutations(c, key, "timeout")
142 }
143 return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{
144 "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC() .Add(howLong)},
145 })
146 }
147
148 func init() {
149 tumble.Register((*TimeoutExecution)(nil))
150 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/mutate/schedule_execution_test.go ('k') | appengine/cmd/dm/notes » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698