Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(117)

Side by Side Diff: go/src/infra/gae/libs/memlock/memlock.go

Issue 986553002: A simple memcache lock for appengine. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@meta
Patch Set: rebase Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Package memlock allows multiple appengine handlers to coordinate best-effort
6 // mutual execution via memcache. "best-effort" here means "best-effort"...
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper
8 // than, for example, colliding with datastore transactions.
9 package memlock
10
11 import (
12 "bytes"
13 "errors"
14 "infra/gae/libs/wrapper"
15 "sync/atomic"
16 "time"
17
18 "github.com/luci/luci-go/common/logging"
19 "golang.org/x/net/context"
20
21 "appengine/memcache"
22 )
23
24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock
25 // prior to invoking the user-supplied function.
26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock")
27
28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty
29 // clientID.
30 var ErrEmptyClientID = errors.New("memlock: empty clientID")
31
32 // memlockKeyPrefix is the memcache Key prefix for all user-supplied keys.
33 const memlockKeyPrefix = "memlock:"
34
35 type checkOp string
36
37 // var so we can override it in the tests
38 var delay = time.Second
39
40 const (
41 release checkOp = "release"
42 refresh = "refresh"
43 )
44
45 // memcacheLockTime is the expiration time of the memcache entry. If the lock
46 // is correctly released, then it will be released before this time. It's a
47 // var so we can override it in the tests.
48 var memcacheLockTime = 16 * time.Second
49
50 // TryWithLock attempts to obtains the lock once, and then invokes f if
51 // sucessful. The `check` function can be used within f to see if the lock is
52 // still held.
53 //
54 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock,
55 // otherwise returns the error that f returns.
56 //
57 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking
58 // the same data must use the same key. clientID is the unique identifier for
59 // this client (lock-holder). If it's empty then TryWithLock() will return
60 // ErrEmptyClientID.
61 func TryWithLock(c context.Context, key, clientID string, f func(check func() bo ol) error) error {
62 if len(clientID) == 0 {
63 return ErrEmptyClientID
64 }
65
66 c = logging.SetField(c, "key", key)
67 c = logging.SetField(c, "clientID", clientID)
68 log := logging.Get(c)
69 mc := wrapper.GetMC(c)
70
71 key = memlockKeyPrefix + key
72 cid := []byte(clientID)
73
74 // checkAnd gets the current value from memcache, and then attempts to d o the
75 // checkOp (which can either be `refresh` or `release`). These pieces of
76 // functionality are necessarially intertwined, because CAS only works w ith
77 // the exact-same *Item which was returned from a Get.
78 //
79 // refresh will attempt to CAS the item with the same content to reset i t's
80 // timeout.
81 //
82 // release will attempt to CAS the item to remove it's contents (clientI D).
83 // another lock observing an empty clientID will know that the lock is
84 // obtainable.
85 checkAnd := func(op checkOp) bool {
86 itm, err := mc.Get(key)
87 if err != nil {
88 log.Warningf("error getting: %s", err)
89 return false
90 }
91
92 if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) {
93 log.Infof("lock owned by %q", string(itm.Value))
94 return false
95 }
96
97 if op == refresh {
98 itm.Value = cid
99 itm.Expiration = memcacheLockTime
100 } else {
101 if len(itm.Value) == 0 {
102 // it's already unlocked, no need to CAS
103 log.Infof("lock already released")
104 return true
105 }
106 itm.Value = []byte{}
107 itm.Expiration = delay
108 }
109
110 err = mc.CompareAndSwap(itm)
111 if err != nil {
112 log.Warningf("failed to %s lock: %q", op, err)
113 return false
114 }
115
116 return true
117 }
118
119 // Now the actual logic begins. First we 'Add' the item, which will set it if
120 // it's not present in the memcache, otherwise leaves it alone.
121 err := mc.Add(&memcache.Item{
122 Key: key, Value: cid, Expiration: memcacheLockTime})
123 if err != nil {
124 if err != memcache.ErrNotStored {
125 log.Warningf("error adding: %s", err)
126 }
127 if !checkAnd(refresh) {
128 return ErrFailedToLock
129 }
130 }
131
132 // At this point we nominally have the lock (at least for memcacheLockTi me).
133
134 stopChan := make(chan struct{})
135 stoppedChan := make(chan struct{})
136 held := uint32(1)
137
138 defer func() {
139 close(stopChan)
140 <-stoppedChan // this blocks TryWithLock until the goroutine bel ow quits.
141 }()
142
143 // This goroutine checks to see if we still posess the lock, and refresh es it
144 // if we do. It will stop doing this when either stopChan is activated ( e.g.
145 // the user's function returns) or we lose the lock (memcache flake, etc .).
146 go func() {
147 defer close(stoppedChan)
148
149 checkLoop:
150 for {
151 select {
152 case <-stopChan:
153 break checkLoop
154 case <-time.After(delay):
155 }
156 if !checkAnd(refresh) {
157 atomic.StoreUint32(&held, 0)
158 log.Warningf("lost lock: %s", err)
159 break
160 }
161 }
162
163 checkAnd(release)
164 atomic.StoreUint32(&held, 0)
165 }()
166
167 return f(func() bool { return atomic.LoadUint32(&held) == 1 })
168 }
OLDNEW
« DEPS ('K') | « DEPS ('k') | go/src/infra/gae/libs/memlock/memlock_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698