Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(276)

Side by Side Diff: go/src/infra/gae/libs/memlock/memlock.go

Issue 986553002: A simple memcache lock for appengine. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@meta
Patch Set: remove unnecessary log, use blackhole logger Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Package memlock allows multiple appengine handlers to coordinate best-effort
6 // mutual execution via memcache. "best-effort" here means "best-effort"...
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper
8 // than, for example, colliding with datastore transactions.
9 package memlock
10
11 import (
12 "bytes"
13 "errors"
14 "infra/gae/libs/wrapper"
15 "sync/atomic"
16 "time"
17
18 "github.com/luci/luci-go/common/logging"
19 "golang.org/x/net/context"
20
21 "appengine/memcache"
22 )
23
24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock
25 // prior to invoking the user-supplied function.
26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock")
27
28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty
29 // clientID.
30 var ErrEmptyClientID = errors.New("memlock: empty clientID")
31
32 // memlockKeyPrefix is the memcache Key prefix for all user-supplied keys.
33 const memlockKeyPrefix = "memlock:"
34
35 type checkOp string
36
37 // var so we can override it in the tests
M-A Ruel 2015/05/31 20:35:10 delay is a var so ...
38 var delay = time.Second
39
40 const (
41 release checkOp = "release"
42 refresh = "refresh"
43 )
44
45 // memcacheLockTime is the expiration time of the memcache entry. If the lock
46 // is correctly released, then it will be released before this time. It's a
47 // var so we can override it in the tests.
48 var memcacheLockTime = 16 * time.Second
49
50 // TryWithLock attempts to obtains the lock once, and then invokes f if
51 // sucessful. The `check` function can be used within f to see if the lock is
52 // still held.
53 //
54 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock,
55 // otherwise returns the error that f returns.
56 //
57 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking
58 // the same data must use the same key. clientID is the unique identifier for
59 // this client (lock-holder). If it's empty then TryWithLock() will return
60 // ErrEmptyClientID.
61 func TryWithLock(c context.Context, key, clientID string, f func(check func() bo ol) error) error {
M-A Ruel 2015/05/31 20:35:11 A nicer way IMHO would be to have this signature i
iannucci 2015/05/31 21:07:57 The reason for the check function is so the code r
62 if len(clientID) == 0 {
63 return ErrEmptyClientID
64 }
65
66 log := logging.Get(c)
67 mc := wrapper.GetMC(c)
68
69 key = memlockKeyPrefix + key
70 cid := []byte(clientID)
71
72 // logf is a logging helper which includes the key and clientID in the l og
73 // message automatically. The function f is meant to be one of the metho ds
74 // in logging.Logger (e.g. `Logger.Warningf`).
75 logf := func(f func(string, ...interface{}), fmt string, args ...interfa ce{}) {
76 args = append([]interface{}{key, clientID}, args...)
M-A Ruel 2015/05/31 20:35:11 wouldn't it be simpler to do: return f(fmt.Sprint
iannucci 2015/05/31 21:07:57 This function is actually going away... I'm standa
77 f("memlock(%s:%q): "+fmt, args...)
78 }
79
80 // checkAnd gets the current value from memcache, and then attempts to d o the
81 // checkOp (which can either be `refresh` or `release`). These pieces of
82 // functionality are necessarially intertwined, because CAS only works w ith
83 // the exact-same *Item which was returned from a Get.
84 //
85 // refresh will attempt to CAS the item with the same content to reset i t's
86 // timeout.
87 //
88 // release will attempt to CAS the item to remove it's contents (clientI D).
89 // another lock observing an empty clientID will know that the lock is
90 // obtainable.
91 checkAnd := func(op checkOp) bool {
92 itm, err := mc.Get(key)
93 if err != nil {
94 logf(log.Warningf, "error getting: %s", err)
95 return false
96 }
97
98 if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) {
99 logf(log.Infof, "lock owned by %q", string(itm.Value))
100 return false
101 }
102
103 if op == refresh {
104 itm.Value = cid
105 itm.Expiration = memcacheLockTime
106 } else {
107 if len(itm.Value) == 0 {
108 // it's already unlocked, no need to CAS
109 logf(log.Infof, "lock already released")
110 return true
111 }
112 itm.Value = []byte{}
113 itm.Expiration = delay
114 }
115
116 err = mc.CompareAndSwap(itm)
117 if err != nil {
118 logf(log.Warningf, "failed to %s lock: %q", op, err)
119 return false
120 }
121
122 return true
123 }
124
125 // Now the actual logic begins. First we 'Add' the item, which will set it if
126 // it's not present in the memcache, otherwise leaves it alone.
127 err := mc.Add(&memcache.Item{
128 Key: key, Value: cid, Expiration: memcacheLockTime})
129 if err != nil {
130 if err != memcache.ErrNotStored {
131 logf(log.Warningf, "error adding: %s", err)
132 }
133 if !checkAnd(refresh) {
134 return ErrFailedToLock
135 }
136 }
137
138 // At this point we nominally have the lock (at least for memcacheLockTi me).
139
140 stopChan := make(chan struct{})
141 stoppedChan := make(chan struct{})
142 held := uint32(1)
143
144 defer func() {
145 close(stopChan)
146 <-stoppedChan // this blocks TryWithLock until the goroutine bel ow quits.
147 }()
148
149 // This goroutine checks to see if we still posess the lock, and refresh es it
150 // if we do. It will stop doing this when either stopChan is activated ( e.g.
151 // the user's function returns) or we lose the lock (memcache flake, etc .).
152 go func() {
153 defer close(stoppedChan)
154
155 checkLoop:
156 for {
157 select {
158 case <-stopChan:
159 break checkLoop
160 case <-time.After(delay):
161 }
162 if !checkAnd(refresh) {
163 atomic.StoreUint32(&held, 0)
164 logf(log.Warningf, "lost lock: %s", err)
165 break
166 }
167 }
168
169 checkAnd(release)
170 atomic.StoreUint32(&held, 0)
171 }()
172
173 return f(func() bool { return atomic.LoadUint32(&held) == 1 })
174 }
OLDNEW
« no previous file with comments | « no previous file | go/src/infra/gae/libs/memlock/memlock_test.go » ('j') | go/src/infra/gae/libs/wrapper/memory/memcache.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698