| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package jobsim | 5 package jobsim |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/json" | 8 "encoding/json" |
| 9 "fmt" | 9 "fmt" |
| 10 "io/ioutil" | 10 "io/ioutil" |
| (...skipping 23 matching lines...) Expand all Loading... |
| 34 func (j *jobsimDist) jsConfig() *jobsim.Config { | 34 func (j *jobsimDist) jsConfig() *jobsim.Config { |
| 35 return j.cfg.Content.(*jobsim.Config) | 35 return j.cfg.Content.(*jobsim.Config) |
| 36 } | 36 } |
| 37 | 37 |
| 38 func (j *jobsimDist) parsePayload(payload string) (*jobsimExecution, error) { | 38 func (j *jobsimDist) parsePayload(payload string) (*jobsimExecution, error) { |
| 39 ret := &jobsimExecution{} | 39 ret := &jobsimExecution{} |
| 40 err := jsonpb.UnmarshalString(payload, &ret.Calculation) | 40 err := jsonpb.UnmarshalString(payload, &ret.Calculation) |
| 41 return ret, err | 41 return ret, err |
| 42 } | 42 } |
| 43 | 43 |
| 44 func (j *jobsimDist) Run(tsk *distributor.TaskDescription) (tok distributor.Toke
n, _ time.Duration, err error) { | 44 func (j *jobsimDist) Run(desc *dm.Quest_Desc, exAuth *dm.Execution_Auth, prev *d
m.JsonResult) (tok distributor.Token, _ time.Duration, err error) { |
| 45 // TODO(riannucci): Fix luci-gae so we can truly escape the transaction
when | 45 // TODO(riannucci): Fix luci-gae so we can truly escape the transaction
when |
| 46 // we build the jobsimDist instance. See luci/gae#23. | 46 // we build the jobsimDist instance. See luci/gae#23. |
| 47 ds := txnBuf.GetNoTxn(j.c) | 47 ds := txnBuf.GetNoTxn(j.c) |
| 48 | 48 |
| 49 logging.Fields{ | 49 logging.Fields{ |
| 50 » » "eid": tsk.ExecutionAuth().Id, | 50 » » "eid": exAuth.Id, |
| 51 }.Infof(j.c, "jobsim: running new task") | 51 }.Infof(j.c, "jobsim: running new task") |
| 52 | 52 |
| 53 » jtsk, err := j.parsePayload(tsk.Payload().Parameters) | 53 » jtsk, err := j.parsePayload(desc.Parameters) |
| 54 if err != nil { | 54 if err != nil { |
| 55 return | 55 return |
| 56 } | 56 } |
| 57 » jtsk.ExAuth = *tsk.ExecutionAuth() | 57 » jtsk.ExAuth = *exAuth |
| 58 jtsk.Status = jobsimRunnable | 58 jtsk.Status = jobsimRunnable |
| 59 » jtsk.StateOrReason = tsk.PreviousResult().Object | 59 » if prev != nil { |
| 60 » » jtsk.StateOrReason = prev.Object |
| 61 » } |
| 60 jtsk.CfgName = j.cfg.Name | 62 jtsk.CfgName = j.cfg.Name |
| 61 | 63 |
| 62 key := []*datastore.Key{ | 64 key := []*datastore.Key{ |
| 63 ds.MakeKey(datastore.GetMetaDefault(datastore.GetPLS(jtsk), "kin
d", ""), 0)} | 65 ds.MakeKey(datastore.GetMetaDefault(datastore.GetPLS(jtsk), "kin
d", ""), 0)} |
| 64 if err = ds.AllocateIDs(key); err != nil { | 66 if err = ds.AllocateIDs(key); err != nil { |
| 65 return | 67 return |
| 66 } | 68 } |
| 67 | 69 |
| 68 // transactionally commit the job and a taskqueue task to execute it | 70 // transactionally commit the job and a taskqueue task to execute it |
| 69 jtsk.ID = fmt.Sprintf("%s|%d", j.jsConfig().Pool, key[0].IntID()) | 71 jtsk.ID = fmt.Sprintf("%s|%d", j.jsConfig().Pool, key[0].IntID()) |
| (...skipping 18 matching lines...) Expand all Loading... |
| 88 return err | 90 return err |
| 89 } | 91 } |
| 90 logging.Infof(j.c, "jobsim: EnqueueTask'd") | 92 logging.Infof(j.c, "jobsim: EnqueueTask'd") |
| 91 return nil | 93 return nil |
| 92 }, nil) | 94 }, nil) |
| 93 | 95 |
| 94 tok = distributor.Token(jtsk.ID) | 96 tok = distributor.Token(jtsk.ID) |
| 95 return | 97 return |
| 96 } | 98 } |
| 97 | 99 |
| 98 func (j *jobsimDist) Cancel(tok distributor.Token) error { | 100 func (j *jobsimDist) Cancel(_ *dm.Quest_Desc, tok distributor.Token) error { |
| 99 jtsk := &jobsimExecution{ID: string(tok)} | 101 jtsk := &jobsimExecution{ID: string(tok)} |
| 100 | 102 |
| 101 cancelBody := func(ds datastore.Interface) (needWrite bool, err error) { | 103 cancelBody := func(ds datastore.Interface) (needWrite bool, err error) { |
| 102 if err = ds.Get(jtsk); err != nil { | 104 if err = ds.Get(jtsk); err != nil { |
| 103 return | 105 return |
| 104 } | 106 } |
| 105 if jtsk.Status != jobsimRunnable { | 107 if jtsk.Status != jobsimRunnable { |
| 106 return | 108 return |
| 107 } | 109 } |
| 108 needWrite = true | 110 needWrite = true |
| 109 return | 111 return |
| 110 } | 112 } |
| 111 | 113 |
| 112 ds := datastore.Get(j.c) | 114 ds := datastore.Get(j.c) |
| 113 if needWrite, err := cancelBody(ds); err != nil || !needWrite { | 115 if needWrite, err := cancelBody(ds); err != nil || !needWrite { |
| 114 return err | 116 return err |
| 115 } | 117 } |
| 116 | 118 |
| 117 return ds.RunInTransaction(func(c context.Context) error { | 119 return ds.RunInTransaction(func(c context.Context) error { |
| 118 ds := datastore.Get(c) | 120 ds := datastore.Get(c) |
| 119 if needWrite, err := cancelBody(ds); err != nil || !needWrite { | 121 if needWrite, err := cancelBody(ds); err != nil || !needWrite { |
| 120 return err | 122 return err |
| 121 } | 123 } |
| 122 jtsk.Status = jobsimCancelled | 124 jtsk.Status = jobsimCancelled |
| 123 return ds.Put(jtsk) | 125 return ds.Put(jtsk) |
| 124 }, nil) | 126 }, nil) |
| 125 } | 127 } |
| 126 | 128 |
| 127 func (j *jobsimDist) GetStatus(tok distributor.Token) (*dm.Result, error) { | 129 func (j *jobsimDist) GetStatus(_ *dm.Quest_Desc, tok distributor.Token) (*dm.Res
ult, error) { |
| 128 jtsk, err := loadTask(j.c, string(tok)) | 130 jtsk, err := loadTask(j.c, string(tok)) |
| 129 if err != nil { | 131 if err != nil { |
| 130 return nil, err | 132 return nil, err |
| 131 } | 133 } |
| 132 | 134 |
| 133 return getAttemptResult(jtsk.Status, jtsk.StateOrReason), nil | 135 return getAttemptResult(jtsk.Status, jtsk.StateOrReason), nil |
| 134 } | 136 } |
| 135 | 137 |
| 136 func (j *jobsimDist) InfoURL(tok distributor.Token) string { | 138 func (j *jobsimDist) InfoURL(tok distributor.Token) string { |
| 137 return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.Name, j.cfg.Versio
n, tok) | 139 return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.Name, j.cfg.Versio
n, tok) |
| 138 } | 140 } |
| 139 | 141 |
| 140 func (j *jobsimDist) HandleNotification(note *distributor.Notification) (*dm.Res
ult, error) { | 142 func (j *jobsimDist) HandleNotification(_ *dm.Quest_Desc, note *distributor.Noti
fication) (*dm.Result, error) { |
| 141 n := ¬ification{} | 143 n := ¬ification{} |
| 142 err := json.Unmarshal(note.Data, n) | 144 err := json.Unmarshal(note.Data, n) |
| 143 if err != nil { | 145 if err != nil { |
| 144 return nil, err | 146 return nil, err |
| 145 } | 147 } |
| 146 | 148 |
| 147 return getAttemptResult(n.Status, n.StateOrReason), nil | 149 return getAttemptResult(n.Status, n.StateOrReason), nil |
| 148 } | 150 } |
| 149 | 151 |
| 150 func loadTask(c context.Context, rawTok string) (*jobsimExecution, error) { | 152 func loadTask(c context.Context, rawTok string) (*jobsimExecution, error) { |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 235 return err | 237 return err |
| 236 } | 238 } |
| 237 | 239 |
| 238 // AddFactory adds this distributor implementation into the distributor | 240 // AddFactory adds this distributor implementation into the distributor |
| 239 // Registry. | 241 // Registry. |
| 240 func AddFactory(m distributor.FactoryMap) { | 242 func AddFactory(m distributor.FactoryMap) { |
| 241 m[(*jobsim.Config)(nil)] = func(c context.Context, cfg *distributor.Conf
ig) (distributor.D, error) { | 243 m[(*jobsim.Config)(nil)] = func(c context.Context, cfg *distributor.Conf
ig) (distributor.D, error) { |
| 242 return &jobsimDist{c, cfg}, nil | 244 return &jobsimDist{c, cfg}, nil |
| 243 } | 245 } |
| 244 } | 246 } |
| OLD | NEW |