Index: scheduler/appengine/engine/cron/demo/main.go |
diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go |
index e24f1be0978f157051a71004853be5a507801aab..381f63877e45c67b091bbb6fc92b9ae19331fcf5 100644 |
--- a/scheduler/appengine/engine/cron/demo/main.go |
+++ b/scheduler/appengine/engine/cron/demo/main.go |
@@ -16,15 +16,13 @@ |
package demo |
import ( |
- "fmt" |
"net/http" |
- "strconv" |
"time" |
"golang.org/x/net/context" |
+ "github.com/golang/protobuf/proto" |
"github.com/luci/gae/service/datastore" |
- "github.com/luci/gae/service/taskqueue" |
"github.com/luci/luci-go/appengine/gaemiddleware" |
"github.com/luci/luci-go/common/clock" |
@@ -33,9 +31,13 @@ import ( |
"github.com/luci/luci-go/server/router" |
"github.com/luci/luci-go/scheduler/appengine/engine/cron" |
+ "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
+ "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
"github.com/luci/luci-go/scheduler/appengine/schedule" |
) |
+var tasks = tq.Dispatcher{} |
+ |
type CronState struct { |
_extra datastore.PropertyMap `gae:"-,extra"` |
@@ -72,23 +74,23 @@ func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine |
} |
for _, action := range machine.Actions { |
- var task *taskqueue.Task |
+ var task tq.Task |
switch a := action.(type) { |
case cron.TickLaterAction: |
logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) |
- task = &taskqueue.Task{ |
- Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce), |
- ETA: a.When, |
+ task = tq.Task{ |
+ Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, |
+ ETA: a.When, |
} |
case cron.StartInvocationAction: |
- task = &taskqueue.Task{ |
- Path: fmt.Sprintf("/invocation/%s", id), |
- Delay: time.Second, // give the transaction time to land |
+ task = tq.Task{ |
+ Payload: &internal.StartInvocationTask{JobId: id}, |
+ Delay: time.Second, // give the transaction time to land |
} |
default: |
panic("unknown action type") |
} |
- if err := taskqueue.Add(c, "default", task); err != nil { |
+ if err := tasks.AddTask(c, &task); err != nil { |
return err |
} |
} |
@@ -112,15 +114,17 @@ func startJob(c context.Context, id string) error { |
}) |
} |
-func handleTick(c context.Context, id string, nonce int64) error { |
- return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
- return m.OnTimerTick(nonce) |
+func handleTick(c context.Context, task proto.Message, execCount int) error { |
+ msg := task.(*internal.TickLaterTask) |
+ return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
+ return m.OnTimerTick(msg.TickNonce) |
}) |
} |
-func handleInvocation(c context.Context, id string) error { |
- logging.Infof(c, "INVOCATION of job %q has finished!", id) |
- return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
+func handleInvocation(c context.Context, task proto.Message, execCount int) error { |
+ msg := task.(*internal.StartInvocationTask) |
+ logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) |
+ return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
m.RewindIfNecessary() |
return nil |
}) |
@@ -130,6 +134,10 @@ func init() { |
r := router.New() |
gaemiddleware.InstallHandlers(r) |
+ tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil) |
+ tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil) |
+ tasks.InstallRoutes(r, gaemiddleware.BaseProd()) |
+ |
// Kick-start a bunch of jobs by visiting: |
// |
// http://localhost:8080/start/with 10s interval |
@@ -145,23 +153,5 @@ func init() { |
} |
}) |
- r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) { |
- jobID := c.Params.ByName("JobID") |
- nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64) |
- if err != nil { |
- panic(err) |
- } |
- if err := handleTick(c.Context, jobID, nonce); err != nil { |
- panic(err) |
- } |
- }) |
- |
- r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
- jobID := c.Params.ByName("JobID") |
- if err := handleInvocation(c.Context, jobID); err != nil { |
- panic(err) |
- } |
- }) |
- |
http.DefaultServeMux.Handle("/", r) |
} |