Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(269)

Side by Side Diff: server/internal/logdog/collector/utils_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/internal/logdog/collector/doc.go ('k') | server/internal/logdog/config/config.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "sort"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/golang/protobuf/proto"
17 "github.com/luci/luci-go/common/clock"
18 "github.com/luci/luci-go/common/logdog/butlerproto"
19 "github.com/luci/luci-go/common/logdog/types"
20 "github.com/luci/luci-go/common/proto/google"
21 "github.com/luci/luci-go/common/proto/logdog/logpb"
22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator "
23 "github.com/luci/luci-go/server/logdog/storage"
24 "golang.org/x/net/context"
25 )
26
27 var testSecret = bytes.Repeat([]byte{0x55}, types.StreamSecretLength)
28
29 // testCoordinator is an implementation of Coordinator that can be used for
30 // testing.
31 type testCoordinator struct {
32 sync.Mutex
33
34 // errC is a channel that error status will be read from if not nil.
35 errC chan error
36
37 // state is the latest tracked stream state.
38 state map[string]*cc.LogStreamState
39 }
40
41 var _ cc.Coordinator = (*testCoordinator)(nil)
42
43 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState {
44 c.Lock()
45 defer c.Unlock()
46
47 // Update our state.
48 if c.state == nil {
49 c.state = make(map[string]*cc.LogStreamState)
50 }
51 if sp := c.state[string(s.Path)]; sp != nil {
52 return *sp
53 }
54 c.state[string(s.Path)] = &s
55 return s
56 }
57
58 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta te, d *logpb.LogStreamDescriptor) (
59 *cc.LogStreamState, error) {
60 if err := c.enter(); err != nil {
61 return nil, err
62 }
63
64 // Enforce that the submitted stream is not terminal.
65 rs := *s
66 rs.TerminalIndex = -1
67
68 // Update our state.
69 sp := c.register(rs)
70 return &sp, nil
71 }
72
73 func (c *testCoordinator) TerminateStream(ctx context.Context, st *cc.LogStreamS tate) error {
74 if err := c.enter(); err != nil {
75 return err
76 }
77
78 if st.TerminalIndex < 0 {
79 return errors.New("submitted stream is not terminal")
80 }
81
82 c.Lock()
83 defer c.Unlock()
84
85 // Update our state.
86 cachedState, ok := c.state[string(st.Path)]
87 if !ok {
88 return fmt.Errorf("no such stream: %s", st.Path)
89 }
90 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter minalIndex {
91 return fmt.Errorf("incompatible terminal indexes: %d != %d", st. TerminalIndex, cachedState.TerminalIndex)
92 }
93
94 cachedState.TerminalIndex = st.TerminalIndex
95 return nil
96 }
97
98 // enter is an entry point for client goroutines. It offers the opportunity
99 // to trap executing goroutines within client calls.
100 //
101 // This must NOT be called while the lock is held, else it could lead to
102 // deadlock if multiple goroutines are trapped.
103 func (c *testCoordinator) enter() error {
104 if c.errC != nil {
105 return <-c.errC
106 }
107 return nil
108 }
109
110 func (c *testCoordinator) stream(name string) (int, bool) {
111 c.Lock()
112 defer c.Unlock()
113
114 sp, ok := c.state[name]
115 if !ok {
116 return 0, false
117 }
118 return int(sp.TerminalIndex), true
119 }
120
121 // testStorage is a testing storage instance that returns errors.
122 type testStorage struct {
123 storage.Storage
124 err func() error
125 }
126
127 func (s *testStorage) Put(r *storage.PutRequest) error {
128 if s.err != nil {
129 if err := s.err(); err != nil {
130 return err
131 }
132 }
133 return s.Storage.Put(r)
134 }
135
136 // bundleBuilder is a set of utility functions to help test cases construct
137 // specific logpb.ButlerLogBundle layouts.
138 type bundleBuilder struct {
139 context.Context
140
141 base time.Time
142 entries []*logpb.ButlerLogBundle_Entry
143 }
144
145 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
146 if b.base.IsZero() {
147 b.base = clock.Now(b)
148 }
149
150 b.entries = append(b.entries, be)
151 }
152
153 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp b.ButlerLogBundle_Entry {
154 p, n := types.StreamPath(name).Split()
155 be := logpb.ButlerLogBundle_Entry{
156 Secret: testSecret,
157 Desc: &logpb.LogStreamDescriptor{
158 Prefix: string(p),
159 Name: string(n),
160 ContentType: "application/test-message",
161 StreamType: logpb.StreamType_TEXT,
162 Timestamp: google.NewTimestamp(clock.Now(b)),
163 },
164 }
165
166 if len(idxs) > 0 {
167 be.Logs = make([]*logpb.LogEntry, len(idxs))
168 for i, idx := range idxs {
169 be.Logs[i] = b.logEntry(idx)
170 }
171 if tidx >= 0 {
172 be.Terminal = true
173 be.TerminalIndex = uint64(tidx)
174 }
175 }
176
177 return &be
178 }
179
180 func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) {
181 b.addBundleEntry(b.genBundleEntry(name, term, idxs...))
182 }
183
184 func (b *bundleBuilder) addFullStream(name string, count int) {
185 idxs := make([]int, count)
186 for i := range idxs {
187 idxs[i] = i
188 }
189 b.addStreamEntries(name, count-1, idxs...)
190 }
191
192 func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry {
193 return &logpb.LogEntry{
194 StreamIndex: uint64(idx),
195 Sequence: uint64(idx),
196 Content: &logpb.LogEntry_Text{
197 Text: &logpb.Text{
198 Lines: []*logpb.Text_Line{
199 {
200 Value: fmt.Sprintf("Line #%d ", idx),
201 Delimiter: "\n",
202 },
203 },
204 },
205 },
206 }
207 }
208
209 func (b *bundleBuilder) bundle() []byte {
210 bytes := b.bundleWithEntries(b.entries...)
211 b.entries = nil
212 return bytes
213 }
214
215 func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []b yte {
216 bundle := logpb.ButlerLogBundle{
217 Source: "test stream",
218 Timestamp: google.NewTimestamp(clock.Now(b)),
219 Entries: e,
220 }
221
222 buf := bytes.Buffer{}
223 w := butlerproto.Writer{Compress: true}
224 if err := w.Write(&buf, &bundle); err != nil {
225 panic(err)
226 }
227 return buf.Bytes()
228 }
229
230 type indexRange struct {
231 start int
232 end int
233 }
234
235 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r .end) }
236
237 // shouldHaveRegisteredStream asserts that a testCoordinator has
238 // registered a stream (string) and its terminal index (int).
239 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str ing {
240 tcc := actual.(*testCoordinator)
241 name := expected[0].(string)
242 tidx := expected[1].(int)
243
244 cur, ok := tcc.stream(name)
245 if !ok {
246 return fmt.Sprintf("stream %q is not registered", name)
247 }
248 if tidx >= 0 && cur < 0 {
249 return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name)
250 }
251 if cur >= 0 && tidx < 0 {
252 return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name)
253 }
254 return ""
255 }
256
257 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not
258 // registered a stream (string).
259 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
260 tcc := actual.(*testCoordinator)
261 name := expected[0].(string)
262
263 if _, ok := tcc.stream(name); ok {
264 return fmt.Sprintf("stream %q is registered, but it should NOT b e.", name)
265 }
266 return ""
267 }
268
269 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous
270 // stream records in it.
271 //
272 // actual is the storage.Storage instance. expected is a stream name (string)
273 // followed by a a series of records to assert. This can either be a specific
274 // integer index or an intexRange marking a closed range of indices.
275 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
276 st := actual.(storage.Storage)
277 name := expected[0].(string)
278
279 // Load all entries for this stream.
280 req := storage.GetRequest{
281 Path: types.StreamPath(name),
282 }
283
284 entries := make(map[int]*logpb.LogEntry)
285 var ierr error
286 err := st.Get(&req, func(idx types.MessageIndex, d []byte) bool {
287 le := logpb.LogEntry{}
288 if ierr = proto.Unmarshal(d, &le); ierr != nil {
289 return false
290 }
291 entries[int(idx)] = &le
292 return true
293 })
294 if ierr != nil {
295 err = ierr
296 }
297 if err != nil && err != storage.ErrDoesNotExist {
298 return fmt.Sprintf("error: %v", err)
299 }
300
301 assertLogEntry := func(i int) string {
302 le := entries[i]
303 if le == nil {
304 return fmt.Sprintf("%d", i)
305 }
306 delete(entries, i)
307
308 if le.StreamIndex != uint64(i) {
309 return fmt.Sprintf("*%d", i)
310 }
311 return ""
312 }
313
314 var failed []string
315 for _, exp := range expected[1:] {
316 switch e := exp.(type) {
317 case int:
318 if err := assertLogEntry(e); err != "" {
319 failed = append(failed, err)
320 }
321
322 case indexRange:
323 var errs []string
324 for i := e.start; i <= e.end; i++ {
325 if err := assertLogEntry(i); err != "" {
326 errs = append(errs, err)
327 }
328 }
329 if len(errs) > 0 {
330 failed = append(failed, fmt.Sprintf("%s{%s}", e. String(), strings.Join(errs, ",")))
331 }
332
333 default:
334 panic(fmt.Errorf("unknown expected type %T", e))
335 }
336 }
337
338 // Extras?
339 if len(entries) > 0 {
340 idxs := make([]int, 0, len(entries))
341 for i := range entries {
342 idxs = append(idxs, i)
343 }
344 sort.Ints(idxs)
345
346 extra := make([]string, len(idxs))
347 for i, idx := range idxs {
348 extra[i] = fmt.Sprintf("%d", idx)
349 }
350 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex tra, ",")))
351 }
352
353 if len(failed) > 0 {
354 return strings.Join(failed, ", ")
355 }
356 return ""
357 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/doc.go ('k') | server/internal/logdog/config/config.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698