OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package mutate | 5 package mutate |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | |
9 | |
10 "github.com/luci/gae/filter/txnBuf" | |
8 "github.com/luci/gae/service/datastore" | 11 "github.com/luci/gae/service/datastore" |
12 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 13 "github.com/luci/luci-go/appengine/cmd/dm/model" |
10 "github.com/luci/luci-go/appengine/tumble" | 14 "github.com/luci/luci-go/appengine/tumble" |
11 "github.com/luci/luci-go/common/api/dm/service/v1" | 15 "github.com/luci/luci-go/common/api/dm/service/v1" |
16 "github.com/luci/luci-go/common/errors" | |
17 "github.com/luci/luci-go/common/logging" | |
12 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
13 ) | 19 ) |
14 | 20 |
15 // ScheduleExecution is a placeholder mutation that will be an entry into the | 21 // ScheduleExecution is a placeholder mutation that will be an entry into the |
16 // Distributor scheduling state-machine. | 22 // Distributor scheduling state-machine. |
17 type ScheduleExecution struct { | 23 type ScheduleExecution struct { |
18 For *dm.Attempt_ID | 24 For *dm.Attempt_ID |
19 } | 25 } |
20 | 26 |
21 // Root implements tumble.Mutation | 27 // Root implements tumble.Mutation |
22 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { | 28 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { |
23 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For}) | 29 » return model.AttemptKeyFromID(c, s.For) |
24 } | 30 } |
25 | 31 |
26 // RollForward implements tumble.Mutation | 32 // RollForward implements tumble.Mutation |
27 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati on, err error) { | 33 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati on, err error) { |
iannucci
2016/06/08 02:54:24
this is the other beefish change: now when a Sched
| |
34 ds := datastore.Get(c) | |
35 a := model.AttemptFromID(s.For) | |
36 if err = ds.Get(a); err != nil { | |
37 return | |
38 } | |
39 | |
40 if a.State != dm.Attempt_SCHEDULING { | |
41 return | |
42 } | |
43 | |
44 q := model.QuestFromID(s.For.Quest) | |
45 if err = txnBuf.GetNoTxn(c).Get(q); err != nil { | |
46 return | |
47 } | |
48 | |
49 reg := distributor.GetRegistry(c) | |
50 dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName) | |
51 if err != nil { | |
52 return | |
53 } | |
54 | |
55 a.CurExecution++ | |
56 if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil { | |
57 return | |
58 } | |
59 | |
60 eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution) | |
61 e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver) | |
62 | |
63 exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token} | |
64 | |
65 var distTok distributor.Token | |
66 distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run( | |
dnj (Google)
2016/06/09 18:00:56
Since it's possible for the transaction to fail, i
iannucci
2016/06/15 00:46:01
This is documented in the distributor package, but
| |
67 distributor.NewTaskDescription(c, &q.Desc, exAuth, | |
68 distributor.PersistentState(a.PersistentState))) | |
69 e.DistributorToken = string(distTok) | |
70 if err != nil { | |
71 if errors.IsTransient(err) { | |
72 // tumble will retry us later | |
73 logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution") | |
74 return | |
75 } | |
76 origErr := err | |
77 | |
78 // put a and e to the transaction buffer, so that | |
79 // FinishExecution.RollForward can see them. | |
80 if err = ds.PutMulti([]interface{}{a, e}); err != nil { | |
81 return | |
82 } | |
83 return NewFinishExecutionAbnormal( | |
84 eid, dm.AbnormalFinish_REJECTED, | |
85 fmt.Sprintf("rejected during scheduling with non-transie nt error: %s", origErr), | |
86 ).RollForward(c) | |
87 } | |
88 | |
89 if err = ResetExecutionTimeout(c, e); err != nil { | |
90 return | |
91 } | |
92 | |
93 err = ds.PutMulti([]interface{}{a, e}) | |
28 return | 94 return |
29 } | 95 } |
30 | 96 |
31 func init() { | 97 func init() { |
32 tumble.Register((*ScheduleExecution)(nil)) | 98 tumble.Register((*ScheduleExecution)(nil)) |
33 } | 99 } |
OLD | NEW |