Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(225)

Side by Side Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: comment nit Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ.
16 package demo 16 package demo
17 17
18 import ( 18 import (
19 "fmt"
20 "net/http" 19 "net/http"
21 "strconv"
22 "time" 20 "time"
23 21
24 "golang.org/x/net/context" 22 "golang.org/x/net/context"
25 23
24 "github.com/golang/protobuf/proto"
26 "github.com/luci/gae/service/datastore" 25 "github.com/luci/gae/service/datastore"
27 "github.com/luci/gae/service/taskqueue"
28 26
29 "github.com/luci/luci-go/appengine/gaemiddleware" 27 "github.com/luci/luci-go/appengine/gaemiddleware"
30 "github.com/luci/luci-go/common/clock" 28 "github.com/luci/luci-go/common/clock"
31 "github.com/luci/luci-go/common/data/rand/mathrand" 29 "github.com/luci/luci-go/common/data/rand/mathrand"
32 "github.com/luci/luci-go/common/logging" 30 "github.com/luci/luci-go/common/logging"
33 "github.com/luci/luci-go/server/router" 31 "github.com/luci/luci-go/server/router"
34 32
35 "github.com/luci/luci-go/scheduler/appengine/engine/cron" 33 "github.com/luci/luci-go/scheduler/appengine/engine/cron"
34 "github.com/luci/luci-go/scheduler/appengine/engine/internal"
35 "github.com/luci/luci-go/scheduler/appengine/engine/tq"
36 "github.com/luci/luci-go/scheduler/appengine/schedule" 36 "github.com/luci/luci-go/scheduler/appengine/schedule"
37 ) 37 )
38 38
39 var tasks = tq.Dispatcher{}
40
39 type CronState struct { 41 type CronState struct {
40 _extra datastore.PropertyMap `gae:"-,extra"` 42 _extra datastore.PropertyMap `gae:"-,extra"`
41 43
42 ID string `gae:"$id"` 44 ID string `gae:"$id"`
43 State cron.State `gae:",noindex"` 45 State cron.State `gae:",noindex"`
44 } 46 }
45 47
46 func (s *CronState) schedule() *schedule.Schedule { 48 func (s *CronState) schedule() *schedule.Schedule {
47 parsed, err := schedule.Parse(s.ID, 0) 49 parsed, err := schedule.Parse(s.ID, 0)
48 if err != nil { 50 if err != nil {
(...skipping 16 matching lines...) Expand all
65 Schedule: entity.schedule(), 67 Schedule: entity.schedule(),
66 Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, 68 Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
67 State: entity.State, 69 State: entity.State,
68 } 70 }
69 71
70 if err := cb(c, machine); err != nil { 72 if err := cb(c, machine); err != nil {
71 return err 73 return err
72 } 74 }
73 75
74 for _, action := range machine.Actions { 76 for _, action := range machine.Actions {
75 » » » var task *taskqueue.Task 77 » » » var task tq.Task
76 switch a := action.(type) { 78 switch a := action.(type) {
77 case cron.TickLaterAction: 79 case cron.TickLaterAction:
78 logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) 80 logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
79 » » » » task = &taskqueue.Task{ 81 » » » » task = tq.Task{
80 » » » » » Path: fmt.Sprintf("/tick/%s/%d", id, a.T ickNonce), 82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
81 » » » » » ETA: a.When, 83 » » » » » ETA: a.When,
82 } 84 }
83 case cron.StartInvocationAction: 85 case cron.StartInvocationAction:
84 » » » » task = &taskqueue.Task{ 86 » » » » task = tq.Task{
85 » » » » » Path: fmt.Sprintf("/invocation/%s", id) , 87 » » » » » Payload: &internal.StartInvocationTask{J obId: id},
86 » » » » » Delay: time.Second, // give the transact ion time to land 88 » » » » » Delay: time.Second, // give the transa ction time to land
87 } 89 }
88 default: 90 default:
89 panic("unknown action type") 91 panic("unknown action type")
90 } 92 }
91 » » » if err := taskqueue.Add(c, "default", task); err != nil { 93 » » » if err := tasks.AddTask(c, &task); err != nil {
92 return err 94 return err
93 } 95 }
94 } 96 }
95 97
96 entity.State = machine.State 98 entity.State = machine.State
97 return datastore.Put(c, &entity) 99 return datastore.Put(c, &entity)
98 }, nil) 100 }, nil)
99 101
100 if err != nil { 102 if err != nil {
101 logging.Errorf(c, "FAIL - %s", err) 103 logging.Errorf(c, "FAIL - %s", err)
102 } 104 }
103 return err 105 return err
104 } 106 }
105 107
106 func startJob(c context.Context, id string) error { 108 func startJob(c context.Context, id string) error {
107 return evolve(c, id, func(c context.Context, m *cron.Machine) error { 109 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
108 // Forcefully restart the chain of tasks. 110 // Forcefully restart the chain of tasks.
109 m.Disable() 111 m.Disable()
110 m.Enable() 112 m.Enable()
111 return nil 113 return nil
112 }) 114 })
113 } 115 }
114 116
115 func handleTick(c context.Context, id string, nonce int64) error { 117 func handleTick(c context.Context, task proto.Message, execCount int) error {
116 » return evolve(c, id, func(c context.Context, m *cron.Machine) error { 118 » msg := task.(*internal.TickLaterTask)
117 » » return m.OnTimerTick(nonce) 119 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or {
120 » » return m.OnTimerTick(msg.TickNonce)
118 }) 121 })
119 } 122 }
120 123
121 func handleInvocation(c context.Context, id string) error { 124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r {
122 » logging.Infof(c, "INVOCATION of job %q has finished!", id) 125 » msg := task.(*internal.StartInvocationTask)
123 » return evolve(c, id, func(c context.Context, m *cron.Machine) error { 126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId)
127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or {
124 m.RewindIfNecessary() 128 m.RewindIfNecessary()
125 return nil 129 return nil
126 }) 130 })
127 } 131 }
128 132
129 func init() { 133 func init() {
130 r := router.New() 134 r := router.New()
131 gaemiddleware.InstallHandlers(r) 135 gaemiddleware.InstallHandlers(r)
132 136
137 tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil )
138 tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d efault", nil)
139 tasks.InstallRoutes(r, gaemiddleware.BaseProd())
140
133 // Kick-start a bunch of jobs by visiting: 141 // Kick-start a bunch of jobs by visiting:
134 // 142 //
135 // http://localhost:8080/start/with 10s interval 143 // http://localhost:8080/start/with 10s interval
136 // http://localhost:8080/start/with 5s interval 144 // http://localhost:8080/start/with 5s interval
137 // http://localhost:8080/start/0 * * * * * * * 145 // http://localhost:8080/start/0 * * * * * * *
138 // 146 //
139 // And the look at the logs. 147 // And the look at the logs.
140 148
141 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { 149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
142 jobID := c.Params.ByName("JobID") 150 jobID := c.Params.ByName("JobID")
143 if err := startJob(c.Context, jobID); err != nil { 151 if err := startJob(c.Context, jobID); err != nil {
144 panic(err) 152 panic(err)
145 } 153 }
146 }) 154 })
147 155
148 r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *rout er.Context) {
149 jobID := c.Params.ByName("JobID")
150 nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
151 if err != nil {
152 panic(err)
153 }
154 if err := handleTick(c.Context, jobID, nonce); err != nil {
155 panic(err)
156 }
157 })
158
159 r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Co ntext) {
160 jobID := c.Params.ByName("JobID")
161 if err := handleInvocation(c.Context, jobID); err != nil {
162 panic(err)
163 }
164 })
165
166 http.DefaultServeMux.Handle("/", r) 156 http.DefaultServeMux.Handle("/", r)
167 } 157 }
OLDNEW
« no previous file with comments | « no previous file | scheduler/appengine/engine/internal/gen.go » ('j') | scheduler/appengine/engine/tq/tq.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698