Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(646)

Side by Side Diff: client/internal/logdog/butler/bundler/stream_test.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Enhanced doc.go. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "strconv"
12 "testing"
13 "time"
14
15 "github.com/luci/luci-go/common/clock/testclock"
16 "github.com/luci/luci-go/common/logdog/protocol"
17 . "github.com/smartystreets/goconvey/convey"
18 )
19
20 type testParserCommand struct {
21 // data is the data content of this command.
22 data []byte
23 // ts is the timestamp, which is valid if this is a data command.
24 ts time.Time
25 // truncate causes this command to only return if configured to truncate .
26 truncateToggle *bool
27 // closed causes this command to only return if configured to truncate a nd closed.
28 closedToggle *bool
29 // err, if not nil, is returned when this command is encountered.
30 err error
31 }
32
33 var errTestInduced = errors.New("test error")
34
35 // testParser is a parser implementation that allows specifically-configured
36 // data to be emitted. It consumes commands, some of which alter its behavior
37 // and others of which present data. The resulting state affects how it emits
38 // LogEntry records via nextEntry.
39 type testParser struct {
40 commands []*testParserCommand
41
42 appendedData []byte
43 truncateOn bool
44 closedOn bool
45 err error
46 }
47
48 func (p *testParser) addCommand(r *testParserCommand) {
49 p.commands = append(p.commands, r)
50 }
51
52 func (p *testParser) nextCommand(pop bool) *testParserCommand {
53 if len(p.commands) == 0 {
54 return nil
55 }
56 cmd := p.commands[0]
57 if pop {
58 p.commands = p.commands[1:]
59 }
60 return cmd
61 }
62
63 func (p *testParser) popData() (r *testParserCommand) {
64 for i, cmd := range p.commands {
65 if cmd.data != nil {
66 p.commands = p.commands[i+1:]
67 return cmd
68 }
69 }
70 return nil
71 }
72
73 func (p *testParser) tags(ts time.Time, commands ...string) {
74 for _, c := range commands {
75 p.addTag(c, ts)
76 }
77 }
78
79 func (p *testParser) addError(err error) {
80 p.addCommand(&testParserCommand{
81 err: err,
82 })
83 }
84
85 func (p *testParser) addTag(tag string, ts time.Time) {
86 p.addData([]byte(tag), ts)
87 }
88
89 func (p *testParser) addData(d []byte, ts time.Time) {
90 p.addCommand(&testParserCommand{
91 data: d,
92 ts: ts,
93 })
94 }
95
96 func (p *testParser) setTrunc(value bool) {
97 p.addCommand(&testParserCommand{
98 truncateToggle: &value,
99 })
100 }
101
102 func (p *testParser) setClosed(value bool) {
103 p.addCommand(&testParserCommand{
104 closedToggle: &value,
105 })
106 }
107
108 func (p *testParser) appendData(d Data) {
109 p.addData(d.Bytes(), d.Timestamp())
110 }
111
112 func (p *testParser) nextEntry(c *constraints) (*protocol.LogEntry, error) {
113 // Process records until we hit data or run out.
114 for p.err == nil {
115 rec := p.nextCommand(false)
116 if rec == nil {
117 return nil, p.err
118 }
119
120 // If this is a data record, process.
121 if rec.data != nil {
122 break
123 }
124
125 // Ingest commands, repeat.
126 if rec.err != nil {
127 p.err = rec.err
128 break
129 }
130
131 if rec.truncateToggle != nil {
132 p.truncateOn = *rec.truncateToggle
133 }
134 if rec.closedToggle != nil {
135 p.closedOn = *rec.closedToggle
136 }
137 p.nextCommand(true)
138 }
139
140 if p.err != nil {
141 return nil, p.err
142 }
143
144 // This is a data record. If we're configured to not yield it, leave it and
145 // return nil.
146 if p.truncateOn && (!c.truncate || (p.closedOn && !c.closed)) {
147 return nil, nil
148 }
149
150 // Consume this record.
151 rec := p.nextCommand(true)
152 return &protocol.LogEntry{
153 Content: &protocol.LogEntry_Text{Text: &protocol.Text{
154 Lines: []*protocol.Text_Line{
155 {Value: string(rec.data)},
156 },
157 }},
158 }, nil
159 }
160
161 func (p *testParser) bufferedBytes() (r int64) {
162 for _, rec := range p.commands {
163 r += int64(len(rec.data))
164 }
165 return
166 }
167
168 func (p *testParser) firstChunkTime() (time.Time, bool) {
169 for _, c := range p.commands {
170 if c.data != nil {
171 return c.ts, true
172 }
173 }
174 return time.Time{}, false
175 }
176
177 func TestStream(t *testing.T) {
178 Convey(`A testing stream config`, t, func() {
179 tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
180 tp := testParser{}
181 c := streamConfig{
182 name: "test",
183 parser: &tp,
184 template: protocol.ButlerLogBundle_Entry{
185 Desc: &protocol.LogStreamDescriptor{
186 Prefix: "test-prefix",
187 Name: "test",
188 },
189 },
190 }
191
192 Convey(`With a 64-byte maximum buffer and 1 second maximum durat ion`, func() {
193 c.maximumBufferedBytes = 64
194 c.maximumBufferDuration = time.Second
195 s := newStream(c)
196
197 Convey(`Is not drained by default`, func() {
198 So(s.isDrained(), ShouldBeFalse)
199
200 Convey(`When closed, is drained.`, func() {
201 s.Close()
202 So(s.isDrained(), ShouldBeTrue)
203
204 Convey(`When closed again, is still drai ned.`, func() {
205 s.Close()
206 So(s.isDrained(), ShouldBeTrue)
207 })
208 })
209 })
210
211 Convey(`With no data, has no expiration time.`, func() {
212 _, has := s.expireTime()
213 So(has, ShouldBeFalse)
214 })
215
216 Convey(`Append will ignore a 0-byte chunk.`, func() {
217 d := data(tc.Now())
218 So(s.Append(d), ShouldBeNil)
219 So(d.released, ShouldBeTrue)
220 })
221
222 Convey(`Append will add two 32-byte chunks.`, func() {
223 content := bytes.Repeat([]byte{0xAA}, 32)
224 So(s.Append(data(tc.Now(), content...)), ShouldB eNil)
225 So(s.Append(data(tc.Now(), content...)), ShouldB eNil)
226 })
227
228 Convey(`Append will add a large chunk when there are no other Data blocks.`, func() {
229 d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 1 28)...)
230 So(s.Append(d), ShouldBeNil)
231
232 Convey(`Will use that data's timestamp as expira tion time.`, func() {
233 t, has := s.expireTime()
234 So(has, ShouldBeTrue)
235 So(t.Equal(tc.Now().Add(time.Second)), S houldBeTrue)
236 })
237 })
238
239 Convey(`Append will block if the chunk exceeds the buffe r size.`, func() {
240 signalC := make(chan struct{})
241 s.c.onAppend = func(appended bool) {
242 if !appended {
243 // We're waiting.
244 close(signalC)
245 }
246 }
247
248 // Add one chunk so we don't hit the "only byte" condition.
249 So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0 xAA}, 34)...)), ShouldBeNil)
250
251 // Wait until we get the signal that Append() wi ll block, then consume
252 // some data and unblock Append().
253 blocked := false
254 go func() {
255 <-signalC
256
257 s.withParserLock(func() {
258 tp.popData()
259 })
260 blocked = true
261 s.signalDataConsumed()
262 }()
263
264 // Add one chunk so we don't hit the "only byte" condition.
265 So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0 xBB}, 32)...)), ShouldBeNil)
266 So(blocked, ShouldBeTrue)
267 })
268
269 Convey(`Append in an error state`, func() {
270 terr := errors.New("test error")
271
272 Convey(`Will return the error state.`, func() {
273 s.setAppendError(terr)
274
275 d := data(tc.Now(), bytes.Repeat([]byte{ 0xAA}, 32)...)
276 So(s.Append(d), ShouldEqual, terr)
277 So(d.released, ShouldBeTrue)
278 })
279
280 Convey(`Will block if the chunk exceeds buffer s ize, and return error state.`, func() {
281 signalC := make(chan struct{})
282 s.c.onAppend = func(appended bool) {
283 if !appended {
284 // Waiting, notify our g oroutine that we're going to be waiting.
285 close(signalC)
286 }
287 }
288
289 // Add one chunk so we don't hit the "on ly byte" condition.
290 So(s.Append(data(tc.Now(), bytes.Repeat( []byte{0xAA}, 34)...)), ShouldBeNil)
291
292 // Wait until we get the signal that App end() will block, then consume
293 // some data and unblock Append().
294 go func() {
295 <-signalC
296 s.setAppendError(terr)
297 }()
298
299 // Add one chunk so we don't hit the "on ly byte" condition.
300 for _, sz := range []int{32, 1, 0} {
301 d := data(tc.Now(), bytes.Repeat ([]byte{0xAA}, sz)...)
302 So(s.Append(d), ShouldEqual, ter r)
303 So(d.released, ShouldBeTrue)
304 }
305 })
306 })
307 })
308
309 Convey(`When building bundle entries`, func() {
310 bb := &builder{
311 size: 1024,
312 }
313 s := newStream(c)
314
315 Convey(`Returns nil with no buffered data.`, func() {
316 So(s.nextBundleEntry(bb, false), ShouldBeFalse)
317 So(bb.bundle(), shouldHaveBundleEntries)
318 })
319
320 Convey(`With a single record, returns that entry.`, func () {
321 tp.tags(tc.Now(), "a", "b")
322
323 So(s.nextBundleEntry(bb, false), ShouldBeTrue)
324 So(bb.bundle(), shouldHaveBundleEntries, "test:a :b")
325 })
326
327 Convey(`When truncate is required, returns nil.`, func() {
328 tp.tags(tc.Now(), "a", "b")
329 tp.setTrunc(true)
330 tp.tags(tc.Now(), "c")
331
332 So(s.nextBundleEntry(bb, false), ShouldBeTrue)
333 So(bb.bundle(), shouldHaveBundleEntries, "test:a :b")
334 So(s.nextBundleEntry(bb, false), ShouldBeFalse)
335
336 So(s.nextBundleEntry(bb, true), ShouldBeTrue)
337 So(bb.bundle(), shouldHaveBundleEntries, "test:a :b:c")
338 })
339
340 Convey(`When an error occurs during stream parsing, drai ns stream.`, func() {
341 So(s.isDrained(), ShouldBeFalse)
342 tp.tags(tc.Now(), "a")
343 tp.addError(errTestInduced)
344 tp.tags(tc.Now(), "b")
345
346 So(s.nextBundleEntry(bb, false), ShouldBeTrue)
347 So(s.isDrained(), ShouldBeTrue)
348 So(bb.bundle(), shouldHaveBundleEntries, "+test: a")
349 So(s.nextBundleEntry(bb, false), ShouldBeFalse)
350 })
351
352 Convey(`With only an error, returns no bundle entry.`, f unc() {
353 So(s.isDrained(), ShouldBeFalse)
354 tp.addError(errTestInduced)
355 tp.tags(tc.Now(), "a")
356 tp.tags(tc.Now(), "b")
357
358 So(s.nextBundleEntry(bb, false), ShouldBeFalse)
359 So(bb.bundle(), shouldHaveBundleEntries)
360 So(s.isDrained(), ShouldBeTrue)
361 })
362 })
363 })
364 }
365
366 // TestStreamSmoke tests a Stream in an actual multi-goroutine workflow.
367 func TestStreamSmoke(t *testing.T) {
368 Convey(`When running a smoke test`, t, func() {
369 tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
370 tp := testParser{}
371 c := streamConfig{
372 name: "test",
373 parser: &tp,
374 template: protocol.ButlerLogBundle_Entry{
375 Desc: &protocol.LogStreamDescriptor{
376 Prefix: "test-prefix",
377 Name: "test",
378 },
379 },
380 }
381 s := newStream(c)
382
383 // Appender goroutine, constantly appends data.
384 //
385 // This will be inherently throttled by the nextBundle consumpti on.
386 go func() {
387 defer s.Close()
388
389 for i := 0; i < 512; i++ {
390 s.Append(data(tc.Now(), []byte(fmt.Sprintf("%d", i))...))
391 tc.Add(time.Second)
392 }
393 }()
394
395 // The consumer goroutine will consume bundles from the stream.
396 consumerC := make(chan struct{})
397 bundleC := make(chan *protocol.ButlerLogBundle)
398 for i := 0; i < 32; i++ {
399 go func() {
400 defer func() {
401 consumerC <- struct{}{}
402 }()
403
404 b := (*builder)(nil)
405 for !s.isDrained() {
406 if b == nil {
407 b = &builder{
408 size: 128,
409 }
410 }
411
412 s.nextBundleEntry(b, false)
413 if b.hasContent() {
414 bundleC <- b.bundle()
415 b = nil
416 } else {
417 // No content! Sleep for a secon d and check again.
418 tc.Sleep(time.Second)
419 }
420 }
421 }()
422 }
423
424 // Collect all bundles.
425 gotIt := map[int]struct{}{}
426 collectDoneC := make(chan struct{})
427 go func() {
428 defer close(collectDoneC)
429
430 for bundle := range bundleC {
431 for _, be := range bundle.Entries {
432 for _, le := range be.Logs {
433 idx, _ := strconv.Atoi(logEntryN ame(le))
434 gotIt[idx] = struct{}{}
435 }
436 }
437 }
438 }()
439
440 for i := 0; i < 32; i++ {
441 <-consumerC
442 }
443 close(bundleC)
444
445 // Did we get them all?
446 <-collectDoneC
447 for i := 0; i < 512; i++ {
448 _, ok := gotIt[i]
449 So(ok, ShouldBeTrue)
450 }
451 })
452 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698