Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(396)

Side by Side Diff: appengine/cmd/dm/mutate/finish_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/mutate/finish_attempt_test.go ('k') | appengine/cmd/dm/mutate/merge_quest.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package mutate
6
7 import (
8 "fmt"
9
10 "golang.org/x/net/context"
11
12 "github.com/luci/gae/filter/txnBuf"
13 "github.com/luci/gae/service/datastore"
14
15 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
16 "github.com/luci/luci-go/appengine/cmd/dm/model"
17 "github.com/luci/luci-go/appengine/tumble"
18 dm "github.com/luci/luci-go/common/api/dm/service/v1"
19 )
20
21 // FinishExecution records the final state of the Execution, and advances the
22 // Attempt state machine.
23 type FinishExecution struct {
24 EID *dm.Execution_ID
25 Result *distributor.TaskResult
26 }
27
28 // Root implements tumble.Mutation
29 func (f *FinishExecution) Root(c context.Context) *datastore.Key {
30 return model.ExecutionKeyFromID(c, f.EID)
31 }
32
33 // shouldRetry loads the quest for this attempt, to determine if the attempt can
34 // be retried. As a side-effect, it increments the RetryState counter for the
35 // indicated failure type.
36 //
37 // If stat is not a retryable AbnormalFinish_Status, this will panic.
38 func shouldRetry(c context.Context, a *model.Attempt, stat dm.AbnormalFinish_Sta tus) (retry bool, err error) {
39 if !stat.CouldRetry() {
40 return
41 }
42 q := model.QuestFromID(a.ID.Quest)
43 dsNoTxn := txnBuf.GetNoTxn(c)
44 if err = dsNoTxn.Get(q); err != nil {
45 return
46 }
47 var cur, max uint32
48 switch stat {
49 case dm.AbnormalFinish_FAILED:
50 cur, max = a.RetryState.Failed, q.Desc.Meta.Retry.Failed
51 a.RetryState.Failed++
52 case dm.AbnormalFinish_CRASHED:
53 cur, max = a.RetryState.Crashed, q.Desc.Meta.Retry.Crashed
54 a.RetryState.Crashed++
55 case dm.AbnormalFinish_EXPIRED:
56 cur, max = a.RetryState.Expired, q.Desc.Meta.Retry.Expired
57 a.RetryState.Expired++
58 case dm.AbnormalFinish_TIMED_OUT:
59 cur, max = a.RetryState.TimedOut, q.Desc.Meta.Retry.TimedOut
60 a.RetryState.TimedOut++
61 default:
62 panic(fmt.Errorf("do not know how to retry %q", stat))
63 }
64 retry = cur < max
65 return
66 }
67
68 // RollForward implements tumble.Mutation
69 func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation , err error) {
70 a := model.AttemptFromID(f.EID.AttemptID())
71 e := model.ExecutionFromID(c, f.EID)
72
73 ds := datastore.Get(c)
74 if err = ds.Get(a, e); err != nil {
75 return
76 }
77
78 if a.State != dm.Attempt_EXECUTING || a.CurExecution != f.EID.Id || e.St ate.Terminal() {
79 return
80 }
81
82 if f.Result.AbnormalFinish == nil && e.State != dm.Execution_STOPPING {
83 f.Result.AbnormalFinish = &dm.AbnormalFinish{
84 Status: dm.AbnormalFinish_FAILED,
85 Reason: fmt.Sprintf("distributor finished execution whil e it was in the %s state.", e.State),
86 }
87 }
88
89 if ab := f.Result.AbnormalFinish; ab != nil {
90 if err = e.ModifyState(c, dm.Execution_ABNORMAL_FINISHED); err ! = nil {
91 return
92 }
93 e.AbnormalFinish = *ab
94
95 var retry bool
96 if retry, err = shouldRetry(c, a, ab.Status); err != nil {
97 return
98 } else if retry {
99 if err = a.ModifyState(c, dm.Attempt_SCHEDULING); err != nil {
100 return
101 }
102 a.DepMap.Reset()
103 muts = append(muts, &ScheduleExecution{&a.ID})
104 } else {
105 // ran out of retries, or non-retriable error type
106 if err = a.ModifyState(c, dm.Attempt_ABNORMAL_FINISHED); err != nil {
107 return
108 }
109 a.AbnormalFinish = *ab
110 }
111 } else {
112 if err = e.ModifyState(c, dm.Execution_FINISHED); err != nil {
113 return
114 }
115 e.ResultPersistentState = f.Result.PersistentState
116
117 a.PersistentState = f.Result.PersistentState
118 a.RetryState.Reset()
119
120 if a.DepMap.Size() > 0 {
121 if err = a.ModifyState(c, dm.Attempt_WAITING); err != ni l {
122 return
123 }
124 } else {
125 if err = a.ModifyState(c, dm.Attempt_FINISHED); err != n il {
126 return
127 }
128 muts = append(muts, &RecordCompletion{f.EID.AttemptID()} )
129 }
130 }
131
132 // best-effort reset execution timeout
133 _ = ResetExecutionTimeout(c, e)
134
135 err = ds.Put(a, e)
136 return
137 }
138
139 // FinishExecutionFn is the implementation of distributor.FinishExecutionFn.
140 // It's defined here to avoid a circular dependency.
141 func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *distributo r.TaskResult) ([]tumble.Mutation, error) {
142 return []tumble.Mutation{&FinishExecution{EID: eid, Result: rslt}}, nil
143 }
144
145 // NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation
146 // with some abnomal result.
147 func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_S tatus, reason string) *FinishExecution {
148 return &FinishExecution{
149 eid, &distributor.TaskResult{
150 AbnormalFinish: &dm.AbnormalFinish{
151 Status: status, Reason: reason}}}
152 }
153
154 func init() {
155 tumble.Register((*FinishExecution)(nil))
156 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/mutate/finish_attempt_test.go ('k') | appengine/cmd/dm/mutate/merge_quest.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698