| Index: go/src/infra/gae/libs/memlock/memlock.go
|
| diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6866cbf0bd3d28f802bc8ef7e6eb8b64f301eca3
|
| --- /dev/null
|
| +++ b/go/src/infra/gae/libs/memlock/memlock.go
|
| @@ -0,0 +1,168 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +// Package memlock allows multiple appengine handlers to coordinate best-effort
|
| +// mutual execution via memcache. "best-effort" here means "best-effort"...
|
| +// memcache is not reliable. However, colliding on memcache is a lot cheaper
|
| +// than, for example, colliding with datastore transactions.
|
| +package memlock
|
| +
|
| +import (
|
| + "bytes"
|
| + "errors"
|
| + "infra/gae/libs/wrapper"
|
| + "sync/atomic"
|
| + "time"
|
| +
|
| + "github.com/luci/luci-go/common/logging"
|
| + "golang.org/x/net/context"
|
| +
|
| + "appengine/memcache"
|
| +)
|
| +
|
| +// ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock
|
| +// prior to invoking the user-supplied function.
|
| +var ErrFailedToLock = errors.New("memlock: failed to obtain lock")
|
| +
|
| +// ErrEmptyClientID is returned from TryWithLock when you specify an empty
|
| +// clientID.
|
| +var ErrEmptyClientID = errors.New("memlock: empty clientID")
|
| +
|
| +// memlockKeyPrefix is the memcache Key prefix for all user-supplied keys.
|
| +const memlockKeyPrefix = "memlock:"
|
| +
|
| +type checkOp string
|
| +
|
| +// var so we can override it in the tests
|
| +var delay = time.Second
|
| +
|
| +const (
|
| + release checkOp = "release"
|
| + refresh = "refresh"
|
| +)
|
| +
|
| +// memcacheLockTime is the expiration time of the memcache entry. If the lock
|
| +// is correctly released, then it will be released before this time. It's a
|
| +// var so we can override it in the tests.
|
| +var memcacheLockTime = 16 * time.Second
|
| +
|
| +// TryWithLock attempts to obtains the lock once, and then invokes f if
|
| +// sucessful. The `check` function can be used within f to see if the lock is
|
| +// still held.
|
| +//
|
| +// TryWithLock function returns ErrFailedToLock if it fails to obtain the lock,
|
| +// otherwise returns the error that f returns.
|
| +//
|
| +// `key` is the memcache key to use (i.e. the name of the lock). Clients locking
|
| +// the same data must use the same key. clientID is the unique identifier for
|
| +// this client (lock-holder). If it's empty then TryWithLock() will return
|
| +// ErrEmptyClientID.
|
| +func TryWithLock(c context.Context, key, clientID string, f func(check func() bool) error) error {
|
| + if len(clientID) == 0 {
|
| + return ErrEmptyClientID
|
| + }
|
| +
|
| + c = logging.SetField(c, "key", key)
|
| + c = logging.SetField(c, "clientID", clientID)
|
| + log := logging.Get(c)
|
| + mc := wrapper.GetMC(c)
|
| +
|
| + key = memlockKeyPrefix + key
|
| + cid := []byte(clientID)
|
| +
|
| + // checkAnd gets the current value from memcache, and then attempts to do the
|
| + // checkOp (which can either be `refresh` or `release`). These pieces of
|
| + // functionality are necessarially intertwined, because CAS only works with
|
| + // the exact-same *Item which was returned from a Get.
|
| + //
|
| + // refresh will attempt to CAS the item with the same content to reset it's
|
| + // timeout.
|
| + //
|
| + // release will attempt to CAS the item to remove it's contents (clientID).
|
| + // another lock observing an empty clientID will know that the lock is
|
| + // obtainable.
|
| + checkAnd := func(op checkOp) bool {
|
| + itm, err := mc.Get(key)
|
| + if err != nil {
|
| + log.Warningf("error getting: %s", err)
|
| + return false
|
| + }
|
| +
|
| + if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) {
|
| + log.Infof("lock owned by %q", string(itm.Value))
|
| + return false
|
| + }
|
| +
|
| + if op == refresh {
|
| + itm.Value = cid
|
| + itm.Expiration = memcacheLockTime
|
| + } else {
|
| + if len(itm.Value) == 0 {
|
| + // it's already unlocked, no need to CAS
|
| + log.Infof("lock already released")
|
| + return true
|
| + }
|
| + itm.Value = []byte{}
|
| + itm.Expiration = delay
|
| + }
|
| +
|
| + err = mc.CompareAndSwap(itm)
|
| + if err != nil {
|
| + log.Warningf("failed to %s lock: %q", op, err)
|
| + return false
|
| + }
|
| +
|
| + return true
|
| + }
|
| +
|
| + // Now the actual logic begins. First we 'Add' the item, which will set it if
|
| + // it's not present in the memcache, otherwise leaves it alone.
|
| + err := mc.Add(&memcache.Item{
|
| + Key: key, Value: cid, Expiration: memcacheLockTime})
|
| + if err != nil {
|
| + if err != memcache.ErrNotStored {
|
| + log.Warningf("error adding: %s", err)
|
| + }
|
| + if !checkAnd(refresh) {
|
| + return ErrFailedToLock
|
| + }
|
| + }
|
| +
|
| + // At this point we nominally have the lock (at least for memcacheLockTime).
|
| +
|
| + stopChan := make(chan struct{})
|
| + stoppedChan := make(chan struct{})
|
| + held := uint32(1)
|
| +
|
| + defer func() {
|
| + close(stopChan)
|
| + <-stoppedChan // this blocks TryWithLock until the goroutine below quits.
|
| + }()
|
| +
|
| + // This goroutine checks to see if we still posess the lock, and refreshes it
|
| + // if we do. It will stop doing this when either stopChan is activated (e.g.
|
| + // the user's function returns) or we lose the lock (memcache flake, etc.).
|
| + go func() {
|
| + defer close(stoppedChan)
|
| +
|
| + checkLoop:
|
| + for {
|
| + select {
|
| + case <-stopChan:
|
| + break checkLoop
|
| + case <-time.After(delay):
|
| + }
|
| + if !checkAnd(refresh) {
|
| + atomic.StoreUint32(&held, 0)
|
| + log.Warningf("lost lock: %s", err)
|
| + break
|
| + }
|
| + }
|
| +
|
| + checkAnd(release)
|
| + atomic.StoreUint32(&held, 0)
|
| + }()
|
| +
|
| + return f(func() bool { return atomic.LoadUint32(&held) == 1 })
|
| +}
|
|
|