Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(132)

Side by Side Diff: appengine/cmd/dm/mutate/schedule_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package mutate 5 package mutate
6 6
7 import ( 7 import (
8 "fmt"
9
10 "github.com/luci/gae/filter/txnBuf"
8 "github.com/luci/gae/service/datastore" 11 "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
9 "github.com/luci/luci-go/appengine/cmd/dm/model" 13 "github.com/luci/luci-go/appengine/cmd/dm/model"
10 "github.com/luci/luci-go/appengine/tumble" 14 "github.com/luci/luci-go/appengine/tumble"
11 "github.com/luci/luci-go/common/api/dm/service/v1" 15 "github.com/luci/luci-go/common/api/dm/service/v1"
16 "github.com/luci/luci-go/common/errors"
17 "github.com/luci/luci-go/common/logging"
12 "golang.org/x/net/context" 18 "golang.org/x/net/context"
13 ) 19 )
14 20
15 // ScheduleExecution is a placeholder mutation that will be an entry into the 21 // ScheduleExecution is a placeholder mutation that will be an entry into the
16 // Distributor scheduling state-machine. 22 // Distributor scheduling state-machine.
17 type ScheduleExecution struct { 23 type ScheduleExecution struct {
18 For *dm.Attempt_ID 24 For *dm.Attempt_ID
19 } 25 }
20 26
21 // Root implements tumble.Mutation 27 // Root implements tumble.Mutation
22 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { 28 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key {
23 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For}) 29 » return model.AttemptKeyFromID(c, s.For)
24 } 30 }
25 31
26 // RollForward implements tumble.Mutation 32 // RollForward implements tumble.Mutation
27 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati on, err error) { 33 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati on, err error) {
iannucci 2016/06/08 02:54:24 this is the other beefish change: now when a Sched
34 ds := datastore.Get(c)
35 a := model.AttemptFromID(s.For)
36 if err = ds.Get(a); err != nil {
37 return
38 }
39
40 if a.State != dm.Attempt_SCHEDULING {
41 return
42 }
43
44 q := model.QuestFromID(s.For.Quest)
45 if err = txnBuf.GetNoTxn(c).Get(q); err != nil {
46 return
47 }
48
49 reg := distributor.GetRegistry(c)
50 dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName)
51 if err != nil {
52 return
53 }
54
55 a.CurExecution++
56 if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil {
57 return
58 }
59
60 eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution)
61 e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver)
62
63 exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token}
64
65 var distTok distributor.Token
66 distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run(
dnj (Google) 2016/06/09 18:00:56 Since it's possible for the transaction to fail, i
iannucci 2016/06/15 00:46:01 This is documented in the distributor package, but
67 distributor.NewTaskDescription(c, &q.Desc, exAuth,
68 distributor.PersistentState(a.PersistentState)))
69 e.DistributorToken = string(distTok)
70 if err != nil {
71 if errors.IsTransient(err) {
72 // tumble will retry us later
73 logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution")
74 return
75 }
76 origErr := err
77
78 // put a and e to the transaction buffer, so that
79 // FinishExecution.RollForward can see them.
80 if err = ds.PutMulti([]interface{}{a, e}); err != nil {
81 return
82 }
83 return NewFinishExecutionAbnormal(
84 eid, dm.AbnormalFinish_REJECTED,
85 fmt.Sprintf("rejected during scheduling with non-transie nt error: %s", origErr),
86 ).RollForward(c)
87 }
88
89 if err = ResetExecutionTimeout(c, e); err != nil {
90 return
91 }
92
93 err = ds.PutMulti([]interface{}{a, e})
28 return 94 return
29 } 95 }
30 96
31 func init() { 97 func init() {
32 tumble.Register((*ScheduleExecution)(nil)) 98 tumble.Register((*ScheduleExecution)(nil))
33 } 99 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698