OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 // Package memlock allows multiple appengine handlers to coordinate best-effort | |
6 // mutual execution via memcache. "best-effort" here means "best-effort"... | |
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | |
8 // than, for example, colliding with datastore transactions. | |
9 package memlock | |
10 | |
11 import ( | |
12 "bytes" | |
13 "errors" | |
14 "infra/gae/libs/wrapper" | |
15 "sync/atomic" | |
16 "time" | |
17 | |
18 "github.com/luci/luci-go/common/logging" | |
19 "golang.org/x/net/context" | |
20 | |
21 "appengine/memcache" | |
22 ) | |
23 | |
24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | |
25 // prior to invoking the user-supplied function. | |
26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") | |
27 | |
28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty | |
29 // clientID. | |
30 var ErrEmptyClientID = errors.New("memlock: empty clientID") | |
31 | |
32 // memlockKeyPrefix is the memcache Key prefix for all user-supplied keys. | |
33 const memlockKeyPrefix = "memlock:" | |
34 | |
35 type checkOp string | |
36 | |
37 // var so we can override it in the tests | |
M-A Ruel
2015/05/31 20:35:10
delay is a var so ...
| |
38 var delay = time.Second | |
39 | |
40 const ( | |
41 release checkOp = "release" | |
42 refresh = "refresh" | |
43 ) | |
44 | |
45 // memcacheLockTime is the expiration time of the memcache entry. If the lock | |
46 // is correctly released, then it will be released before this time. It's a | |
47 // var so we can override it in the tests. | |
48 var memcacheLockTime = 16 * time.Second | |
49 | |
50 // TryWithLock attempts to obtains the lock once, and then invokes f if | |
51 // sucessful. The `check` function can be used within f to see if the lock is | |
52 // still held. | |
53 // | |
54 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, | |
55 // otherwise returns the error that f returns. | |
56 // | |
57 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking | |
58 // the same data must use the same key. clientID is the unique identifier for | |
59 // this client (lock-holder). If it's empty then TryWithLock() will return | |
60 // ErrEmptyClientID. | |
61 func TryWithLock(c context.Context, key, clientID string, f func(check func() bo ol) error) error { | |
M-A Ruel
2015/05/31 20:35:11
A nicer way IMHO would be to have this signature i
iannucci
2015/05/31 21:07:57
The reason for the check function is so the code r
| |
62 if len(clientID) == 0 { | |
63 return ErrEmptyClientID | |
64 } | |
65 | |
66 log := logging.Get(c) | |
67 mc := wrapper.GetMC(c) | |
68 | |
69 key = memlockKeyPrefix + key | |
70 cid := []byte(clientID) | |
71 | |
72 // logf is a logging helper which includes the key and clientID in the l og | |
73 // message automatically. The function f is meant to be one of the metho ds | |
74 // in logging.Logger (e.g. `Logger.Warningf`). | |
75 logf := func(f func(string, ...interface{}), fmt string, args ...interfa ce{}) { | |
76 args = append([]interface{}{key, clientID}, args...) | |
M-A Ruel
2015/05/31 20:35:11
wouldn't it be simpler to do:
return f(fmt.Sprint
iannucci
2015/05/31 21:07:57
This function is actually going away... I'm standa
| |
77 f("memlock(%s:%q): "+fmt, args...) | |
78 } | |
79 | |
80 // checkAnd gets the current value from memcache, and then attempts to d o the | |
81 // checkOp (which can either be `refresh` or `release`). These pieces of | |
82 // functionality are necessarially intertwined, because CAS only works w ith | |
83 // the exact-same *Item which was returned from a Get. | |
84 // | |
85 // refresh will attempt to CAS the item with the same content to reset i t's | |
86 // timeout. | |
87 // | |
88 // release will attempt to CAS the item to remove it's contents (clientI D). | |
89 // another lock observing an empty clientID will know that the lock is | |
90 // obtainable. | |
91 checkAnd := func(op checkOp) bool { | |
92 itm, err := mc.Get(key) | |
93 if err != nil { | |
94 logf(log.Warningf, "error getting: %s", err) | |
95 return false | |
96 } | |
97 | |
98 if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { | |
99 logf(log.Infof, "lock owned by %q", string(itm.Value)) | |
100 return false | |
101 } | |
102 | |
103 if op == refresh { | |
104 itm.Value = cid | |
105 itm.Expiration = memcacheLockTime | |
106 } else { | |
107 if len(itm.Value) == 0 { | |
108 // it's already unlocked, no need to CAS | |
109 logf(log.Infof, "lock already released") | |
110 return true | |
111 } | |
112 itm.Value = []byte{} | |
113 itm.Expiration = delay | |
114 } | |
115 | |
116 err = mc.CompareAndSwap(itm) | |
117 if err != nil { | |
118 logf(log.Warningf, "failed to %s lock: %q", op, err) | |
119 return false | |
120 } | |
121 | |
122 return true | |
123 } | |
124 | |
125 // Now the actual logic begins. First we 'Add' the item, which will set it if | |
126 // it's not present in the memcache, otherwise leaves it alone. | |
127 err := mc.Add(&memcache.Item{ | |
128 Key: key, Value: cid, Expiration: memcacheLockTime}) | |
129 if err != nil { | |
130 if err != memcache.ErrNotStored { | |
131 logf(log.Warningf, "error adding: %s", err) | |
132 } | |
133 if !checkAnd(refresh) { | |
134 return ErrFailedToLock | |
135 } | |
136 } | |
137 | |
138 // At this point we nominally have the lock (at least for memcacheLockTi me). | |
139 | |
140 stopChan := make(chan struct{}) | |
141 stoppedChan := make(chan struct{}) | |
142 held := uint32(1) | |
143 | |
144 defer func() { | |
145 close(stopChan) | |
146 <-stoppedChan // this blocks TryWithLock until the goroutine bel ow quits. | |
147 }() | |
148 | |
149 // This goroutine checks to see if we still posess the lock, and refresh es it | |
150 // if we do. It will stop doing this when either stopChan is activated ( e.g. | |
151 // the user's function returns) or we lose the lock (memcache flake, etc .). | |
152 go func() { | |
153 defer close(stoppedChan) | |
154 | |
155 checkLoop: | |
156 for { | |
157 select { | |
158 case <-stopChan: | |
159 break checkLoop | |
160 case <-time.After(delay): | |
161 } | |
162 if !checkAnd(refresh) { | |
163 atomic.StoreUint32(&held, 0) | |
164 logf(log.Warningf, "lost lock: %s", err) | |
165 break | |
166 } | |
167 } | |
168 | |
169 checkAnd(release) | |
170 atomic.StoreUint32(&held, 0) | |
171 }() | |
172 | |
173 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | |
174 } | |
OLD | NEW |