Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(700)

Side by Side Diff: dm/appengine/distributor/jobsim/distributor.go

Issue 2347973003: Refactor distributor API so that methods always get the Quest_Desc too. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package jobsim 5 package jobsim
6 6
7 import ( 7 import (
8 "encoding/json" 8 "encoding/json"
9 "fmt" 9 "fmt"
10 "io/ioutil" 10 "io/ioutil"
(...skipping 23 matching lines...) Expand all
34 func (j *jobsimDist) jsConfig() *jobsim.Config { 34 func (j *jobsimDist) jsConfig() *jobsim.Config {
35 return j.cfg.Content.(*jobsim.Config) 35 return j.cfg.Content.(*jobsim.Config)
36 } 36 }
37 37
38 func (j *jobsimDist) parsePayload(payload string) (*jobsimExecution, error) { 38 func (j *jobsimDist) parsePayload(payload string) (*jobsimExecution, error) {
39 ret := &jobsimExecution{} 39 ret := &jobsimExecution{}
40 err := jsonpb.UnmarshalString(payload, &ret.Calculation) 40 err := jsonpb.UnmarshalString(payload, &ret.Calculation)
41 return ret, err 41 return ret, err
42 } 42 }
43 43
44 func (j *jobsimDist) Run(tsk *distributor.TaskDescription) (tok distributor.Toke n, _ time.Duration, err error) { 44 func (j *jobsimDist) Run(desc *dm.Quest_Desc, exAuth *dm.Execution_Auth, prev *d m.JsonResult) (tok distributor.Token, _ time.Duration, err error) {
45 // TODO(riannucci): Fix luci-gae so we can truly escape the transaction when 45 // TODO(riannucci): Fix luci-gae so we can truly escape the transaction when
46 // we build the jobsimDist instance. See luci/gae#23. 46 // we build the jobsimDist instance. See luci/gae#23.
47 ds := txnBuf.GetNoTxn(j.c) 47 ds := txnBuf.GetNoTxn(j.c)
48 48
49 logging.Fields{ 49 logging.Fields{
50 » » "eid": tsk.ExecutionAuth().Id, 50 » » "eid": exAuth.Id,
51 }.Infof(j.c, "jobsim: running new task") 51 }.Infof(j.c, "jobsim: running new task")
52 52
53 » jtsk, err := j.parsePayload(tsk.Payload().Parameters) 53 » jtsk, err := j.parsePayload(desc.Parameters)
54 if err != nil { 54 if err != nil {
55 return 55 return
56 } 56 }
57 » jtsk.ExAuth = *tsk.ExecutionAuth() 57 » jtsk.ExAuth = *exAuth
58 jtsk.Status = jobsimRunnable 58 jtsk.Status = jobsimRunnable
59 » jtsk.StateOrReason = tsk.PreviousResult().Object 59 » if prev != nil {
60 » » jtsk.StateOrReason = prev.Object
61 » }
60 jtsk.CfgName = j.cfg.Name 62 jtsk.CfgName = j.cfg.Name
61 63
62 key := []*datastore.Key{ 64 key := []*datastore.Key{
63 ds.MakeKey(datastore.GetMetaDefault(datastore.GetPLS(jtsk), "kin d", ""), 0)} 65 ds.MakeKey(datastore.GetMetaDefault(datastore.GetPLS(jtsk), "kin d", ""), 0)}
64 if err = ds.AllocateIDs(key); err != nil { 66 if err = ds.AllocateIDs(key); err != nil {
65 return 67 return
66 } 68 }
67 69
68 // transactionally commit the job and a taskqueue task to execute it 70 // transactionally commit the job and a taskqueue task to execute it
69 jtsk.ID = fmt.Sprintf("%s|%d", j.jsConfig().Pool, key[0].IntID()) 71 jtsk.ID = fmt.Sprintf("%s|%d", j.jsConfig().Pool, key[0].IntID())
(...skipping 18 matching lines...) Expand all
88 return err 90 return err
89 } 91 }
90 logging.Infof(j.c, "jobsim: EnqueueTask'd") 92 logging.Infof(j.c, "jobsim: EnqueueTask'd")
91 return nil 93 return nil
92 }, nil) 94 }, nil)
93 95
94 tok = distributor.Token(jtsk.ID) 96 tok = distributor.Token(jtsk.ID)
95 return 97 return
96 } 98 }
97 99
98 func (j *jobsimDist) Cancel(tok distributor.Token) error { 100 func (j *jobsimDist) Cancel(_ *dm.Quest_Desc, tok distributor.Token) error {
99 jtsk := &jobsimExecution{ID: string(tok)} 101 jtsk := &jobsimExecution{ID: string(tok)}
100 102
101 cancelBody := func(ds datastore.Interface) (needWrite bool, err error) { 103 cancelBody := func(ds datastore.Interface) (needWrite bool, err error) {
102 if err = ds.Get(jtsk); err != nil { 104 if err = ds.Get(jtsk); err != nil {
103 return 105 return
104 } 106 }
105 if jtsk.Status != jobsimRunnable { 107 if jtsk.Status != jobsimRunnable {
106 return 108 return
107 } 109 }
108 needWrite = true 110 needWrite = true
109 return 111 return
110 } 112 }
111 113
112 ds := datastore.Get(j.c) 114 ds := datastore.Get(j.c)
113 if needWrite, err := cancelBody(ds); err != nil || !needWrite { 115 if needWrite, err := cancelBody(ds); err != nil || !needWrite {
114 return err 116 return err
115 } 117 }
116 118
117 return ds.RunInTransaction(func(c context.Context) error { 119 return ds.RunInTransaction(func(c context.Context) error {
118 ds := datastore.Get(c) 120 ds := datastore.Get(c)
119 if needWrite, err := cancelBody(ds); err != nil || !needWrite { 121 if needWrite, err := cancelBody(ds); err != nil || !needWrite {
120 return err 122 return err
121 } 123 }
122 jtsk.Status = jobsimCancelled 124 jtsk.Status = jobsimCancelled
123 return ds.Put(jtsk) 125 return ds.Put(jtsk)
124 }, nil) 126 }, nil)
125 } 127 }
126 128
127 func (j *jobsimDist) GetStatus(tok distributor.Token) (*dm.Result, error) { 129 func (j *jobsimDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.Res ult, error) {
128 jtsk, err := loadTask(j.c, string(tok)) 130 jtsk, err := loadTask(j.c, string(tok))
129 if err != nil { 131 if err != nil {
130 return nil, err 132 return nil, err
131 } 133 }
132 134
133 return getAttemptResult(jtsk.Status, jtsk.StateOrReason), nil 135 return getAttemptResult(jtsk.Status, jtsk.StateOrReason), nil
134 } 136 }
135 137
136 func (j *jobsimDist) InfoURL(tok distributor.Token) string { 138 func (j *jobsimDist) InfoURL(tok distributor.Token) string {
137 return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.Name, j.cfg.Versio n, tok) 139 return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.Name, j.cfg.Versio n, tok)
138 } 140 }
139 141
140 func (j *jobsimDist) HandleNotification(note *distributor.Notification) (*dm.Res ult, error) { 142 func (j *jobsimDist) HandleNotification(_ *dm.Quest_Desc, note *distributor.Noti fication) (*dm.Result, error) {
141 n := &notification{} 143 n := &notification{}
142 err := json.Unmarshal(note.Data, n) 144 err := json.Unmarshal(note.Data, n)
143 if err != nil { 145 if err != nil {
144 return nil, err 146 return nil, err
145 } 147 }
146 148
147 return getAttemptResult(n.Status, n.StateOrReason), nil 149 return getAttemptResult(n.Status, n.StateOrReason), nil
148 } 150 }
149 151
150 func loadTask(c context.Context, rawTok string) (*jobsimExecution, error) { 152 func loadTask(c context.Context, rawTok string) (*jobsimExecution, error) {
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
235 return err 237 return err
236 } 238 }
237 239
238 // AddFactory adds this distributor implementation into the distributor 240 // AddFactory adds this distributor implementation into the distributor
239 // Registry. 241 // Registry.
240 func AddFactory(m distributor.FactoryMap) { 242 func AddFactory(m distributor.FactoryMap) {
241 m[(*jobsim.Config)(nil)] = func(c context.Context, cfg *distributor.Conf ig) (distributor.D, error) { 243 m[(*jobsim.Config)(nil)] = func(c context.Context, cfg *distributor.Conf ig) (distributor.D, error) {
242 return &jobsimDist{c, cfg}, nil 244 return &jobsimDist{c, cfg}, nil
243 } 245 }
244 } 246 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698