Index: go/src/infra/gae/libs/memlock/memlock.go |
diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6866cbf0bd3d28f802bc8ef7e6eb8b64f301eca3 |
--- /dev/null |
+++ b/go/src/infra/gae/libs/memlock/memlock.go |
@@ -0,0 +1,168 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// Package memlock allows multiple appengine handlers to coordinate best-effort |
+// mutual execution via memcache. "best-effort" here means "best-effort"... |
+// memcache is not reliable. However, colliding on memcache is a lot cheaper |
+// than, for example, colliding with datastore transactions. |
+package memlock |
+ |
+import ( |
+ "bytes" |
+ "errors" |
+ "infra/gae/libs/wrapper" |
+ "sync/atomic" |
+ "time" |
+ |
+ "github.com/luci/luci-go/common/logging" |
+ "golang.org/x/net/context" |
+ |
+ "appengine/memcache" |
+) |
+ |
+// ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
+// prior to invoking the user-supplied function. |
+var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
+ |
+// ErrEmptyClientID is returned from TryWithLock when you specify an empty |
+// clientID. |
+var ErrEmptyClientID = errors.New("memlock: empty clientID") |
+ |
+// memlockKeyPrefix is the memcache Key prefix for all user-supplied keys. |
+const memlockKeyPrefix = "memlock:" |
+ |
+type checkOp string |
+ |
+// var so we can override it in the tests |
+var delay = time.Second |
+ |
+const ( |
+ release checkOp = "release" |
+ refresh = "refresh" |
+) |
+ |
+// memcacheLockTime is the expiration time of the memcache entry. If the lock |
+// is correctly released, then it will be released before this time. It's a |
+// var so we can override it in the tests. |
+var memcacheLockTime = 16 * time.Second |
+ |
+// TryWithLock attempts to obtains the lock once, and then invokes f if |
+// sucessful. The `check` function can be used within f to see if the lock is |
+// still held. |
+// |
+// TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, |
+// otherwise returns the error that f returns. |
+// |
+// `key` is the memcache key to use (i.e. the name of the lock). Clients locking |
+// the same data must use the same key. clientID is the unique identifier for |
+// this client (lock-holder). If it's empty then TryWithLock() will return |
+// ErrEmptyClientID. |
+func TryWithLock(c context.Context, key, clientID string, f func(check func() bool) error) error { |
+ if len(clientID) == 0 { |
+ return ErrEmptyClientID |
+ } |
+ |
+ c = logging.SetField(c, "key", key) |
+ c = logging.SetField(c, "clientID", clientID) |
+ log := logging.Get(c) |
+ mc := wrapper.GetMC(c) |
+ |
+ key = memlockKeyPrefix + key |
+ cid := []byte(clientID) |
+ |
+ // checkAnd gets the current value from memcache, and then attempts to do the |
+ // checkOp (which can either be `refresh` or `release`). These pieces of |
+ // functionality are necessarially intertwined, because CAS only works with |
+ // the exact-same *Item which was returned from a Get. |
+ // |
+ // refresh will attempt to CAS the item with the same content to reset it's |
+ // timeout. |
+ // |
+ // release will attempt to CAS the item to remove it's contents (clientID). |
+ // another lock observing an empty clientID will know that the lock is |
+ // obtainable. |
+ checkAnd := func(op checkOp) bool { |
+ itm, err := mc.Get(key) |
+ if err != nil { |
+ log.Warningf("error getting: %s", err) |
+ return false |
+ } |
+ |
+ if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { |
+ log.Infof("lock owned by %q", string(itm.Value)) |
+ return false |
+ } |
+ |
+ if op == refresh { |
+ itm.Value = cid |
+ itm.Expiration = memcacheLockTime |
+ } else { |
+ if len(itm.Value) == 0 { |
+ // it's already unlocked, no need to CAS |
+ log.Infof("lock already released") |
+ return true |
+ } |
+ itm.Value = []byte{} |
+ itm.Expiration = delay |
+ } |
+ |
+ err = mc.CompareAndSwap(itm) |
+ if err != nil { |
+ log.Warningf("failed to %s lock: %q", op, err) |
+ return false |
+ } |
+ |
+ return true |
+ } |
+ |
+ // Now the actual logic begins. First we 'Add' the item, which will set it if |
+ // it's not present in the memcache, otherwise leaves it alone. |
+ err := mc.Add(&memcache.Item{ |
+ Key: key, Value: cid, Expiration: memcacheLockTime}) |
+ if err != nil { |
+ if err != memcache.ErrNotStored { |
+ log.Warningf("error adding: %s", err) |
+ } |
+ if !checkAnd(refresh) { |
+ return ErrFailedToLock |
+ } |
+ } |
+ |
+ // At this point we nominally have the lock (at least for memcacheLockTime). |
+ |
+ stopChan := make(chan struct{}) |
+ stoppedChan := make(chan struct{}) |
+ held := uint32(1) |
+ |
+ defer func() { |
+ close(stopChan) |
+ <-stoppedChan // this blocks TryWithLock until the goroutine below quits. |
+ }() |
+ |
+ // This goroutine checks to see if we still posess the lock, and refreshes it |
+ // if we do. It will stop doing this when either stopChan is activated (e.g. |
+ // the user's function returns) or we lose the lock (memcache flake, etc.). |
+ go func() { |
+ defer close(stoppedChan) |
+ |
+ checkLoop: |
+ for { |
+ select { |
+ case <-stopChan: |
+ break checkLoop |
+ case <-time.After(delay): |
+ } |
+ if !checkAnd(refresh) { |
+ atomic.StoreUint32(&held, 0) |
+ log.Warningf("lost lock: %s", err) |
+ break |
+ } |
+ } |
+ |
+ checkAnd(release) |
+ atomic.StoreUint32(&held, 0) |
+ }() |
+ |
+ return f(func() bool { return atomic.LoadUint32(&held) == 1 }) |
+} |