| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package coordinator |
| 6 |
| 7 import ( |
| 8 "sync/atomic" |
| 9 "time" |
| 10 |
| 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/logdog/types" |
| 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/lru" |
| 15 "github.com/luci/luci-go/common/promise" |
| 16 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 17 "golang.org/x/net/context" |
| 18 ) |
| 19 |
| 20 const ( |
| 21 // DefaultSize is the default (maximum) size of the LRU cache. |
| 22 DefaultSize = 1024 * 1024 |
| 23 |
| 24 // DefaultExpiration is the default expiration value. |
| 25 DefaultExpiration = 10 * time.Minute |
| 26 ) |
| 27 |
| 28 // cache is a Coordinator interface implementation for the Collector service |
| 29 // that caches remote results locally. |
| 30 type cache struct { |
| 31 Coordinator |
| 32 |
| 33 // Size is the number of stream states to hold in the cache. If zero, |
| 34 // DefaultCacheSize will be used. |
| 35 size int |
| 36 |
| 37 // expiration is the maximum lifespan of a cache entry. If an entry is o
lder |
| 38 // than this, it will be discarded. If zero, DefaultExpiration will be u
sed. |
| 39 expiration time.Duration |
| 40 |
| 41 // cache is the LRU state cache. |
| 42 lru *lru.Cache |
| 43 } |
| 44 |
| 45 // NewCache creates a new Coordinator instance that wraps another Coordinator |
| 46 // instance with a cache that retains the latest remote Coordiantor state in a |
| 47 // client-side LRU cache. |
| 48 func NewCache(c Coordinator, size int, expiration time.Duration) Coordinator { |
| 49 if size <= 0 { |
| 50 size = DefaultSize |
| 51 } |
| 52 if expiration <= 0 { |
| 53 expiration = DefaultExpiration |
| 54 } |
| 55 |
| 56 return &cache{ |
| 57 Coordinator: c, |
| 58 expiration: expiration, |
| 59 lru: lru.New(size), |
| 60 } |
| 61 } |
| 62 |
| 63 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and |
| 64 // caches the result. It uses a Promise to cause all simultaneous identical |
| 65 // RegisterStream requests to block on a single RPC. |
| 66 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb
.LogStreamDescriptor) ( |
| 67 *LogStreamState, error) { |
| 68 now := clock.Now(ctx) |
| 69 |
| 70 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o
r |
| 71 // we're forcing, ignore any existing entry and replace with a Promise p
ending |
| 72 // Coordinator sync. |
| 73 entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} { |
| 74 // Don't replace an existing entry, unless it has an error or ha
s expired. |
| 75 if current != nil { |
| 76 curEntry := current.(*cacheEntry) |
| 77 if !curEntry.hasError() && now.Before(curEntry.expiresAt
) { |
| 78 return current |
| 79 } |
| 80 } |
| 81 |
| 82 p := promise.New(func() (interface{}, error) { |
| 83 st, err := c.Coordinator.RegisterStream(ctx, st, d) |
| 84 if err != nil { |
| 85 return nil, err |
| 86 } |
| 87 |
| 88 return &LogStreamState{ |
| 89 Path: st.Path, |
| 90 ProtoVersion: st.ProtoVersion, |
| 91 Secret: st.Secret, |
| 92 TerminalIndex: types.MessageIndex(st.TerminalInd
ex), |
| 93 Archived: st.Archived, |
| 94 Purged: st.Purged, |
| 95 }, nil |
| 96 }) |
| 97 |
| 98 return &cacheEntry{ |
| 99 terminalIndex: -1, |
| 100 p: p, |
| 101 path: st.Path, |
| 102 expiresAt: now.Add(c.expiration), |
| 103 } |
| 104 }).(*cacheEntry) |
| 105 |
| 106 // If there was an error, purge the erroneous entry from the cache so th
at |
| 107 // the next "update" will re-fetch it. |
| 108 st, err := entry.get(ctx) |
| 109 if err != nil { |
| 110 log.Fields{ |
| 111 log.ErrorKey: err, |
| 112 }.Errorf(ctx, "Error retrieving stream state.") |
| 113 return nil, err |
| 114 } |
| 115 return st, nil |
| 116 } |
| 117 |
| 118 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error { |
| 119 // Immediately update our state cache to record the terminal index, if |
| 120 // we have a state cache. |
| 121 c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) { |
| 122 // Always return the current entry. We're just atomically examin
ing it to |
| 123 // load it with a terminal index. |
| 124 r = current |
| 125 if r != nil { |
| 126 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex) |
| 127 } |
| 128 return |
| 129 }) |
| 130 |
| 131 return c.Coordinator.TerminateStream(ctx, st) |
| 132 } |
| 133 |
| 134 // cacheEntry is the value stored in the cache. It contains a Promise |
| 135 // representing the value and an optional invalidation singleton to ensure that |
| 136 // if the state failed to fetch, it will be invalidated at most once. |
| 137 // |
| 138 // In addition to remote caching via Promise, the state can be updated locally |
| 139 // by calling the cache's "put" method. In this case, the Promise will be nil, |
| 140 // and the state value will be populated. |
| 141 type cacheEntry struct { |
| 142 // terminalIndex is the loaded terminal index set via loadTerminalIndex.
It |
| 143 // will be applied to returned LogStreamState objects so that once a ter
minal |
| 144 // index has been set, we become aware of it in the Collector. |
| 145 // |
| 146 // This MUST be the first field in the struct in order to comply with at
omic's |
| 147 // 64-bit alignment requirements. |
| 148 terminalIndex int64 |
| 149 |
| 150 // p is a Promise that is blocking pending a Coordiantor stream state |
| 151 // response. Upon successful resolution, it will contain a *LogStreamSta
te. |
| 152 p promise.Promise |
| 153 path types.StreamPath |
| 154 expiresAt time.Time |
| 155 } |
| 156 |
| 157 // get returns the cached state that this entry owns, blocking until resolution |
| 158 // if necessary. |
| 159 // |
| 160 // The returned state is a shallow copy of the cached state, and may be |
| 161 // modified by the caller. |
| 162 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) { |
| 163 promisedSt, err := e.p.Get(ctx) |
| 164 if err != nil { |
| 165 return nil, err |
| 166 } |
| 167 |
| 168 // Create a clone of our cached value (not deep, so secret is not cloned
, but |
| 169 // the Collector will not modify that). If we have a local terminal inde
x |
| 170 // cached, apply that to the response. |
| 171 // |
| 172 // We need to lock around our terminalIndex. |
| 173 rp := *(promisedSt.(*LogStreamState)) |
| 174 if rp.TerminalIndex < 0 { |
| 175 rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin
alIndex)) |
| 176 } |
| 177 |
| 178 return &rp, nil |
| 179 } |
| 180 |
| 181 // hasError tests if this entry has completed evaluation with an error state. |
| 182 // This is non-blocking, so if the evaluation hasn't completed, it will return |
| 183 // false. |
| 184 func (e *cacheEntry) hasError() bool { |
| 185 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData { |
| 186 return true |
| 187 } |
| 188 return false |
| 189 } |
| 190 |
| 191 // loadTerminalIndex loads a local cache of the stream's terminal index. This |
| 192 // will be applied to all future get requests. |
| 193 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) { |
| 194 atomic.StoreInt64(&e.terminalIndex, int64(tidx)) |
| 195 } |
| OLD | NEW |