Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package mutate | 5 package mutate |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | |
| 9 | |
| 10 "github.com/luci/gae/filter/txnBuf" | |
| 8 "github.com/luci/gae/service/datastore" | 11 "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/appengine/cmd/dm/distributor" | |
| 9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 13 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 10 "github.com/luci/luci-go/appengine/tumble" | 14 "github.com/luci/luci-go/appengine/tumble" |
| 11 "github.com/luci/luci-go/common/api/dm/service/v1" | 15 "github.com/luci/luci-go/common/api/dm/service/v1" |
| 16 "github.com/luci/luci-go/common/errors" | |
| 17 "github.com/luci/luci-go/common/logging" | |
| 12 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
| 13 ) | 19 ) |
| 14 | 20 |
| 15 // ScheduleExecution is a placeholder mutation that will be an entry into the | 21 // ScheduleExecution is a placeholder mutation that will be an entry into the |
| 16 // Distributor scheduling state-machine. | 22 // Distributor scheduling state-machine. |
| 17 type ScheduleExecution struct { | 23 type ScheduleExecution struct { |
| 18 For *dm.Attempt_ID | 24 For *dm.Attempt_ID |
| 19 } | 25 } |
| 20 | 26 |
| 21 // Root implements tumble.Mutation | 27 // Root implements tumble.Mutation |
| 22 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { | 28 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { |
| 23 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For}) | 29 » return model.AttemptKeyFromID(c, s.For) |
| 24 } | 30 } |
| 25 | 31 |
| 26 // RollForward implements tumble.Mutation | 32 // RollForward implements tumble.Mutation |
| 27 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati on, err error) { | 33 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati on, err error) { |
|
iannucci
2016/06/08 02:54:24
this is the other beefish change: now when a Sched
| |
| 34 ds := datastore.Get(c) | |
| 35 a := model.AttemptFromID(s.For) | |
| 36 if err = ds.Get(a); err != nil { | |
| 37 return | |
| 38 } | |
| 39 | |
| 40 if a.State != dm.Attempt_SCHEDULING { | |
| 41 return | |
| 42 } | |
| 43 | |
| 44 q := model.QuestFromID(s.For.Quest) | |
| 45 if err = txnBuf.GetNoTxn(c).Get(q); err != nil { | |
| 46 return | |
| 47 } | |
| 48 | |
| 49 reg := distributor.GetRegistry(c) | |
| 50 dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName) | |
| 51 if err != nil { | |
| 52 return | |
| 53 } | |
| 54 | |
| 55 a.CurExecution++ | |
| 56 if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil { | |
| 57 return | |
| 58 } | |
| 59 | |
| 60 eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution) | |
| 61 e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver) | |
| 62 | |
| 63 exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token} | |
| 64 | |
| 65 var distTok distributor.Token | |
| 66 distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run( | |
|
dnj (Google)
2016/06/09 18:00:56
Since it's possible for the transaction to fail, i
iannucci
2016/06/15 00:46:01
This is documented in the distributor package, but
| |
| 67 distributor.NewTaskDescription(c, &q.Desc, exAuth, | |
| 68 distributor.PersistentState(a.PersistentState))) | |
| 69 e.DistributorToken = string(distTok) | |
| 70 if err != nil { | |
| 71 if errors.IsTransient(err) { | |
| 72 // tumble will retry us later | |
| 73 logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution") | |
| 74 return | |
| 75 } | |
| 76 origErr := err | |
| 77 | |
| 78 // put a and e to the transaction buffer, so that | |
| 79 // FinishExecution.RollForward can see them. | |
| 80 if err = ds.PutMulti([]interface{}{a, e}); err != nil { | |
| 81 return | |
| 82 } | |
| 83 return NewFinishExecutionAbnormal( | |
| 84 eid, dm.AbnormalFinish_REJECTED, | |
| 85 fmt.Sprintf("rejected during scheduling with non-transie nt error: %s", origErr), | |
| 86 ).RollForward(c) | |
| 87 } | |
| 88 | |
| 89 if err = ResetExecutionTimeout(c, e); err != nil { | |
| 90 return | |
| 91 } | |
| 92 | |
| 93 err = ds.PutMulti([]interface{}{a, e}) | |
| 28 return | 94 return |
| 29 } | 95 } |
| 30 | 96 |
| 31 func init() { | 97 func init() { |
| 32 tumble.Register((*ScheduleExecution)(nil)) | 98 tumble.Register((*ScheduleExecution)(nil)) |
| 33 } | 99 } |
| OLD | NEW |