Index: scheduler/appengine/engine/cron/demo/main.go |
diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go |
index 381f63877e45c67b091bbb6fc92b9ae19331fcf5..50e1b08b2a6a120010ea96cf6a4e2441ec03473a 100644 |
--- a/scheduler/appengine/engine/cron/demo/main.go |
+++ b/scheduler/appengine/engine/cron/demo/main.go |
@@ -16,33 +16,44 @@ |
package demo |
import ( |
+ "fmt" |
"net/http" |
+ "strconv" |
+ "sync" |
"time" |
"golang.org/x/net/context" |
"github.com/golang/protobuf/proto" |
"github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/info" |
+ "github.com/luci/gae/service/memcache" |
"github.com/luci/luci-go/appengine/gaemiddleware" |
+ "github.com/luci/luci-go/appengine/memlock" |
"github.com/luci/luci-go/common/clock" |
"github.com/luci/luci-go/common/data/rand/mathrand" |
"github.com/luci/luci-go/common/logging" |
+ "github.com/luci/luci-go/common/retry/transient" |
"github.com/luci/luci-go/server/router" |
"github.com/luci/luci-go/scheduler/appengine/engine/cron" |
+ "github.com/luci/luci-go/scheduler/appengine/engine/dsset" |
"github.com/luci/luci-go/scheduler/appengine/engine/internal" |
"github.com/luci/luci-go/scheduler/appengine/engine/tq" |
"github.com/luci/luci-go/scheduler/appengine/schedule" |
) |
-var tasks = tq.Dispatcher{} |
+var dispatcher = tq.Dispatcher{} |
type CronState struct { |
_extra datastore.PropertyMap `gae:"-,extra"` |
ID string `gae:"$id"` |
State cron.State `gae:",noindex"` |
+ |
+ NextInvID int64 `gae:",noindex"` |
+ RunningSet []int64 `gae:",noindex"` |
} |
func (s *CronState) schedule() *schedule.Schedule { |
@@ -53,56 +64,73 @@ func (s *CronState) schedule() *schedule.Schedule { |
return parsed |
} |
-// evolve instantiates cron.Machine, calls the callback and submits emitted |
-// actions. |
-func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error { |
- err := datastore.RunInTransaction(c, func(c context.Context) error { |
- entity := CronState{ID: id} |
- if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity { |
- return err |
- } |
+func pendingTriggersSet(c context.Context, jobID string) *dsset.Set { |
+ return &dsset.Set{ |
+ ID: "triggers:" + jobID, |
+ ShardCount: 8, |
+ TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), |
+ TombstonesDelay: 15 * time.Minute, |
+ } |
+} |
- machine := &cron.Machine{ |
- Now: clock.Now(c), |
- Schedule: entity.schedule(), |
- Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, |
- State: entity.State, |
+func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set { |
+ return &dsset.Set{ |
+ ID: "finished:" + jobID, |
+ ShardCount: 8, |
+ TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), |
+ TombstonesDelay: 15 * time.Minute, |
+ } |
+} |
+ |
+func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error { |
+ machine := &cron.Machine{ |
+ Now: clock.Now(c), |
+ Schedule: entity.schedule(), |
+ Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, |
+ State: entity.State, |
+ } |
+ |
+ if err := cb(c, machine); err != nil { |
+ return err |
+ } |
+ |
+ tasks := []*tq.Task{} |
+ for _, action := range machine.Actions { |
+ switch a := action.(type) { |
+ case cron.TickLaterAction: |
+ logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) |
+ tasks = append(tasks, &tq.Task{ |
+ Payload: &internal.TickLaterTask{JobId: entity.ID, TickNonce: a.TickNonce}, |
+ ETA: a.When, |
+ }) |
+ case cron.StartInvocationAction: |
+ tasks = append(tasks, &tq.Task{ |
+ Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()}, |
+ }) |
+ default: |
+ panic("unknown action type") |
} |
+ } |
+ if err := dispatcher.AddTask(c, tasks...); err != nil { |
+ return err |
+ } |
+ |
+ entity.State = machine.State |
+ return nil |
+} |
- if err := cb(c, machine); err != nil { |
+func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error { |
+ err := datastore.RunInTransaction(c, func(c context.Context) error { |
+ entity := &CronState{ID: id} |
+ if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity { |
return err |
} |
- |
- for _, action := range machine.Actions { |
- var task tq.Task |
- switch a := action.(type) { |
- case cron.TickLaterAction: |
- logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) |
- task = tq.Task{ |
- Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, |
- ETA: a.When, |
- } |
- case cron.StartInvocationAction: |
- task = tq.Task{ |
- Payload: &internal.StartInvocationTask{JobId: id}, |
- Delay: time.Second, // give the transaction time to land |
- } |
- default: |
- panic("unknown action type") |
- } |
- if err := tasks.AddTask(c, &task); err != nil { |
- return err |
- } |
+ if err := pokeMachine(c, entity, cb); err != nil { |
+ return err |
} |
- |
- entity.State = machine.State |
- return datastore.Put(c, &entity) |
+ return datastore.Put(c, entity) |
}, nil) |
- |
- if err != nil { |
- logging.Errorf(c, "FAIL - %s", err) |
- } |
- return err |
+ return transient.Tag.Apply(err) |
} |
func startJob(c context.Context, id string) error { |
@@ -114,6 +142,46 @@ func startJob(c context.Context, id string) error { |
}) |
} |
+func addTrigger(c context.Context, jobID, triggerID string) error { |
+ logging.Infof(c, "Triggering %q - %q", jobID, triggerID) |
+ |
+ // Add the trigger request to the pending set. |
+ if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerID}}); err != nil { |
+ return err |
+ } |
+ |
+ // Run a task that examines the pending set and makes decisions. |
+ return kickTriageTask(c, jobID) |
+} |
+ |
+func kickTriageTask(c context.Context, jobID string) error { |
+ // Throttle to once per 2 sec (and make sure it is always in the future). |
+ eta := clock.Now(c).Unix() |
+ eta = (eta/2 + 1) * 2 |
+ dedupKey := fmt.Sprintf("triage:%s:%d", jobID, eta) |
+ |
+ // Use cheaper but crappier memcache as a first guard. |
+ itm := memcache.NewItem(c, dedupKey).SetExpiration(time.Minute) |
+ if memcache.Get(c, itm) == nil { |
+ logging.Infof(c, "The triage task has already been scheduled") |
+ return nil // already added! |
+ } |
+ |
+ err := dispatcher.AddTask(c, &tq.Task{ |
+ DeduplicationKey: dedupKey, |
+ ETA: time.Unix(eta, 0), |
+ Payload: &internal.TriageTriggersTask{JobId: jobID}, |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ logging.Infof(c, "Scheduled the triage task") |
+ |
+ // Best effort in setting memcache flag. No big deal if it fails. |
+ memcache.Set(c, itm) |
+ return nil |
+} |
+ |
func handleTick(c context.Context, task proto.Message, execCount int) error { |
msg := task.(*internal.TickLaterTask) |
return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
@@ -121,22 +189,210 @@ func handleTick(c context.Context, task proto.Message, execCount int) error { |
}) |
} |
-func handleInvocation(c context.Context, task proto.Message, execCount int) error { |
- msg := task.(*internal.StartInvocationTask) |
- logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) |
- return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
- m.RewindIfNecessary() |
- return nil |
+func handleTrigger(c context.Context, task proto.Message, execCount int) error { |
+ msg := task.(*internal.TriggerInvocationTask) |
+ return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId)) |
+} |
+ |
+func handleTriage(c context.Context, task proto.Message, execCount int) error { |
+ msg := task.(*internal.TriageTriggersTask) |
+ logging.Infof(c, "Triaging requests for %q", msg.JobId) |
+ |
+ err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c), func(context.Context) error { |
+ logging.Infof(c, "Got the lock!") |
+ return runTriage(c, msg.JobId) |
+ }) |
+ return transient.Tag.Apply(err) |
+} |
+ |
+func runTriage(c context.Context, jobID string) error { |
+ wg := sync.WaitGroup{} |
+ wg.Add(2) |
+ |
+ var triggersList *dsset.Listing |
+ var triggersErr error |
+ |
+ var finishedList *dsset.Listing |
+ var finishedErr error |
+ |
+ // Grab all pending requests (and stuff to cleanup). |
+ triggersSet := pendingTriggersSet(c, jobID) |
+ go func() { |
+ defer wg.Done() |
+ triggersList, triggersErr = triggersSet.List(c) |
+ if triggersErr == nil { |
+ logging.Infof(c, "Triggers: %d items, %d tombs to cleanup", |
+ len(triggersList.Items), len(triggersList.Tombstones)) |
+ } |
+ }() |
+ |
+ // Same for recently finished invocations. |
+ finishedSet := recentlyFinishedSet(c, jobID) |
+ go func() { |
+ defer wg.Done() |
+ finishedList, finishedErr = finishedSet.List(c) |
+ if finishedErr == nil { |
+ logging.Infof(c, "Finished: %d items, %d tombs to cleanup", |
+ len(finishedList.Items), len(finishedList.Tombstones)) |
+ } |
+ }() |
+ |
+ wg.Wait() |
+ switch { |
+ case triggersErr != nil: |
+ return triggersErr |
+ case finishedErr != nil: |
+ return finishedErr |
+ } |
+ |
+ // Do cleanups first. |
+ if err := dsset.CleanupStorage(c, triggersList.Tombstones, finishedList.Tombstones); err != nil { |
+ return err |
+ } |
+ |
+ var cleanup []*dsset.Tombstone |
+ err := datastore.RunInTransaction(c, func(c context.Context) error { |
+ state := &CronState{ID: jobID} |
+ if err := datastore.Get(c, state); err != nil && err != datastore.ErrNoSuchEntity { |
+ return err |
+ } |
+ |
+ popOps := []*dsset.PopOp{} |
+ |
+ // Tidy RunningSet by removing all recently finished invocations. |
+ if !finishedList.Empty() { |
+ op, err := finishedSet.BeginPop(c, finishedList) |
+ if err != nil { |
+ return err |
+ } |
+ popOps = append(popOps, op) |
+ |
+ reallyFinished := map[int64]struct{}{} |
+ for _, itm := range finishedList.Items { |
+ if op.Pop(itm.ID) { |
+ id, _ := strconv.ParseInt(itm.ID, 10, 64) |
+ reallyFinished[id] = struct{}{} |
+ } |
+ } |
+ |
+ filtered := []int64{} |
+ for _, id := range state.RunningSet { |
+ if _, yep := reallyFinished[id]; yep { |
+ logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id) |
+ } else { |
+ filtered = append(filtered, id) |
+ } |
+ } |
+ state.RunningSet = filtered |
+ } |
+ |
+ // Launch new invocations for each pending trigger. |
+ if !triggersList.Empty() { |
+ op, err := triggersSet.BeginPop(c, triggersList) |
+ if err != nil { |
+ return err |
+ } |
+ popOps = append(popOps, op) |
+ |
+ batch := internal.LaunchInvocationsBatchTask{JobId: state.ID} |
+ for _, trigger := range triggersList.Items { |
+ if op.Pop(trigger.ID) { |
+ logging.Infof(c, "Launching new launch-%d for trigger %s", state.NextInvID, trigger.ID) |
+ state.RunningSet = append(state.RunningSet, state.NextInvID) |
+ batch.InvId = append(batch.InvId, state.NextInvID) |
+ state.NextInvID++ |
+ } |
+ } |
+ // Transactionally trigger a batch with new invocations. |
+ if len(batch.InvId) != 0 { |
+ if err := dispatcher.AddTask(c, &tq.Task{Payload: &batch}); err != nil { |
+ return err |
+ } |
+ } |
+ } |
+ |
+ // Submit set changes. |
+ var err error |
+ if cleanup, err = dsset.FinishPop(c, popOps...); err != nil { |
+ return err |
+ } |
+ |
+ logging.Infof(c, "Running invocations - %v", state.RunningSet) |
+ |
+ // If nothing is running, poke the cron machine. Maybe it wants to start |
+ // something. |
+ if len(state.RunningSet) == 0 { |
+ err = pokeMachine(c, state, func(c context.Context, m *cron.Machine) error { |
+ m.RewindIfNecessary() |
+ return nil |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ } |
+ |
+ // Done! |
+ return datastore.Put(c, state) |
+ }, nil) |
+ |
+ if err == nil && len(cleanup) != 0 { |
+ // Best effort cleanup of storage of consumed items. |
+ logging.Infof(c, "Cleaning up storage of %d items", len(cleanup)) |
+ if err := dsset.CleanupStorage(c, cleanup); err != nil { |
+ logging.Warningf(c, "Best effort cleanup failed - %s", err) |
+ } |
+ } |
+ |
+ return transient.Tag.Apply(err) |
+} |
+ |
+func handleBatchLaunch(c context.Context, task proto.Message, execCount int) error { |
+ msg := task.(*internal.LaunchInvocationsBatchTask) |
+ logging.Infof(c, "Batch launch for %q", msg.JobId) |
+ |
+ tasks := []*tq.Task{} |
+ for _, invId := range msg.InvId { |
+ logging.Infof(c, "Launching inv-%d", invId) |
+ tasks = append(tasks, &tq.Task{ |
+ DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, invId), |
+ Payload: &internal.LaunchInvocationTask{ |
+ JobId: msg.JobId, |
+ InvId: invId, |
+ }, |
+ }) |
+ } |
+ |
+ return dispatcher.AddTask(c, tasks...) |
+} |
+ |
+func handleLaunchTask(c context.Context, task proto.Message, execCount int) error { |
+ msg := task.(*internal.LaunchInvocationTask) |
+ logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvId) |
+ |
+ // There can be more stuff here. But we just finish the invocation right away. |
+ |
+ finishedSet := recentlyFinishedSet(c, msg.JobId) |
+ err := finishedSet.Add(c, []dsset.Item{ |
+ {ID: fmt.Sprintf("%d", msg.InvId)}, |
}) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ // Kick the triage now that the set of running invocations has been modified. |
+ return kickTriageTask(c, msg.JobId) |
} |
func init() { |
r := router.New() |
gaemiddleware.InstallHandlers(r) |
- tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil) |
- tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil) |
- tasks.InstallRoutes(r, gaemiddleware.BaseProd()) |
+ dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil) |
+ dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger, "triggers", nil) |
+ dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "triages", nil) |
+ dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBatchLaunch, "launches", nil) |
+ dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTask, "invocations", nil) |
+ dispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) |
// Kick-start a bunch of jobs by visiting: |
// |
@@ -153,5 +409,13 @@ func init() { |
} |
}) |
+ r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
+ jobID := c.Params.ByName("JobID") |
+ triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixNano()) |
+ if err := addTrigger(c.Context, jobID, triggerID); err != nil { |
+ panic(err) |
+ } |
+ }) |
+ |
http.DefaultServeMux.Handle("/", r) |
} |