Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(311)

Side by Side Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: rebase Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | scheduler/appengine/engine/cron/demo/queue.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ.
16 package demo 16 package demo
17 17
18 import ( 18 import (
19 "fmt"
19 "net/http" 20 "net/http"
21 "strconv"
22 "sync"
20 "time" 23 "time"
21 24
22 "golang.org/x/net/context" 25 "golang.org/x/net/context"
23 26
24 "github.com/golang/protobuf/proto" 27 "github.com/golang/protobuf/proto"
25 "github.com/luci/gae/service/datastore" 28 "github.com/luci/gae/service/datastore"
29 "github.com/luci/gae/service/info"
30 "github.com/luci/gae/service/memcache"
26 31
27 "github.com/luci/luci-go/appengine/gaemiddleware" 32 "github.com/luci/luci-go/appengine/gaemiddleware"
33 "github.com/luci/luci-go/appengine/memlock"
28 "github.com/luci/luci-go/common/clock" 34 "github.com/luci/luci-go/common/clock"
29 "github.com/luci/luci-go/common/data/rand/mathrand" 35 "github.com/luci/luci-go/common/data/rand/mathrand"
30 "github.com/luci/luci-go/common/logging" 36 "github.com/luci/luci-go/common/logging"
37 "github.com/luci/luci-go/common/retry/transient"
31 "github.com/luci/luci-go/server/router" 38 "github.com/luci/luci-go/server/router"
32 39
33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" 40 "github.com/luci/luci-go/scheduler/appengine/engine/cron"
41 "github.com/luci/luci-go/scheduler/appengine/engine/dsset"
34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" 42 "github.com/luci/luci-go/scheduler/appengine/engine/internal"
35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" 43 "github.com/luci/luci-go/scheduler/appengine/engine/tq"
36 "github.com/luci/luci-go/scheduler/appengine/schedule" 44 "github.com/luci/luci-go/scheduler/appengine/schedule"
37 ) 45 )
38 46
39 var tasks = tq.Dispatcher{} 47 var dispatcher = tq.Dispatcher{}
40 48
41 type CronState struct { 49 type CronState struct {
42 _extra datastore.PropertyMap `gae:"-,extra"` 50 _extra datastore.PropertyMap `gae:"-,extra"`
43 51
44 ID string `gae:"$id"` 52 ID string `gae:"$id"`
45 State cron.State `gae:",noindex"` 53 State cron.State `gae:",noindex"`
54
55 NextInvID int64 `gae:",noindex"`
56 RunningSet []int64 `gae:",noindex"`
46 } 57 }
47 58
48 func (s *CronState) schedule() *schedule.Schedule { 59 func (s *CronState) schedule() *schedule.Schedule {
49 parsed, err := schedule.Parse(s.ID, 0) 60 parsed, err := schedule.Parse(s.ID, 0)
50 if err != nil { 61 if err != nil {
51 panic(err) 62 panic(err)
52 } 63 }
53 return parsed 64 return parsed
54 } 65 }
55 66
56 // evolve instantiates cron.Machine, calls the callback and submits emitted 67 func pendingTriggersSet(c context.Context, jobID string) *dsset.Set {
57 // actions. 68 » return &dsset.Set{
69 » » ID: "triggers:" + jobID,
70 » » ShardCount: 8,
71 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}),
72 » » TombstonesDelay: 15 * time.Minute,
73 » }
74 }
75
76 func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set {
77 » return &dsset.Set{
78 » » ID: "finished:" + jobID,
79 » » ShardCount: 8,
80 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}),
81 » » TombstonesDelay: 15 * time.Minute,
82 » }
83 }
84
85 func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error {
86 » machine := &cron.Machine{
87 » » Now: clock.Now(c),
88 » » Schedule: entity.schedule(),
89 » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
90 » » State: entity.State,
91 » }
92
93 » if err := cb(c, machine); err != nil {
94 » » return err
95 » }
96
97 » tasks := []*tq.Task{}
98 » for _, action := range machine.Actions {
99 » » switch a := action.(type) {
100 » » case cron.TickLaterAction:
101 » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNo nce, a.When.Sub(time.Now()))
102 » » » tasks = append(tasks, &tq.Task{
103 » » » » Payload: &internal.TickLaterTask{JobId: entity.I D, TickNonce: a.TickNonce},
104 » » » » ETA: a.When,
105 » » » })
106 » » case cron.StartInvocationAction:
107 » » » tasks = append(tasks, &tq.Task{
108 » » » » Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()},
109 » » » })
110 » » default:
111 » » » panic("unknown action type")
112 » » }
113 » }
114 » if err := dispatcher.AddTask(c, tasks...); err != nil {
115 » » return err
116 » }
117
118 » entity.State = machine.State
119 » return nil
120 }
121
58 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { 122 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error {
59 err := datastore.RunInTransaction(c, func(c context.Context) error { 123 err := datastore.RunInTransaction(c, func(c context.Context) error {
60 » » entity := CronState{ID: id} 124 » » entity := &CronState{ID: id}
61 » » if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity { 125 » » if err := datastore.Get(c, entity); err != nil && err != datasto re.ErrNoSuchEntity {
62 return err 126 return err
63 } 127 }
64 128 » » if err := pokeMachine(c, entity, cb); err != nil {
65 » » machine := &cron.Machine{
66 » » » Now: clock.Now(c),
67 » » » Schedule: entity.schedule(),
68 » » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
69 » » » State: entity.State,
70 » » }
71
72 » » if err := cb(c, machine); err != nil {
73 return err 129 return err
74 } 130 }
75 131 » » return datastore.Put(c, entity)
76 » » for _, action := range machine.Actions {
77 » » » var task tq.Task
78 » » » switch a := action.(type) {
79 » » » case cron.TickLaterAction:
80 » » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
81 » » » » task = tq.Task{
82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
83 » » » » » ETA: a.When,
84 » » » » }
85 » » » case cron.StartInvocationAction:
86 » » » » task = tq.Task{
87 » » » » » Payload: &internal.StartInvocationTask{J obId: id},
88 » » » » » Delay: time.Second, // give the transa ction time to land
89 » » » » }
90 » » » default:
91 » » » » panic("unknown action type")
92 » » » }
93 » » » if err := tasks.AddTask(c, &task); err != nil {
94 » » » » return err
95 » » » }
96 » » }
97
98 » » entity.State = machine.State
99 » » return datastore.Put(c, &entity)
100 }, nil) 132 }, nil)
101 133 » return transient.Tag.Apply(err)
102 » if err != nil {
103 » » logging.Errorf(c, "FAIL - %s", err)
104 » }
105 » return err
106 } 134 }
107 135
108 func startJob(c context.Context, id string) error { 136 func startJob(c context.Context, id string) error {
109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { 137 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
110 // Forcefully restart the chain of tasks. 138 // Forcefully restart the chain of tasks.
111 m.Disable() 139 m.Disable()
112 m.Enable() 140 m.Enable()
113 return nil 141 return nil
114 }) 142 })
115 } 143 }
116 144
145 func addTrigger(c context.Context, jobID, triggerID string) error {
146 logging.Infof(c, "Triggering %q - %q", jobID, triggerID)
147
148 // Add the trigger request to the pending set.
149 if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerI D}}); err != nil {
150 return err
151 }
152
153 // Run a task that examines the pending set and makes decisions.
154 return kickTriageTask(c, jobID)
155 }
156
157 func kickTriageTask(c context.Context, jobID string) error {
158 // Throttle to once per 2 sec (and make sure it is always in the future).
159 eta := clock.Now(c).Unix()
160 eta = (eta/2 + 1) * 2
161 dedupKey := fmt.Sprintf("triage:%s:%d", jobID, eta)
162
163 // Use cheaper but crappier memcache as a first guard.
164 itm := memcache.NewItem(c, dedupKey).SetExpiration(time.Minute)
165 if memcache.Get(c, itm) == nil {
166 logging.Infof(c, "The triage task has already been scheduled")
167 return nil // already added!
168 }
169
170 err := dispatcher.AddTask(c, &tq.Task{
171 DeduplicationKey: dedupKey,
172 ETA: time.Unix(eta, 0),
173 Payload: &internal.TriageTriggersTask{JobId: jobID},
174 })
175 if err != nil {
176 return err
177 }
178 logging.Infof(c, "Scheduled the triage task")
179
180 // Best effort in setting memcache flag. No big deal if it fails.
181 memcache.Set(c, itm)
182 return nil
183 }
184
117 func handleTick(c context.Context, task proto.Message, execCount int) error { 185 func handleTick(c context.Context, task proto.Message, execCount int) error {
118 msg := task.(*internal.TickLaterTask) 186 msg := task.(*internal.TickLaterTask)
119 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { 187 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or {
120 return m.OnTimerTick(msg.TickNonce) 188 return m.OnTimerTick(msg.TickNonce)
121 }) 189 })
122 } 190 }
123 191
124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r { 192 func handleTrigger(c context.Context, task proto.Message, execCount int) error {
125 » msg := task.(*internal.StartInvocationTask) 193 » msg := task.(*internal.TriggerInvocationTask)
126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) 194 » return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId))
127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { 195 }
128 » » m.RewindIfNecessary() 196
129 » » return nil 197 func handleTriage(c context.Context, task proto.Message, execCount int) error {
198 » msg := task.(*internal.TriageTriggersTask)
199 » logging.Infof(c, "Triaging requests for %q", msg.JobId)
200
201 » err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c) , func(context.Context) error {
202 » » logging.Infof(c, "Got the lock!")
203 » » return runTriage(c, msg.JobId)
130 }) 204 })
205 return transient.Tag.Apply(err)
206 }
207
208 func runTriage(c context.Context, jobID string) error {
209 wg := sync.WaitGroup{}
210 wg.Add(2)
211
212 var triggersList *dsset.Listing
213 var triggersErr error
214
215 var finishedList *dsset.Listing
216 var finishedErr error
217
218 // Grab all pending requests (and stuff to cleanup).
219 triggersSet := pendingTriggersSet(c, jobID)
220 go func() {
221 defer wg.Done()
222 triggersList, triggersErr = triggersSet.List(c)
223 if triggersErr == nil {
224 logging.Infof(c, "Triggers: %d items, %d tombs to cleanu p",
225 len(triggersList.Items), len(triggersList.Tombst ones))
226 }
227 }()
228
229 // Same for recently finished invocations.
230 finishedSet := recentlyFinishedSet(c, jobID)
231 go func() {
232 defer wg.Done()
233 finishedList, finishedErr = finishedSet.List(c)
234 if finishedErr == nil {
235 logging.Infof(c, "Finished: %d items, %d tombs to cleanu p",
236 len(finishedList.Items), len(finishedList.Tombst ones))
237 }
238 }()
239
240 wg.Wait()
241 switch {
242 case triggersErr != nil:
243 return triggersErr
244 case finishedErr != nil:
245 return finishedErr
246 }
247
248 // Do cleanups first.
249 if err := dsset.CleanupStorage(c, triggersList.Tombstones, finishedList. Tombstones); err != nil {
250 return err
251 }
252
253 var cleanup []*dsset.Tombstone
254 err := datastore.RunInTransaction(c, func(c context.Context) error {
255 state := &CronState{ID: jobID}
256 if err := datastore.Get(c, state); err != nil && err != datastor e.ErrNoSuchEntity {
257 return err
258 }
259
260 popOps := []*dsset.PopOp{}
261
262 // Tidy RunningSet by removing all recently finished invocations.
263 if !finishedList.Empty() {
264 op, err := finishedSet.BeginPop(c, finishedList)
265 if err != nil {
266 return err
267 }
268 popOps = append(popOps, op)
269
270 reallyFinished := map[int64]struct{}{}
271 for _, itm := range finishedList.Items {
272 if op.Pop(itm.ID) {
273 id, _ := strconv.ParseInt(itm.ID, 10, 64 )
274 reallyFinished[id] = struct{}{}
275 }
276 }
277
278 filtered := []int64{}
279 for _, id := range state.RunningSet {
280 if _, yep := reallyFinished[id]; yep {
281 logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id)
282 } else {
283 filtered = append(filtered, id)
284 }
285 }
286 state.RunningSet = filtered
287 }
288
289 // Launch new invocations for each pending trigger.
290 if !triggersList.Empty() {
291 op, err := triggersSet.BeginPop(c, triggersList)
292 if err != nil {
293 return err
294 }
295 popOps = append(popOps, op)
296
297 batch := internal.LaunchInvocationsBatchTask{JobId: stat e.ID}
298 for _, trigger := range triggersList.Items {
299 if op.Pop(trigger.ID) {
300 logging.Infof(c, "Launching new launch-% d for trigger %s", state.NextInvID, trigger.ID)
301 state.RunningSet = append(state.RunningS et, state.NextInvID)
302 batch.InvId = append(batch.InvId, state. NextInvID)
303 state.NextInvID++
304 }
305 }
306 // Transactionally trigger a batch with new invocations.
307 if len(batch.InvId) != 0 {
308 if err := dispatcher.AddTask(c, &tq.Task{Payload : &batch}); err != nil {
309 return err
310 }
311 }
312 }
313
314 // Submit set changes.
315 var err error
316 if cleanup, err = dsset.FinishPop(c, popOps...); err != nil {
317 return err
318 }
319
320 logging.Infof(c, "Running invocations - %v", state.RunningSet)
321
322 // If nothing is running, poke the cron machine. Maybe it wants to start
323 // something.
324 if len(state.RunningSet) == 0 {
325 err = pokeMachine(c, state, func(c context.Context, m *c ron.Machine) error {
326 m.RewindIfNecessary()
327 return nil
328 })
329 if err != nil {
330 return err
331 }
332 }
333
334 // Done!
335 return datastore.Put(c, state)
336 }, nil)
337
338 if err == nil && len(cleanup) != 0 {
339 // Best effort cleanup of storage of consumed items.
340 logging.Infof(c, "Cleaning up storage of %d items", len(cleanup) )
341 if err := dsset.CleanupStorage(c, cleanup); err != nil {
342 logging.Warningf(c, "Best effort cleanup failed - %s", e rr)
343 }
344 }
345
346 return transient.Tag.Apply(err)
347 }
348
349 func handleBatchLaunch(c context.Context, task proto.Message, execCount int) err or {
350 msg := task.(*internal.LaunchInvocationsBatchTask)
351 logging.Infof(c, "Batch launch for %q", msg.JobId)
352
353 tasks := []*tq.Task{}
354 for _, invId := range msg.InvId {
355 logging.Infof(c, "Launching inv-%d", invId)
356 tasks = append(tasks, &tq.Task{
357 DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, in vId),
358 Payload: &internal.LaunchInvocationTask{
359 JobId: msg.JobId,
360 InvId: invId,
361 },
362 })
363 }
364
365 return dispatcher.AddTask(c, tasks...)
366 }
367
368 func handleLaunchTask(c context.Context, task proto.Message, execCount int) erro r {
369 msg := task.(*internal.LaunchInvocationTask)
370 logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvI d)
371
372 // There can be more stuff here. But we just finish the invocation right away.
373
374 finishedSet := recentlyFinishedSet(c, msg.JobId)
375 err := finishedSet.Add(c, []dsset.Item{
376 {ID: fmt.Sprintf("%d", msg.InvId)},
377 })
378 if err != nil {
379 return err
380 }
381
382 // Kick the triage now that the set of running invocations has been modified.
383 return kickTriageTask(c, msg.JobId)
131 } 384 }
132 385
133 func init() { 386 func init() {
134 r := router.New() 387 r := router.New()
135 gaemiddleware.InstallHandlers(r) 388 gaemiddleware.InstallHandlers(r)
136 389
137 » tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil ) 390 » dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil)
138 » tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d efault", nil) 391 » dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger , "triggers", nil)
139 » tasks.InstallRoutes(r, gaemiddleware.BaseProd()) 392 » dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "t riages", nil)
393 » dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBa tchLaunch, "launches", nil)
394 » dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTa sk, "invocations", nil)
395 » dispatcher.InstallRoutes(r, gaemiddleware.BaseProd())
140 396
141 // Kick-start a bunch of jobs by visiting: 397 // Kick-start a bunch of jobs by visiting:
142 // 398 //
143 // http://localhost:8080/start/with 10s interval 399 // http://localhost:8080/start/with 10s interval
144 // http://localhost:8080/start/with 5s interval 400 // http://localhost:8080/start/with 5s interval
145 // http://localhost:8080/start/0 * * * * * * * 401 // http://localhost:8080/start/0 * * * * * * *
146 // 402 //
147 // And then look at the logs. 403 // And then look at the logs.
148 404
149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { 405 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
150 jobID := c.Params.ByName("JobID") 406 jobID := c.Params.ByName("JobID")
151 if err := startJob(c.Context, jobID); err != nil { 407 if err := startJob(c.Context, jobID); err != nil {
152 panic(err) 408 panic(err)
153 } 409 }
154 }) 410 })
155 411
412 r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Contex t) {
413 jobID := c.Params.ByName("JobID")
414 triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixN ano())
415 if err := addTrigger(c.Context, jobID, triggerID); err != nil {
416 panic(err)
417 }
418 })
419
156 http.DefaultServeMux.Handle("/", r) 420 http.DefaultServeMux.Handle("/", r)
157 } 421 }
OLDNEW
« no previous file with comments | « no previous file | scheduler/appengine/engine/cron/demo/queue.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698