Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Unified Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: rebase Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | scheduler/appengine/engine/cron/demo/queue.yaml » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/cron/demo/main.go
diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go
index 381f63877e45c67b091bbb6fc92b9ae19331fcf5..50e1b08b2a6a120010ea96cf6a4e2441ec03473a 100644
--- a/scheduler/appengine/engine/cron/demo/main.go
+++ b/scheduler/appengine/engine/cron/demo/main.go
@@ -16,33 +16,44 @@
package demo
import (
+ "fmt"
"net/http"
+ "strconv"
+ "sync"
"time"
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
"github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
+ "github.com/luci/gae/service/memcache"
"github.com/luci/luci-go/appengine/gaemiddleware"
+ "github.com/luci/luci-go/appengine/memlock"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/data/rand/mathrand"
"github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry/transient"
"github.com/luci/luci-go/server/router"
"github.com/luci/luci-go/scheduler/appengine/engine/cron"
+ "github.com/luci/luci-go/scheduler/appengine/engine/dsset"
"github.com/luci/luci-go/scheduler/appengine/engine/internal"
"github.com/luci/luci-go/scheduler/appengine/engine/tq"
"github.com/luci/luci-go/scheduler/appengine/schedule"
)
-var tasks = tq.Dispatcher{}
+var dispatcher = tq.Dispatcher{}
type CronState struct {
_extra datastore.PropertyMap `gae:"-,extra"`
ID string `gae:"$id"`
State cron.State `gae:",noindex"`
+
+ NextInvID int64 `gae:",noindex"`
+ RunningSet []int64 `gae:",noindex"`
}
func (s *CronState) schedule() *schedule.Schedule {
@@ -53,56 +64,73 @@ func (s *CronState) schedule() *schedule.Schedule {
return parsed
}
-// evolve instantiates cron.Machine, calls the callback and submits emitted
-// actions.
-func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error {
- err := datastore.RunInTransaction(c, func(c context.Context) error {
- entity := CronState{ID: id}
- if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity {
- return err
- }
+func pendingTriggersSet(c context.Context, jobID string) *dsset.Set {
+ return &dsset.Set{
+ ID: "triggers:" + jobID,
+ ShardCount: 8,
+ TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}),
+ TombstonesDelay: 15 * time.Minute,
+ }
+}
- machine := &cron.Machine{
- Now: clock.Now(c),
- Schedule: entity.schedule(),
- Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
- State: entity.State,
+func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set {
+ return &dsset.Set{
+ ID: "finished:" + jobID,
+ ShardCount: 8,
+ TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}),
+ TombstonesDelay: 15 * time.Minute,
+ }
+}
+
+func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error {
+ machine := &cron.Machine{
+ Now: clock.Now(c),
+ Schedule: entity.schedule(),
+ Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
+ State: entity.State,
+ }
+
+ if err := cb(c, machine); err != nil {
+ return err
+ }
+
+ tasks := []*tq.Task{}
+ for _, action := range machine.Actions {
+ switch a := action.(type) {
+ case cron.TickLaterAction:
+ logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
+ tasks = append(tasks, &tq.Task{
+ Payload: &internal.TickLaterTask{JobId: entity.ID, TickNonce: a.TickNonce},
+ ETA: a.When,
+ })
+ case cron.StartInvocationAction:
+ tasks = append(tasks, &tq.Task{
+ Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()},
+ })
+ default:
+ panic("unknown action type")
}
+ }
+ if err := dispatcher.AddTask(c, tasks...); err != nil {
+ return err
+ }
+
+ entity.State = machine.State
+ return nil
+}
- if err := cb(c, machine); err != nil {
+func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error {
+ err := datastore.RunInTransaction(c, func(c context.Context) error {
+ entity := &CronState{ID: id}
+ if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity {
return err
}
-
- for _, action := range machine.Actions {
- var task tq.Task
- switch a := action.(type) {
- case cron.TickLaterAction:
- logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
- task = tq.Task{
- Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
- ETA: a.When,
- }
- case cron.StartInvocationAction:
- task = tq.Task{
- Payload: &internal.StartInvocationTask{JobId: id},
- Delay: time.Second, // give the transaction time to land
- }
- default:
- panic("unknown action type")
- }
- if err := tasks.AddTask(c, &task); err != nil {
- return err
- }
+ if err := pokeMachine(c, entity, cb); err != nil {
+ return err
}
-
- entity.State = machine.State
- return datastore.Put(c, &entity)
+ return datastore.Put(c, entity)
}, nil)
-
- if err != nil {
- logging.Errorf(c, "FAIL - %s", err)
- }
- return err
+ return transient.Tag.Apply(err)
}
func startJob(c context.Context, id string) error {
@@ -114,6 +142,46 @@ func startJob(c context.Context, id string) error {
})
}
+func addTrigger(c context.Context, jobID, triggerID string) error {
+ logging.Infof(c, "Triggering %q - %q", jobID, triggerID)
+
+ // Add the trigger request to the pending set.
+ if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerID}}); err != nil {
+ return err
+ }
+
+ // Run a task that examines the pending set and makes decisions.
+ return kickTriageTask(c, jobID)
+}
+
+func kickTriageTask(c context.Context, jobID string) error {
+ // Throttle to once per 2 sec (and make sure it is always in the future).
+ eta := clock.Now(c).Unix()
+ eta = (eta/2 + 1) * 2
+ dedupKey := fmt.Sprintf("triage:%s:%d", jobID, eta)
+
+ // Use cheaper but crappier memcache as a first guard.
+ itm := memcache.NewItem(c, dedupKey).SetExpiration(time.Minute)
+ if memcache.Get(c, itm) == nil {
+ logging.Infof(c, "The triage task has already been scheduled")
+ return nil // already added!
+ }
+
+ err := dispatcher.AddTask(c, &tq.Task{
+ DeduplicationKey: dedupKey,
+ ETA: time.Unix(eta, 0),
+ Payload: &internal.TriageTriggersTask{JobId: jobID},
+ })
+ if err != nil {
+ return err
+ }
+ logging.Infof(c, "Scheduled the triage task")
+
+ // Best effort in setting memcache flag. No big deal if it fails.
+ memcache.Set(c, itm)
+ return nil
+}
+
func handleTick(c context.Context, task proto.Message, execCount int) error {
msg := task.(*internal.TickLaterTask)
return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
@@ -121,22 +189,210 @@ func handleTick(c context.Context, task proto.Message, execCount int) error {
})
}
-func handleInvocation(c context.Context, task proto.Message, execCount int) error {
- msg := task.(*internal.StartInvocationTask)
- logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId)
- return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
- m.RewindIfNecessary()
- return nil
+func handleTrigger(c context.Context, task proto.Message, execCount int) error {
+ msg := task.(*internal.TriggerInvocationTask)
+ return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId))
+}
+
+func handleTriage(c context.Context, task proto.Message, execCount int) error {
+ msg := task.(*internal.TriageTriggersTask)
+ logging.Infof(c, "Triaging requests for %q", msg.JobId)
+
+ err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c), func(context.Context) error {
+ logging.Infof(c, "Got the lock!")
+ return runTriage(c, msg.JobId)
+ })
+ return transient.Tag.Apply(err)
+}
+
+func runTriage(c context.Context, jobID string) error {
+ wg := sync.WaitGroup{}
+ wg.Add(2)
+
+ var triggersList *dsset.Listing
+ var triggersErr error
+
+ var finishedList *dsset.Listing
+ var finishedErr error
+
+ // Grab all pending requests (and stuff to cleanup).
+ triggersSet := pendingTriggersSet(c, jobID)
+ go func() {
+ defer wg.Done()
+ triggersList, triggersErr = triggersSet.List(c)
+ if triggersErr == nil {
+ logging.Infof(c, "Triggers: %d items, %d tombs to cleanup",
+ len(triggersList.Items), len(triggersList.Tombstones))
+ }
+ }()
+
+ // Same for recently finished invocations.
+ finishedSet := recentlyFinishedSet(c, jobID)
+ go func() {
+ defer wg.Done()
+ finishedList, finishedErr = finishedSet.List(c)
+ if finishedErr == nil {
+ logging.Infof(c, "Finished: %d items, %d tombs to cleanup",
+ len(finishedList.Items), len(finishedList.Tombstones))
+ }
+ }()
+
+ wg.Wait()
+ switch {
+ case triggersErr != nil:
+ return triggersErr
+ case finishedErr != nil:
+ return finishedErr
+ }
+
+ // Do cleanups first.
+ if err := dsset.CleanupStorage(c, triggersList.Tombstones, finishedList.Tombstones); err != nil {
+ return err
+ }
+
+ var cleanup []*dsset.Tombstone
+ err := datastore.RunInTransaction(c, func(c context.Context) error {
+ state := &CronState{ID: jobID}
+ if err := datastore.Get(c, state); err != nil && err != datastore.ErrNoSuchEntity {
+ return err
+ }
+
+ popOps := []*dsset.PopOp{}
+
+ // Tidy RunningSet by removing all recently finished invocations.
+ if !finishedList.Empty() {
+ op, err := finishedSet.BeginPop(c, finishedList)
+ if err != nil {
+ return err
+ }
+ popOps = append(popOps, op)
+
+ reallyFinished := map[int64]struct{}{}
+ for _, itm := range finishedList.Items {
+ if op.Pop(itm.ID) {
+ id, _ := strconv.ParseInt(itm.ID, 10, 64)
+ reallyFinished[id] = struct{}{}
+ }
+ }
+
+ filtered := []int64{}
+ for _, id := range state.RunningSet {
+ if _, yep := reallyFinished[id]; yep {
+ logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id)
+ } else {
+ filtered = append(filtered, id)
+ }
+ }
+ state.RunningSet = filtered
+ }
+
+ // Launch new invocations for each pending trigger.
+ if !triggersList.Empty() {
+ op, err := triggersSet.BeginPop(c, triggersList)
+ if err != nil {
+ return err
+ }
+ popOps = append(popOps, op)
+
+ batch := internal.LaunchInvocationsBatchTask{JobId: state.ID}
+ for _, trigger := range triggersList.Items {
+ if op.Pop(trigger.ID) {
+ logging.Infof(c, "Launching new launch-%d for trigger %s", state.NextInvID, trigger.ID)
+ state.RunningSet = append(state.RunningSet, state.NextInvID)
+ batch.InvId = append(batch.InvId, state.NextInvID)
+ state.NextInvID++
+ }
+ }
+ // Transactionally trigger a batch with new invocations.
+ if len(batch.InvId) != 0 {
+ if err := dispatcher.AddTask(c, &tq.Task{Payload: &batch}); err != nil {
+ return err
+ }
+ }
+ }
+
+ // Submit set changes.
+ var err error
+ if cleanup, err = dsset.FinishPop(c, popOps...); err != nil {
+ return err
+ }
+
+ logging.Infof(c, "Running invocations - %v", state.RunningSet)
+
+ // If nothing is running, poke the cron machine. Maybe it wants to start
+ // something.
+ if len(state.RunningSet) == 0 {
+ err = pokeMachine(c, state, func(c context.Context, m *cron.Machine) error {
+ m.RewindIfNecessary()
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ // Done!
+ return datastore.Put(c, state)
+ }, nil)
+
+ if err == nil && len(cleanup) != 0 {
+ // Best effort cleanup of storage of consumed items.
+ logging.Infof(c, "Cleaning up storage of %d items", len(cleanup))
+ if err := dsset.CleanupStorage(c, cleanup); err != nil {
+ logging.Warningf(c, "Best effort cleanup failed - %s", err)
+ }
+ }
+
+ return transient.Tag.Apply(err)
+}
+
+func handleBatchLaunch(c context.Context, task proto.Message, execCount int) error {
+ msg := task.(*internal.LaunchInvocationsBatchTask)
+ logging.Infof(c, "Batch launch for %q", msg.JobId)
+
+ tasks := []*tq.Task{}
+ for _, invId := range msg.InvId {
+ logging.Infof(c, "Launching inv-%d", invId)
+ tasks = append(tasks, &tq.Task{
+ DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, invId),
+ Payload: &internal.LaunchInvocationTask{
+ JobId: msg.JobId,
+ InvId: invId,
+ },
+ })
+ }
+
+ return dispatcher.AddTask(c, tasks...)
+}
+
+func handleLaunchTask(c context.Context, task proto.Message, execCount int) error {
+ msg := task.(*internal.LaunchInvocationTask)
+ logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvId)
+
+ // There can be more stuff here. But we just finish the invocation right away.
+
+ finishedSet := recentlyFinishedSet(c, msg.JobId)
+ err := finishedSet.Add(c, []dsset.Item{
+ {ID: fmt.Sprintf("%d", msg.InvId)},
})
+ if err != nil {
+ return err
+ }
+
+ // Kick the triage now that the set of running invocations has been modified.
+ return kickTriageTask(c, msg.JobId)
}
func init() {
r := router.New()
gaemiddleware.InstallHandlers(r)
- tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil)
- tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil)
- tasks.InstallRoutes(r, gaemiddleware.BaseProd())
+ dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil)
+ dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger, "triggers", nil)
+ dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "triages", nil)
+ dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBatchLaunch, "launches", nil)
+ dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTask, "invocations", nil)
+ dispatcher.InstallRoutes(r, gaemiddleware.BaseProd())
// Kick-start a bunch of jobs by visiting:
//
@@ -153,5 +409,13 @@ func init() {
}
})
+ r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
+ jobID := c.Params.ByName("JobID")
+ triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixNano())
+ if err := addTrigger(c.Context, jobID, triggerID); err != nil {
+ panic(err)
+ }
+ })
+
http.DefaultServeMux.Handle("/", r)
}
« no previous file with comments | « no previous file | scheduler/appengine/engine/cron/demo/queue.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698