Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(67)

Side by Side Diff: server/internal/logdog/collector/coordinator/cache.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "sync/atomic"
9 "time"
10
11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/logdog/types"
13 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/lru"
15 "github.com/luci/luci-go/common/promise"
16 "github.com/luci/luci-go/common/proto/logdog/logpb"
17 "golang.org/x/net/context"
18 )
19
20 const (
21 // DefaultSize is the default (maximum) size of the LRU cache.
22 DefaultSize = 1024 * 1024
23
24 // DefaultExpiration is the default expiration value.
25 DefaultExpiration = 10 * time.Minute
26 )
27
28 // cache is a Coordinator interface implementation for the Collector service
29 // that caches remote results locally.
30 type cache struct {
31 Coordinator
32
33 // Size is the number of stream states to hold in the cache. If zero,
34 // DefaultCacheSize will be used.
35 size int
36
37 // expiration is the maximum lifespan of a cache entry. If an entry is o lder
38 // than this, it will be discarded. If zero, DefaultExpiration will be u sed.
39 expiration time.Duration
40
41 // cache is the LRU state cache.
42 lru *lru.Cache
43 }
44
45 // NewCache creates a new Coordinator instance that wraps another Coordinator
46 // instance with a cache that retains the latest remote Coordiantor state in a
47 // client-side LRU cache.
48 func NewCache(c Coordinator, size int, expiration time.Duration) Coordinator {
49 if size <= 0 {
50 size = DefaultSize
51 }
52 if expiration <= 0 {
53 expiration = DefaultExpiration
54 }
55
56 return &cache{
57 Coordinator: c,
58 expiration: expiration,
59 lru: lru.New(size),
60 }
61 }
62
63 // RegisterStream invokes the wrapped Coordinator's RegisterStream method and
64 // caches the result. It uses a Promise to cause all simultaneous identical
65 // RegisterStream requests to block on a single RPC.
66 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb .LogStreamDescriptor) (
67 *LogStreamState, error) {
68 now := clock.Now(ctx)
69
70 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r
71 // we're forcing, ignore any existing entry and replace with a Promise p ending
72 // Coordinator sync.
73 entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} {
74 // Don't replace an existing entry, unless it has an error or ha s expired.
75 if current != nil {
76 curEntry := current.(*cacheEntry)
77 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) {
78 return current
79 }
80 }
81
82 p := promise.New(func() (interface{}, error) {
83 st, err := c.Coordinator.RegisterStream(ctx, st, d)
84 if err != nil {
85 return nil, err
86 }
87
88 return &LogStreamState{
89 Path: st.Path,
90 ProtoVersion: st.ProtoVersion,
91 Secret: st.Secret,
92 TerminalIndex: types.MessageIndex(st.TerminalInd ex),
93 Archived: st.Archived,
94 Purged: st.Purged,
95 }, nil
96 })
97
98 return &cacheEntry{
99 terminalIndex: -1,
100 p: p,
101 path: st.Path,
102 expiresAt: now.Add(c.expiration),
103 }
104 }).(*cacheEntry)
105
106 // If there was an error, purge the erroneous entry from the cache so th at
107 // the next "update" will re-fetch it.
108 st, err := entry.get(ctx)
109 if err != nil {
110 log.Fields{
111 log.ErrorKey: err,
112 }.Errorf(ctx, "Error retrieving stream state.")
113 return nil, err
114 }
115 return st, nil
116 }
117
118 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
119 // Immediately update our state cache to record the terminal index, if
120 // we have a state cache.
121 c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) {
122 // Always return the current entry. We're just atomically examin ing it to
123 // load it with a terminal index.
124 r = current
125 if r != nil {
126 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex)
127 }
128 return
129 })
130
131 return c.Coordinator.TerminateStream(ctx, st)
132 }
133
134 // cacheEntry is the value stored in the cache. It contains a Promise
135 // representing the value and an optional invalidation singleton to ensure that
136 // if the state failed to fetch, it will be invalidated at most once.
137 //
138 // In addition to remote caching via Promise, the state can be updated locally
139 // by calling the cache's "put" method. In this case, the Promise will be nil,
140 // and the state value will be populated.
141 type cacheEntry struct {
142 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It
143 // will be applied to returned LogStreamState objects so that once a ter minal
144 // index has been set, we become aware of it in the Collector.
145 //
146 // This MUST be the first field in the struct in order to comply with at omic's
147 // 64-bit alignment requirements.
148 terminalIndex int64
149
150 // p is a Promise that is blocking pending a Coordiantor stream state
151 // response. Upon successful resolution, it will contain a *LogStreamSta te.
152 p promise.Promise
153 path types.StreamPath
154 expiresAt time.Time
155 }
156
157 // get returns the cached state that this entry owns, blocking until resolution
158 // if necessary.
159 //
160 // The returned state is a shallow copy of the cached state, and may be
161 // modified by the caller.
162 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
163 promisedSt, err := e.p.Get(ctx)
164 if err != nil {
165 return nil, err
166 }
167
168 // Create a clone of our cached value (not deep, so secret is not cloned , but
169 // the Collector will not modify that). If we have a local terminal inde x
170 // cached, apply that to the response.
171 //
172 // We need to lock around our terminalIndex.
173 rp := *(promisedSt.(*LogStreamState))
174 if rp.TerminalIndex < 0 {
175 rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin alIndex))
176 }
177
178 return &rp, nil
179 }
180
181 // hasError tests if this entry has completed evaluation with an error state.
182 // This is non-blocking, so if the evaluation hasn't completed, it will return
183 // false.
184 func (e *cacheEntry) hasError() bool {
185 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
186 return true
187 }
188 return false
189 }
190
191 // loadTerminalIndex loads a local cache of the stream's terminal index. This
192 // will be applied to all future get requests.
193 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
194 atomic.StoreInt64(&e.terminalIndex, int64(tidx))
195 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/collector_test.go ('k') | server/internal/logdog/collector/coordinator/cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698