| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package collector |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "sync" |
| 10 "testing" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1" |
| 14 "github.com/luci/luci-go/common/clock/testclock" |
| 15 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/logdog/types" |
| 17 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| 18 "golang.org/x/net/context" |
| 19 |
| 20 . "github.com/smartystreets/goconvey/convey" |
| 21 ) |
| 22 |
| 23 func TestStreamStateCache(t *testing.T) { |
| 24 t.Parallel() |
| 25 |
| 26 Convey(`Using a test configuration`, t, func() { |
| 27 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 28 tcc := testCoordinatorClient{} |
| 29 |
| 30 o := streamStateCacheOptions{ |
| 31 coordinator: &tcc, |
| 32 cacheSize: 4, |
| 33 expiration: 1 * time.Second, |
| 34 } |
| 35 |
| 36 st := cc.State{ |
| 37 Path: "foo/+/bar", |
| 38 State: &service.LogStreamState{ |
| 39 TerminalIndex: -1, |
| 40 }, |
| 41 } |
| 42 |
| 43 // Note: In all of these tests, we check if "proto" field (Proto
Version) |
| 44 // is "remote". We use ProtoVersion as a channel between our fak
e remote |
| 45 // service. When our fake remote service returns a LogStreamStat
e, it sets |
| 46 // "remote" to true to differentiate it from the local pushed st
ate. |
| 47 // |
| 48 // If a LogStreamState has "remote" set to true, that implies th
at it was |
| 49 // sent by the fake testing service rather than the local test. |
| 50 Convey(`A streamStateCache`, func() { |
| 51 ssc := newStreamStateCache(o) |
| 52 |
| 53 resultC := make(chan *stateProxy) |
| 54 req := func(s cc.State) { |
| 55 var res *stateProxy |
| 56 defer func() { |
| 57 resultC <- res |
| 58 }() |
| 59 |
| 60 st, err := ssc.getOrRegister(c, &s) |
| 61 if err == nil { |
| 62 res = st |
| 63 } |
| 64 } |
| 65 |
| 66 Convey(`Can register a stream`, func() { |
| 67 s, err := ssc.getOrRegister(c, &st) |
| 68 So(err, ShouldBeNil) |
| 69 So(s.proto, ShouldEqual, "remote") |
| 70 So(tcc.calls, ShouldEqual, 1) |
| 71 |
| 72 Convey(`Will not re-register the same stream.`,
func() { |
| 73 st.ProtoVersion = "" |
| 74 |
| 75 s, err := ssc.getOrRegister(c, &st) |
| 76 So(err, ShouldBeNil) |
| 77 So(s.proto, ShouldEqual, "remote") |
| 78 So(tcc.calls, ShouldEqual, 1) |
| 79 }) |
| 80 |
| 81 Convey(`When the registration expires`, func() { |
| 82 st.ProtoVersion = "" |
| 83 tc.Add(time.Second) |
| 84 |
| 85 Convey(`Will re-register the stream.`, f
unc() { |
| 86 s, err := ssc.getOrRegister(c, &
st) |
| 87 So(err, ShouldBeNil) |
| 88 So(s.proto, ShouldEqual, "remote
") |
| 89 So(tcc.calls, ShouldEqual, 2) |
| 90 }) |
| 91 }) |
| 92 |
| 93 Convey(`Can terminate a registered stream`, func
() { |
| 94 s.terminalIndex = 1337 |
| 95 So(ssc.setTerminalIndex(c, s), ShouldBeN
il) |
| 96 So(tcc.calls, ShouldEqual, 2) // +1 call |
| 97 |
| 98 Convey(`Registering the stream will incl
ude the terminal index.`, func() { |
| 99 // Fill it in with junk to make
sure we are getting cached. |
| 100 st.State.TerminalIndex = 123 |
| 101 st.ProtoVersion = "" |
| 102 |
| 103 s, err := ssc.getOrRegister(c, &
st) |
| 104 So(err, ShouldBeNil) |
| 105 So(s.proto, ShouldEqual, "remote
") |
| 106 So(s.terminalIndex, ShouldEqual,
1337) |
| 107 So(tcc.calls, ShouldEqual, 2) //
No additional calls. |
| 108 }) |
| 109 }) |
| 110 }) |
| 111 |
| 112 Convey(`When the terminal index is set before the fetch
finishes, it will be returned.`, func() { |
| 113 tcc.callC = make(chan struct{}) |
| 114 tcc.errC = make(chan error) |
| 115 |
| 116 go req(st) |
| 117 |
| 118 // Wait for our request to block on RegisterStre
am. |
| 119 <-tcc.callC |
| 120 |
| 121 // Set the terminal index. We will use a minimal
stateProxy. We know |
| 122 // that this will happen after the streamStateCa
cheEntry is registered |
| 123 // because both block on the LRU cache's Mutate,
which is atomic, and |
| 124 // getOrRegister must have added the streamState
CacheEntry in order for |
| 125 // the lock to be available for setTerminalIndex
to proceed. |
| 126 terminalErrC := make(chan error) |
| 127 go func() { |
| 128 terminalErrC <- ssc.setTerminalIndex(c,
&stateProxy{ |
| 129 path: st.Path, |
| 130 terminalIndex: 1337, |
| 131 }) |
| 132 }() |
| 133 |
| 134 // Let both requests succeed. |
| 135 <-tcc.callC |
| 136 tcc.errC <- nil |
| 137 tcc.errC <- nil |
| 138 |
| 139 // Read the stateProxy from our getOrRegister re
quest. |
| 140 s := <-resultC |
| 141 So(s, ShouldNotBeNil) |
| 142 So(s.terminalIndex, ShouldEqual, 1337) |
| 143 }) |
| 144 |
| 145 Convey(`When multiple goroutines register the same strea
m, it gets registered once.`, func() { |
| 146 tcc.callC = make(chan struct{}) |
| 147 tcc.errC = make(chan error) |
| 148 |
| 149 errs := make(errors.MultiError, 256) |
| 150 for i := 0; i < len(errs); i++ { |
| 151 go req(st) |
| 152 } |
| 153 |
| 154 <-tcc.callC |
| 155 tcc.errC <- nil |
| 156 for i := 0; i < len(errs); i++ { |
| 157 <-resultC |
| 158 } |
| 159 |
| 160 So(errors.SingleError(errs), ShouldBeNil) |
| 161 So(tcc.calls, ShouldEqual, 1) |
| 162 }) |
| 163 |
| 164 Convey(`Multiple registrations for the same stream will
result in two requests if the first expires.`, func() { |
| 165 tcc.callC = make(chan struct{}) |
| 166 tcc.errC = make(chan error) |
| 167 |
| 168 // First request. |
| 169 go req(st) |
| 170 |
| 171 // Wait for the request to happen, then advance
time past the request's |
| 172 // expiration. |
| 173 <-tcc.callC |
| 174 tc.Add(time.Second) |
| 175 |
| 176 // Second request. |
| 177 go req(st) |
| 178 |
| 179 // Release both calls and reap the results. |
| 180 <-tcc.callC |
| 181 tcc.errC <- nil |
| 182 tcc.errC <- nil |
| 183 |
| 184 r1 := <-resultC |
| 185 r2 := <-resultC |
| 186 |
| 187 So(r1.proto, ShouldEqual, "remote") |
| 188 So(r2.proto, ShouldEqual, "remote") |
| 189 So(tcc.calls, ShouldEqual, 2) |
| 190 }) |
| 191 |
| 192 Convey(`A registration error will result in a getOrRegis
ter error.`, func() { |
| 193 tcc.errC = make(chan error, 1) |
| 194 tcc.errC <- errors.New("test error") |
| 195 |
| 196 _, err := ssc.getOrRegister(c, &st) |
| 197 So(err, ShouldNotBeNil) |
| 198 So(tcc.calls, ShouldEqual, 1) |
| 199 |
| 200 Convey(`A second registration without error will
make a new request.`, func() { |
| 201 tcc.errC = nil |
| 202 |
| 203 _, err := ssc.getOrRegister(c, &st) |
| 204 So(err, ShouldBeNil) |
| 205 So(tcc.calls, ShouldEqual, 2) |
| 206 }) |
| 207 }) |
| 208 }) |
| 209 |
| 210 Convey(`A streamStateCache can register multiple streams at once
.`, func() { |
| 211 tcc.callC = make(chan struct{}) |
| 212 tcc.errC = make(chan error) |
| 213 ssc := newStreamStateCache(o) |
| 214 |
| 215 count := 256 |
| 216 wg := sync.WaitGroup{} |
| 217 errs := make(errors.MultiError, count) |
| 218 state := make([]*stateProxy, count) |
| 219 wg.Add(count) |
| 220 for i := 0; i < count; i++ { |
| 221 st := st |
| 222 st.Path = types.StreamPath(fmt.Sprintf("foo/+/ba
r%d", i)) |
| 223 |
| 224 go func(i int) { |
| 225 defer wg.Done() |
| 226 state[i], errs[i] = ssc.getOrRegister(c,
&st) |
| 227 }(i) |
| 228 } |
| 229 |
| 230 // Wait for all of them to simultaneously call. |
| 231 for i := 0; i < count; i++ { |
| 232 <-tcc.callC |
| 233 } |
| 234 |
| 235 // They're all blocked on errC; allow them to continue. |
| 236 for i := 0; i < count; i++ { |
| 237 tcc.errC <- nil |
| 238 } |
| 239 |
| 240 // Wait for them to finish. |
| 241 wg.Wait() |
| 242 |
| 243 // Confirm that all registered successfully. |
| 244 So(errors.SingleError(errs), ShouldBeNil) |
| 245 |
| 246 remotes := 0 |
| 247 for i := 0; i < count; i++ { |
| 248 if state[i].proto == "remote" { |
| 249 remotes++ |
| 250 } |
| 251 } |
| 252 So(remotes, ShouldEqual, count) |
| 253 }) |
| 254 }) |
| 255 } |
| OLD | NEW |