Chromium Code Reviews

Unified Diff: appengine/tumble/process.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: "use exists" | Created 5 years, 2 months ago
Index: appengine/tumble/process.go
diff --git a/appengine/tumble/process.go b/appengine/tumble/process.go
new file mode 100644
index 0000000000000000000000000000000000000000..5011a5a98f8a603c062909c7f8b1b8c3dcf13231
--- /dev/null
+++ b/appengine/tumble/process.go
@@ -0,0 +1,338 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package tumble
+
+import (
+ "bytes"
+ "fmt"
+ "math"
+ "net/http"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/julienschmidt/httprouter"
+ "github.com/luci/gae/filter/txnBuf"
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/datastore/serialize"
+ "github.com/luci/gae/service/memcache"
+ "github.com/luci/luci-go/appengine/memlock"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/parallel"
+ "github.com/luci/luci-go/common/stringset"
+ "golang.org/x/net/context"
+)
+
+// expandedShardBounds returns the boundaries of the expandedShard query that
+// currently correspond to this shard number. If shard >= NumShards (the
+// currently configured number of shards), this returns low > high. Otherwise
+// low < high.
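+//
+// For example, with NumShards == 4 each shard covers roughly a quarter of the
+// int64 range (span == MaxUint64/4): shard 0 spans [MinInt64, MinInt64+span],
+// shard 1 starts at MinInt64+span, and the last shard is capped at MaxInt64
+// to absorb the division remainder.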
+func expandedShardBounds(c context.Context, shard uint64) (low, high int64) {
+ cfg := GetConfig(c)
+
+ if shard >= cfg.NumShards {
+ logging.Warningf(c, "Invalid shard: %d", shard)
+ // return inverted bounds
+ return 0, -1
+ }
+
+ expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards)
+ low = math.MinInt64 + (int64(shard) * expandedShardsPerShard)
+ if uint64(shard) == cfg.NumShards-1 {
+ high = math.MaxInt64
+ } else {
+ high = low + expandedShardsPerShard
+ }
+ return
+}
+
+var dustSettleTimeout = 2 * time.Second
+
+// ProcessShardHandler is an http handler suitable for installation into an
+// httprouter. It expects `logging` and `luci/gae` services to be installed
+// into the context.
+//
+// ProcessShardHandler verifies that it's being run as a taskqueue task and that
+// the following parameters exist and are well-formed:
+// * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
+// * shard_id: decimal-encoded shard identifier.
+//
+// ProcessShardHandler then invokes ProcessShard with the parsed parameters.
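+//
+// Example installation (illustrative only; the real route pattern is defined
+// wherever tumble registers its handlers, and a middleware such as the
+// hypothetical withContext below must supply the context argument):
+//   router.GET("/internal/tumble/process_shard/:shard_id/at/:timestamp",
+//     withContext(ProcessShardHandler))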
+func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
+ tstampStr := p.ByName("timestamp")
+ sidStr := p.ByName("shard_id")
+
+ tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
+ if err != nil {
+ logging.Errorf(c, "bad timestamp %q", tstampStr)
+ rw.WriteHeader(http.StatusNotFound)
+ fmt.Fprintf(rw, "bad timestamp")
+ return
+ }
+
+ sid, err := strconv.ParseUint(sidStr, 10, 64)
+ if err != nil {
+ logging.Errorf(c, "bad shardID %q", tstampStr)
+ rw.WriteHeader(http.StatusNotFound)
+ fmt.Fprintf(rw, "bad shardID")
+ return
+ }
+
+ err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid)
+ if err != nil {
+ rw.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(rw, "error: %s", err)
+ } else {
+ rw.Write([]byte("ok"))
+ }
+}
+
+// ProcessShard is the tumble backend endpoint. It accepts a shard number,
+// which is expected to be < GlobalConfig.NumShards.
+func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error {
+ low, high := expandedShardBounds(c, shard)
+ if low > high {
+ return nil
+ }
+
+ l := logging.Get(logging.SetField(c, "shard", shard))
+
+ cfg := GetConfig(c)
+
+ lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard)
+ clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard)
+
+ // This last key allows buffered tasks to exit early if some other shard
+ // processor has already processed past this task's target timestamp.
+ lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard)
+ mc := memcache.Get(c)
+ lastItm, err := mc.Get(lastKey)
+ if err != nil {
+ if err != memcache.ErrCacheMiss {
+ l.Warningf("couldn't obtain last timestamp: %s", err)
+ }
+ } else {
+ val := lastItm.Value()
+ last, err := serialize.ReadTime(bytes.NewBuffer(val))
+ if err != nil {
+ l.Warningf("could not decode timestamp %v: %s", val, err)
+ } else {
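+ // Task timestamps are rounded by TemporalRoundFactor, so pad the recorded
+ // time by one factor before comparing against this task's timestamp.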
+ last = last.Add(cfg.TemporalRoundFactor)
+ if last.After(timestamp) {
+ l.Infof("early exit, %s > %s", last, timestamp)
+ return nil
+ }
+ }
+ }
+ err = nil
+
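+ // Find the distinct TargetRoots that still have pending Mutations within
+ // this shard's ExpandedShard range; each root is processed independently.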
+ q := datastore.NewQuery("tumble.Mutation").
+ Gte("ExpandedShard", low).Lte("ExpandedShard", high).
+ Project("TargetRoot").Distinct(true).
+ Limit(cfg.ProcessMaxBatchSize)
+
+ banSets := map[string]stringset.Set{}
+
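+ // Buffered channel used as a counting semaphore: at most cfg.NumGoroutines
+ // processRoot calls may run concurrently.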
+ limitSemaphore := make(chan struct{}, cfg.NumGoroutines)
+
+ for try := 0; try < 2; try++ {
+ err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error {
+ l.Infof("Got lock (try %d)", try)
+
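+ // Drain batches until a pass processes nothing; between passes, sleep
+ // briefly to let datastore index updates from the deletions settle.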
+ for {
+ processCounters := []*int64{}
+ err := parallel.FanOutIn(func(ch chan<- func() error) {
+ err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool {
+ root := pm["TargetRoot"][0].Value().(*datastore.Key)
+ encRoot := root.Encode()
+
+ // TODO(riannucci): make banSets remove keys from the banSet which
+ // weren't hit. Once they stop showing up, they'll never show up
+ // again.
+
+ bs := banSets[encRoot]
+ if bs == nil {
+ bs = stringset.New(0)
+ banSets[encRoot] = bs
+ }
+ counter := new(int64)
+ processCounters = append(processCounters, counter)
+
+ ch <- func() error {
+ limitSemaphore <- struct{}{}
+ defer func() {
+ <-limitSemaphore
+ }()
+ return processRoot(c, root, bs, counter)
+ }
+
+ select {
+ case <-c.Done():
+ l.Warningf("Lost lock!")
+ return false
+ default:
+ return true
+ }
+ })
+ if err != nil {
+ l.Errorf("Failure to query: %s", err)
+ ch <- func() error {
+ return err
+ }
+ }
+ })
+ if err != nil {
+ return err
+ }
+ numProcessed := int64(0)
+ for _, n := range processCounters {
+ numProcessed += *n
+ }
+ l.Infof("cumulatively processed %d items", numProcessed)
+ if numProcessed == 0 {
+ break
+ }
+
+ err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c))))
+ if err != nil {
+ l.Warningf("could not update last process memcache key %s: %s", lastKey, err)
+ }
+
+ clock.Sleep(c, dustSettleTimeout)
+ }
+ return nil
+ })
+ if err != memlock.ErrFailedToLock {
+ break
+ }
+ l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1)
+ clock.Sleep(c, time.Second*2)
+ }
+ if err == memlock.ErrFailedToLock {
+ l.Infof("Couldn't obtain lock (giving up): %s", err)
+ err = nil
+ }
+ return err
+}
+
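+// getBatchByRoot runs a keys-only query for up to ProcessMaxBatchSize
+// Mutations under root, skipping any whose keys already appear in banSet.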
+func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set) ([]*realMutation, error) {
+ cfg := GetConfig(c)
+ ds := datastore.Get(c)
+ q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root)
+ toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize)
+ err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool {
+ if !banSet.Has(k.Encode()) {
+ toFetch = append(toFetch, &realMutation{
+ ID: k.StringID(),
+ Parent: k.Parent(),
+ })
+ }
+ return len(toFetch) < cap(toFetch)
+ })
+ return toFetch, err
+}
+
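+// loadFilteredMutations fetches the given realMutations and decodes each one
+// into its user-supplied Mutation. Entities that no longer exist are skipped,
+// since another processor may have already applied and deleted them.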
+func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore.Key, []Mutation, error) {
+ ds := datastore.Get(c)
+
+ mutKeys := make([]*datastore.Key, 0, len(rms))
+ muts := make([]Mutation, 0, len(rms))
+ err := ds.GetMulti(rms)
+ me, ok := err.(errors.MultiError)
+ if !ok && err != nil {
+ return nil, nil, err
+ }
+
+ for i, rm := range rms {
+ err = nil
+ if me != nil {
+ err = me[i]
+ }
+ if err == nil {
+ if rm.Version != getAppVersion(c) {
+ logging.Fields{
+ "mut_version": rm.Version,
+ "cur_version": getAppVersion(c),
+ }.Warningf(c, "loading mutation with different code version")
+ }
+ m, err := rm.GetMutation()
+ if err != nil {
+ logging.Errorf(c, "couldn't load mutation: %s", err)
+ continue
+ }
+ muts = append(muts, m)
+ mutKeys = append(mutKeys, ds.KeyForObj(rm))
+ } else if err != datastore.ErrNoSuchEntity {
+ return nil, nil, me
+ }
+ }
+
+ return mutKeys, muts, nil
+}
+
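+// processRoot applies a batch of Mutations sharing the entity group rooted at
+// root: each Mutation's RollForward runs in a single buffered transaction,
+// tasks are fired for any shards that received new Mutations, and completed
+// Mutations are deleted and added to banSet.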
+func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error {
+ l := logging.Get(c)
+
+ toFetch, err := getBatchByRoot(c, root, banSet)
+ if err != nil || len(toFetch) == 0 {
+ return err
+ }
+
+ mutKeys, muts, err := loadFilteredMutations(c, toFetch)
+ if err != nil {
+ return err
+ }
+
+ select {
+ case <-c.Done():
+ // The memlock was lost; bail out without error so the remaining
+ // Mutations are picked up by a later task.
+ l.Warningf("Lost lock during processRoot")
+ return nil
+ default:
+ }
+
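+ // allShards accumulates every shard that receives new Mutations so that
+ // processing tasks can be fired for them once the transaction commits.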
+ allShards := map[uint64]struct{}{}
+
+ toDel := make([]*datastore.Key, 0, len(muts))
+ numMuts := uint64(0)
+ err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error {
+ toDel = toDel[:0]
+ numMuts = 0
+
+ for i, m := range muts {
+ shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward)
+ if err != nil {
+ l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m)
+ continue
+ }
+ toDel = append(toDel, mutKeys[i])
+ numMuts += uint64(numNewMuts)
+ for shard := range shards {
+ allShards[shard] = struct{}{}
+ }
+ }
+
+ return nil
+ }, nil)
+ if err != nil {
+ l.Errorf("failed running transaction: %s", err)
+ return err
+ }
+ fireTasks(c, allShards)
+ l.Infof("successfully processed %d mutations, adding %d more", len(toDel), numMuts)
+
+ if len(toDel) > 0 {
+ atomic.StoreInt64(counter, int64(len(toDel)))
+
+ for _, k := range toDel {
+ banSet.Add(k.Encode())
+ }
+ if err := datastore.Get(c).DeleteMulti(toDel); err != nil {
+ l.Warningf("error deleting finished mutations: %s", err)
+ }
+ }
+
+ return nil
+}
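
A note on the NumGoroutines throttle above: it uses Go's buffered-channel-as-semaphore idiom. Below is a minimal, self-contained sketch of the same pattern (all names are illustrative, not part of this CL):

package main

import (
	"fmt"
	"sync"
)

// processRoot stands in for the per-root work tumble performs.
func processRoot(id int) error {
	fmt.Printf("processing root %d\n", id)
	return nil
}

func main() {
	const numGoroutines = 4 // plays the role of cfg.NumGoroutines

	// A buffered channel acts as a counting semaphore: a send takes a slot
	// and blocks once numGoroutines slots are in use; a receive frees one.
	limitSemaphore := make(chan struct{}, numGoroutines)

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			limitSemaphore <- struct{}{}        // acquire
			defer func() { <-limitSemaphore }() // release
			_ = processRoot(i)
		}()
	}
	wg.Wait()
}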