OLD | NEW |
1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. | 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
16 package demo | 16 package demo |
17 | 17 |
18 import ( | 18 import ( |
19 "fmt" | |
20 "net/http" | 19 "net/http" |
21 "strconv" | |
22 "time" | 20 "time" |
23 | 21 |
24 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
25 | 23 |
| 24 "github.com/golang/protobuf/proto" |
26 "github.com/luci/gae/service/datastore" | 25 "github.com/luci/gae/service/datastore" |
27 "github.com/luci/gae/service/taskqueue" | |
28 | 26 |
29 "github.com/luci/luci-go/appengine/gaemiddleware" | 27 "github.com/luci/luci-go/appengine/gaemiddleware" |
30 "github.com/luci/luci-go/common/clock" | 28 "github.com/luci/luci-go/common/clock" |
31 "github.com/luci/luci-go/common/data/rand/mathrand" | 29 "github.com/luci/luci-go/common/data/rand/mathrand" |
32 "github.com/luci/luci-go/common/logging" | 30 "github.com/luci/luci-go/common/logging" |
33 "github.com/luci/luci-go/server/router" | 31 "github.com/luci/luci-go/server/router" |
34 | 32 |
35 "github.com/luci/luci-go/scheduler/appengine/engine/cron" | 33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
| 34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
| 35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
36 "github.com/luci/luci-go/scheduler/appengine/schedule" | 36 "github.com/luci/luci-go/scheduler/appengine/schedule" |
37 ) | 37 ) |
38 | 38 |
| 39 var tasks = tq.Dispatcher{} |
| 40 |
39 type CronState struct { | 41 type CronState struct { |
40 _extra datastore.PropertyMap `gae:"-,extra"` | 42 _extra datastore.PropertyMap `gae:"-,extra"` |
41 | 43 |
42 ID string `gae:"$id"` | 44 ID string `gae:"$id"` |
43 State cron.State `gae:",noindex"` | 45 State cron.State `gae:",noindex"` |
44 } | 46 } |
45 | 47 |
46 func (s *CronState) schedule() *schedule.Schedule { | 48 func (s *CronState) schedule() *schedule.Schedule { |
47 parsed, err := schedule.Parse(s.ID, 0) | 49 parsed, err := schedule.Parse(s.ID, 0) |
48 if err != nil { | 50 if err != nil { |
(...skipping 16 matching lines...) Expand all Loading... |
65 Schedule: entity.schedule(), | 67 Schedule: entity.schedule(), |
66 Nonce: func() int64 { return mathrand.Get(c).Int63()
+ 1 }, | 68 Nonce: func() int64 { return mathrand.Get(c).Int63()
+ 1 }, |
67 State: entity.State, | 69 State: entity.State, |
68 } | 70 } |
69 | 71 |
70 if err := cb(c, machine); err != nil { | 72 if err := cb(c, machine); err != nil { |
71 return err | 73 return err |
72 } | 74 } |
73 | 75 |
74 for _, action := range machine.Actions { | 76 for _, action := range machine.Actions { |
75 » » » var task *taskqueue.Task | 77 » » » var task tq.Task |
76 switch a := action.(type) { | 78 switch a := action.(type) { |
77 case cron.TickLaterAction: | 79 case cron.TickLaterAction: |
78 logging.Infof(c, "Scheduling tick %d after %s",
a.TickNonce, a.When.Sub(time.Now())) | 80 logging.Infof(c, "Scheduling tick %d after %s",
a.TickNonce, a.When.Sub(time.Now())) |
79 » » » » task = &taskqueue.Task{ | 81 » » » » task = tq.Task{ |
80 » » » » » Path: fmt.Sprintf("/tick/%s/%d", id, a.T
ickNonce), | 82 » » » » » Payload: &internal.TickLaterTask{JobId:
id, TickNonce: a.TickNonce}, |
81 » » » » » ETA: a.When, | 83 » » » » » ETA: a.When, |
82 } | 84 } |
83 case cron.StartInvocationAction: | 85 case cron.StartInvocationAction: |
84 » » » » task = &taskqueue.Task{ | 86 » » » » task = tq.Task{ |
85 » » » » » Path: fmt.Sprintf("/invocation/%s", id)
, | 87 » » » » » Payload: &internal.StartInvocationTask{J
obId: id}, |
86 » » » » » Delay: time.Second, // give the transact
ion time to land | 88 » » » » » Delay: time.Second, // give the transa
ction time to land |
87 } | 89 } |
88 default: | 90 default: |
89 panic("unknown action type") | 91 panic("unknown action type") |
90 } | 92 } |
91 » » » if err := taskqueue.Add(c, "default", task); err != nil
{ | 93 » » » if err := tasks.AddTask(c, &task); err != nil { |
92 return err | 94 return err |
93 } | 95 } |
94 } | 96 } |
95 | 97 |
96 entity.State = machine.State | 98 entity.State = machine.State |
97 return datastore.Put(c, &entity) | 99 return datastore.Put(c, &entity) |
98 }, nil) | 100 }, nil) |
99 | 101 |
100 if err != nil { | 102 if err != nil { |
101 logging.Errorf(c, "FAIL - %s", err) | 103 logging.Errorf(c, "FAIL - %s", err) |
102 } | 104 } |
103 return err | 105 return err |
104 } | 106 } |
105 | 107 |
106 func startJob(c context.Context, id string) error { | 108 func startJob(c context.Context, id string) error { |
107 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
108 // Forcefully restart the chain of tasks. | 110 // Forcefully restart the chain of tasks. |
109 m.Disable() | 111 m.Disable() |
110 m.Enable() | 112 m.Enable() |
111 return nil | 113 return nil |
112 }) | 114 }) |
113 } | 115 } |
114 | 116 |
115 func handleTick(c context.Context, id string, nonce int64) error { | 117 func handleTick(c context.Context, task proto.Message, execCount int) error { |
116 » return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 118 » msg := task.(*internal.TickLaterTask) |
117 » » return m.OnTimerTick(nonce) | 119 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err
or { |
| 120 » » return m.OnTimerTick(msg.TickNonce) |
118 }) | 121 }) |
119 } | 122 } |
120 | 123 |
121 func handleInvocation(c context.Context, id string) error { | 124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro
r { |
122 » logging.Infof(c, "INVOCATION of job %q has finished!", id) | 125 » msg := task.(*internal.StartInvocationTask) |
123 » return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) |
| 127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err
or { |
124 m.RewindIfNecessary() | 128 m.RewindIfNecessary() |
125 return nil | 129 return nil |
126 }) | 130 }) |
127 } | 131 } |
128 | 132 |
129 func init() { | 133 func init() { |
130 r := router.New() | 134 r := router.New() |
131 gaemiddleware.InstallHandlers(r) | 135 gaemiddleware.InstallHandlers(r) |
132 | 136 |
| 137 tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil
) |
| 138 tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d
efault", nil) |
| 139 tasks.InstallRoutes(r, gaemiddleware.BaseProd()) |
| 140 |
133 // Kick-start a bunch of jobs by visiting: | 141 // Kick-start a bunch of jobs by visiting: |
134 // | 142 // |
135 // http://localhost:8080/start/with 10s interval | 143 // http://localhost:8080/start/with 10s interval |
136 // http://localhost:8080/start/with 5s interval | 144 // http://localhost:8080/start/with 5s interval |
137 // http://localhost:8080/start/0 * * * * * * * | 145 // http://localhost:8080/start/0 * * * * * * * |
138 // | 146 // |
139 // And the look at the logs. | 147 // And the look at the logs. |
140 | 148 |
141 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context)
{ | 149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context)
{ |
142 jobID := c.Params.ByName("JobID") | 150 jobID := c.Params.ByName("JobID") |
143 if err := startJob(c.Context, jobID); err != nil { | 151 if err := startJob(c.Context, jobID); err != nil { |
144 panic(err) | 152 panic(err) |
145 } | 153 } |
146 }) | 154 }) |
147 | 155 |
148 r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *rout
er.Context) { | |
149 jobID := c.Params.ByName("JobID") | |
150 nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10,
64) | |
151 if err != nil { | |
152 panic(err) | |
153 } | |
154 if err := handleTick(c.Context, jobID, nonce); err != nil { | |
155 panic(err) | |
156 } | |
157 }) | |
158 | |
159 r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Co
ntext) { | |
160 jobID := c.Params.ByName("JobID") | |
161 if err := handleInvocation(c.Context, jobID); err != nil { | |
162 panic(err) | |
163 } | |
164 }) | |
165 | |
166 http.DefaultServeMux.Handle("/", r) | 156 http.DefaultServeMux.Handle("/", r) |
167 } | 157 } |
OLD | NEW |