Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Side by Side Diff: appengine/cmd/dm/mutate/finish_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package mutate
6
7 import (
8 "fmt"
9 "golang.org/x/net/context"
10
11 "github.com/luci/gae/filter/txnBuf"
12 "github.com/luci/gae/service/datastore"
13
14 "github.com/luci/luci-go/appengine/tumble"
15 dm "github.com/luci/luci-go/common/api/dm/service/v1"
16
17 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
18 "github.com/luci/luci-go/appengine/cmd/dm/model"
19 )
20
21 // FinishExecution records the final state of the Execution, and advances the
22 // Attempt state machine.
23 type FinishExecution struct {
iannucci 2016/06/08 02:54:24 this is the major state machine change. Previously
24 EID *dm.Execution_ID
25 Result *distributor.TaskResult
26 }
27
28 // Root implements tumble.Mutation
29 func (f *FinishExecution) Root(c context.Context) *datastore.Key {
30 return model.ExecutionKeyFromID(c, f.EID)
31 }
32
33 // shouldRetry loads the quest for this attempt, to determine if the attempt can
34 // be retried. As a side-effect, it increments the RetryState counter for the
35 // indicated failure type.
36 //
37 // If stat is not a retryable AbnormalFinish_Status, this will panic.
38 func shouldRetry(c context.Context, a *model.Attempt, stat dm.AbnormalFinish_Sta tus) (retry bool, err error) {
39 if !stat.CouldRetry() {
40 return
41 }
42 q := model.QuestFromID(a.ID.Quest)
43 dsNoTxn := txnBuf.GetNoTxn(c)
44 if err = dsNoTxn.Get(q); err != nil {
45 return
46 }
47 var cur, max uint32
48 switch stat {
49 case dm.AbnormalFinish_FAILED:
50 cur, max = a.RetryState.Failed, q.Desc.Meta.Retry.Failed
51 a.RetryState.Failed++
52 case dm.AbnormalFinish_CRASHED:
53 cur, max = a.RetryState.Crashed, q.Desc.Meta.Retry.Crashed
54 a.RetryState.Crashed++
55 case dm.AbnormalFinish_EXPIRED:
56 cur, max = a.RetryState.Expired, q.Desc.Meta.Retry.Expired
57 a.RetryState.Expired++
58 case dm.AbnormalFinish_TIMED_OUT:
59 cur, max = a.RetryState.TimedOut, q.Desc.Meta.Retry.TimedOut
60 a.RetryState.TimedOut++
61 default:
62 panic(fmt.Errorf("do not know how to retry %q", stat))
63 }
64 retry = cur < max
65 return
66 }
67
68 // RollForward implements tumble.Mutation
69 func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation , err error) {
70 a := model.AttemptFromID(f.EID.AttemptID())
71 e := model.ExecutionFromID(c, f.EID)
72
73 ds := datastore.Get(c)
74 if err = ds.GetMulti([]interface{}{a, e}); err != nil {
75 return
76 }
77
78 if a.State == dm.Attempt_EXECUTING && a.CurExecution == f.EID.Id && !e.S tate.Terminal() {
dnj (Google) 2016/06/09 18:00:56 For indentation purposes, maybe invert this and re
iannucci 2016/06/15 00:46:01 Done.
79 if f.Result.AbnormalFinish == nil && e.State != dm.Execution_STO PPING {
80 f.Result.AbnormalFinish = &dm.AbnormalFinish{
81 Status: dm.AbnormalFinish_FAILED,
82 Reason: fmt.Sprintf("distributor finished execut ion while it was in the %s state.", e.State),
83 }
84 }
85
86 if ab := f.Result.AbnormalFinish; ab != nil {
87 if err = e.ModifyState(c, dm.Execution_ABNORMAL_FINISHED ); err != nil {
88 return
89 }
90 e.AbnormalFinish = *ab
91
92 var retry bool
93 if retry, err = shouldRetry(c, a, ab.Status); err != nil {
94 return
95 } else if retry {
96 if err = a.ModifyState(c, dm.Attempt_SCHEDULING) ; err != nil {
97 return
98 }
99 a.DepMap.Reset()
100 muts = append(muts, &ScheduleExecution{&a.ID})
101 } else {
102 // ran out of retries, or non-retriable error ty pe
103 if err = a.ModifyState(c, dm.Attempt_ABNORMAL_FI NISHED); err != nil {
104 return
105 }
106 a.AbnormalFinish = *ab
107 }
108 } else {
109 if err = e.ModifyState(c, dm.Execution_FINISHED); err != nil {
110 return
111 }
112 e.ResultPersistentState = string(f.Result.PersistentStat e)
113
114 a.PersistentState = string(f.Result.PersistentState)
115 a.RetryState.Reset()
116
117 if a.DepMap.Size() > 0 {
118 if err = a.ModifyState(c, dm.Attempt_WAITING); e rr != nil {
119 return
120 }
121 } else {
122 if err = a.ModifyState(c, dm.Attempt_FINISHED); err != nil {
123 return
124 }
125 muts = append(muts, &RecordCompletion{f.EID.Atte mptID()})
126 }
127 }
128
129 // best-effort reset execution timeout
130 _ = ResetExecutionTimeout(c, e)
131
132 err = ds.PutMulti([]interface{}{a, e})
133 }
134 return
135 }
136
137 // FinishExecutionFn is the implementation of distributor.FinishExecutionFn.
138 // It's defined here to avoid a circular dependency.
139 func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *distributo r.TaskResult) ([]tumble.Mutation, error) {
140 return []tumble.Mutation{&FinishExecution{EID: eid, Result: rslt}}, nil
141 }
142
143 // NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation
144 // with some abnomal result.
145 func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_S tatus, reason string) *FinishExecution {
146 return &FinishExecution{
147 eid, &distributor.TaskResult{
148 AbnormalFinish: &dm.AbnormalFinish{
149 Status: status, Reason: reason}}}
150 }
151
152 func init() {
153 tumble.Register((*FinishExecution)(nil))
154 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698