Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(591)

Side by Side Diff: server/internal/logdog/collector/streamstatecache_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "fmt"
9 "sync"
10 "testing"
11 "time"
12
13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
14 "github.com/luci/luci-go/common/clock/testclock"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/logdog/types"
17 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
18 "golang.org/x/net/context"
19
20 . "github.com/smartystreets/goconvey/convey"
21 )
22
23 func TestStreamStateCache(t *testing.T) {
24 t.Parallel()
25
26 Convey(`Using a test configuration`, t, func() {
27 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
28 tcc := testCoordinatorClient{}
29
30 o := streamStateCacheOptions{
31 coordinator: &tcc,
32 cacheSize: 4,
33 expiration: 1 * time.Second,
34 }
35
36 st := cc.State{
37 Path: "foo/+/bar",
38 State: &service.LogStreamState{
39 TerminalIndex: -1,
40 },
41 }
42
43 // Note: In all of these tests, we check if "proto" field (Proto Version)
44 // is "remote". We use ProtoVersion as a channel between our fak e remote
45 // service. When our fake remote service returns a LogStreamStat e, it sets
46 // "remote" to true to differentiate it from the local pushed st ate.
47 //
48 // If a LogStreamState has "remote" set to true, that implies th at it was
49 // sent by the fake testing service rather than the local test.
50 Convey(`A streamStateCache`, func() {
51 ssc := newStreamStateCache(o)
52
53 resultC := make(chan *stateProxy)
54 req := func(s cc.State) {
55 var res *stateProxy
56 defer func() {
57 resultC <- res
58 }()
59
60 st, err := ssc.getOrRegister(c, &s)
61 if err == nil {
62 res = st
63 }
64 }
65
66 Convey(`Can register a stream`, func() {
67 s, err := ssc.getOrRegister(c, &st)
68 So(err, ShouldBeNil)
69 So(s.proto, ShouldEqual, "remote")
70 So(tcc.calls, ShouldEqual, 1)
71
72 Convey(`Will not re-register the same stream.`, func() {
73 st.ProtoVersion = ""
74
75 s, err := ssc.getOrRegister(c, &st)
76 So(err, ShouldBeNil)
77 So(s.proto, ShouldEqual, "remote")
78 So(tcc.calls, ShouldEqual, 1)
79 })
80
81 Convey(`When the registration expires`, func() {
82 st.ProtoVersion = ""
83 tc.Add(time.Second)
84
85 Convey(`Will re-register the stream.`, f unc() {
86 s, err := ssc.getOrRegister(c, & st)
87 So(err, ShouldBeNil)
88 So(s.proto, ShouldEqual, "remote ")
89 So(tcc.calls, ShouldEqual, 2)
90 })
91 })
92
93 Convey(`Can terminate a registered stream`, func () {
94 s.terminalIndex = 1337
95 So(ssc.setTerminalIndex(c, s), ShouldBeN il)
96 So(tcc.calls, ShouldEqual, 2) // +1 call
97
98 Convey(`Registering the stream will incl ude the terminal index.`, func() {
99 // Fill it in with junk to make sure we are getting cached.
100 st.State.TerminalIndex = 123
101 st.ProtoVersion = ""
102
103 s, err := ssc.getOrRegister(c, & st)
104 So(err, ShouldBeNil)
105 So(s.proto, ShouldEqual, "remote ")
106 So(s.terminalIndex, ShouldEqual, 1337)
107 So(tcc.calls, ShouldEqual, 2) // No additional calls.
108 })
109 })
110 })
111
112 Convey(`When the terminal index is set before the fetch finishes, it will be returned.`, func() {
113 tcc.callC = make(chan struct{})
114 tcc.errC = make(chan error)
115
116 go req(st)
117
118 // Wait for our request to block on RegisterStre am.
119 <-tcc.callC
120
121 // Set the terminal index. We will use a minimal stateProxy. We know
122 // that this will happen after the streamStateCa cheEntry is registered
123 // because both block on the LRU cache's Mutate, which is atomic, and
124 // getOrRegister must have added the streamState CacheEntry in order for
125 // the lock to be available for setTerminalIndex to proceed.
126 terminalErrC := make(chan error)
127 go func() {
128 terminalErrC <- ssc.setTerminalIndex(c, &stateProxy{
129 path: st.Path,
130 terminalIndex: 1337,
131 })
132 }()
133
134 // Let both requests succeed.
135 <-tcc.callC
136 tcc.errC <- nil
137 tcc.errC <- nil
138
139 // Read the stateProxy from our getOrRegister re quest.
140 s := <-resultC
141 So(s, ShouldNotBeNil)
142 So(s.terminalIndex, ShouldEqual, 1337)
143 })
144
145 Convey(`When multiple goroutines register the same strea m, it gets registered once.`, func() {
146 tcc.callC = make(chan struct{})
147 tcc.errC = make(chan error)
148
149 errs := make(errors.MultiError, 256)
150 for i := 0; i < len(errs); i++ {
151 go req(st)
152 }
153
154 <-tcc.callC
155 tcc.errC <- nil
156 for i := 0; i < len(errs); i++ {
157 <-resultC
158 }
159
160 So(errors.SingleError(errs), ShouldBeNil)
161 So(tcc.calls, ShouldEqual, 1)
162 })
163
164 Convey(`Multiple registrations for the same stream will result in two requests if the first expires.`, func() {
165 tcc.callC = make(chan struct{})
166 tcc.errC = make(chan error)
167
168 // First request.
169 go req(st)
170
171 // Wait for the request to happen, then advance time past the request's
172 // expiration.
173 <-tcc.callC
174 tc.Add(time.Second)
175
176 // Second request.
177 go req(st)
178
179 // Release both calls and reap the results.
180 <-tcc.callC
181 tcc.errC <- nil
182 tcc.errC <- nil
183
184 r1 := <-resultC
185 r2 := <-resultC
186
187 So(r1.proto, ShouldEqual, "remote")
188 So(r2.proto, ShouldEqual, "remote")
189 So(tcc.calls, ShouldEqual, 2)
190 })
191
192 Convey(`A registration error will result in a getOrRegis ter error.`, func() {
193 tcc.errC = make(chan error, 1)
194 tcc.errC <- errors.New("test error")
195
196 _, err := ssc.getOrRegister(c, &st)
197 So(err, ShouldNotBeNil)
198 So(tcc.calls, ShouldEqual, 1)
199
200 Convey(`A second registration without error will make a new request.`, func() {
201 tcc.errC = nil
202
203 _, err := ssc.getOrRegister(c, &st)
204 So(err, ShouldBeNil)
205 So(tcc.calls, ShouldEqual, 2)
206 })
207 })
208 })
209
210 Convey(`A streamStateCache can register multiple streams at once .`, func() {
211 tcc.callC = make(chan struct{})
212 tcc.errC = make(chan error)
213 ssc := newStreamStateCache(o)
214
215 count := 256
216 wg := sync.WaitGroup{}
217 errs := make(errors.MultiError, count)
218 state := make([]*stateProxy, count)
219 wg.Add(count)
220 for i := 0; i < count; i++ {
221 st := st
222 st.Path = types.StreamPath(fmt.Sprintf("foo/+/ba r%d", i))
223
224 go func(i int) {
225 defer wg.Done()
226 state[i], errs[i] = ssc.getOrRegister(c, &st)
227 }(i)
228 }
229
230 // Wait for all of them to simultaneously call.
231 for i := 0; i < count; i++ {
232 <-tcc.callC
233 }
234
235 // They're all blocked on errC; allow them to continue.
236 for i := 0; i < count; i++ {
237 tcc.errC <- nil
238 }
239
240 // Wait for them to finish.
241 wg.Wait()
242
243 // Confirm that all registered successfully.
244 So(errors.SingleError(errs), ShouldBeNil)
245
246 remotes := 0
247 for i := 0; i < count; i++ {
248 if state[i].proto == "remote" {
249 remotes++
250 }
251 }
252 So(remotes, ShouldEqual, count)
253 })
254 })
255 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698