Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(364)

Side by Side Diff: client/internal/logdog/butler/bundler/stream_test.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Updated from comments. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bundler
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "strconv"
12 "testing"
13 "time"
14
15 "github.com/luci/luci-go/common/clock/testclock"
16 "github.com/luci/luci-go/common/logdog/protocol"
17 . "github.com/smartystreets/goconvey/convey"
18 )
19
// testParserCommand is a single queued instruction for testParser. Exactly
// one aspect of each command is meaningful:
//   - data/ts: emit a data chunk with the given timestamp.
//   - splitToggle: update the parser's "allowSplit" constraint state.
//   - closedToggle: update the parser's "closed" constraint state.
//   - err: put the parser into a permanent error state.
type testParserCommand struct {
	// data is the data content of this command.
	data []byte
	// ts is the timestamp, which is valid if this is a data command.
	ts time.Time
	// splitToggle, if not nil, causes this command to set the "allowSplit"
	// parser constraint.
	splitToggle *bool
	// closedToggle, if not nil, causes this command to set the "closed"
	// parser constraint.
	closedToggle *bool
	// err, if not nil, is returned when this command is encountered.
	err error
}
34
// errTestInduced is a sentinel error used to simulate parser failures in
// these tests.
var errTestInduced = errors.New("test error")
36
// testParser is a parser implementation that allows specifically-configured
// data to be emitted. It consumes commands, some of which alter its behavior
// and others of which present data. The resulting state affects how it emits
// LogEntry records via nextEntry.
type testParser struct {
	// commands is the FIFO queue of pending commands.
	commands []*testParserCommand

	// appendedData is not referenced by the code visible in this file —
	// presumably scratch storage; TODO(review): confirm against stream.go.
	appendedData []byte
	// truncateOn mirrors the most recent splitToggle command.
	truncateOn bool
	// closedOn mirrors the most recent closedToggle command.
	closedOn bool
	// err is the latched parser error; once set, nextEntry always fails.
	err error
}
49
50 func (p *testParser) addCommand(r *testParserCommand) {
51 p.commands = append(p.commands, r)
52 }
53
54 func (p *testParser) nextCommand(pop bool) *testParserCommand {
55 if len(p.commands) == 0 {
56 return nil
57 }
58 cmd := p.commands[0]
59 if pop {
60 p.commands = p.commands[1:]
61 }
62 return cmd
63 }
64
65 func (p *testParser) popData() (r *testParserCommand) {
66 for i, cmd := range p.commands {
67 if cmd.data != nil {
68 p.commands = p.commands[i+1:]
69 return cmd
70 }
71 }
72 return nil
73 }
74
75 func (p *testParser) tags(ts time.Time, commands ...string) {
76 for _, c := range commands {
77 p.addTag(c, ts)
78 }
79 }
80
81 func (p *testParser) addError(err error) {
82 p.addCommand(&testParserCommand{
83 err: err,
84 })
85 }
86
87 func (p *testParser) addTag(tag string, ts time.Time) {
88 p.addData([]byte(tag), ts)
89 }
90
91 func (p *testParser) addData(d []byte, ts time.Time) {
92 p.addCommand(&testParserCommand{
93 data: d,
94 ts: ts,
95 })
96 }
97
98 func (p *testParser) setAllowSplit(value bool) {
99 p.addCommand(&testParserCommand{
100 splitToggle: &value,
101 })
102 }
103
104 func (p *testParser) setClosed(value bool) {
105 p.addCommand(&testParserCommand{
106 closedToggle: &value,
107 })
108 }
109
110 func (p *testParser) appendData(d Data) {
111 p.addData(d.Bytes(), d.Timestamp())
112 }
113
// nextEntry implements the bundler parser interface for testing. It consumes
// queued commands — applying state toggles and latching any error — until it
// reaches a data command, then emits that data as a single-line Text LogEntry.
// If the current truncate/closed state combined with the supplied constraints
// forbids emission, the data command is left queued and (nil, nil) is
// returned.
func (p *testParser) nextEntry(c *constraints) (*protocol.LogEntry, error) {
	// Process records until we hit data or run out.
	for p.err == nil {
		rec := p.nextCommand(false)
		if rec == nil {
			// Queue exhausted with no latched error (p.err is nil here).
			return nil, p.err
		}

		// If this is a data record, process.
		if rec.data != nil {
			break
		}

		// Ingest commands, repeat.
		if rec.err != nil {
			// Latch the error. The error command itself is intentionally not
			// popped; subsequent calls short-circuit on p.err.
			p.err = rec.err
			break
		}

		if rec.splitToggle != nil {
			p.truncateOn = *rec.splitToggle
		}
		if rec.closedToggle != nil {
			p.closedOn = *rec.closedToggle
		}
		// Pop the command we just ingested.
		p.nextCommand(true)
	}

	if p.err != nil {
		return nil, p.err
	}

	// This is a data record. If we're configured to not yield it, leave it and
	// return nil.
	if p.truncateOn && (!c.allowSplit || (p.closedOn && !c.closed)) {
		return nil, nil
	}

	// Consume this record.
	rec := p.nextCommand(true)
	return &protocol.LogEntry{
		Content: &protocol.LogEntry_Text{Text: &protocol.Text{
			Lines: []*protocol.Text_Line{
				{Value: string(rec.data)},
			},
		}},
	}, nil
}
162
163 func (p *testParser) bufferedBytes() (r int64) {
164 for _, rec := range p.commands {
165 r += int64(len(rec.data))
166 }
167 return
168 }
169
170 func (p *testParser) firstChunkTime() (time.Time, bool) {
171 for _, c := range p.commands {
172 if c.data != nil {
173 return c.ts, true
174 }
175 }
176 return time.Time{}, false
177 }
178
// TestStream exercises stream's buffering, appending, error propagation, and
// bundle-entry generation against a testParser and a test clock.
func TestStream(t *testing.T) {
	Convey(`A testing stream config`, t, func() {
		tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
		tp := testParser{}
		c := streamConfig{
			name:   "test",
			parser: &tp,
			template: protocol.ButlerLogBundle_Entry{
				Desc: &protocol.LogStreamDescriptor{
					Prefix: "test-prefix",
					Name:   "test",
				},
			},
		}

		Convey(`With a 64-byte maximum buffer and 1 second maximum duration`, func() {
			c.maximumBufferedBytes = 64
			c.maximumBufferDuration = time.Second
			s := newStream(c)

			Convey(`Is not drained by default`, func() {
				So(s.isDrained(), ShouldBeFalse)

				Convey(`When closed, is drained.`, func() {
					s.Close()
					So(s.isDrained(), ShouldBeTrue)

					Convey(`When closed again, is still drained.`, func() {
						// Close must be idempotent.
						s.Close()
						So(s.isDrained(), ShouldBeTrue)
					})
				})
			})

			Convey(`With no data, has no expiration time.`, func() {
				_, has := s.expireTime()
				So(has, ShouldBeFalse)
			})

			Convey(`Append will ignore a 0-byte chunk.`, func() {
				d := data(tc.Now())
				So(s.Append(d), ShouldBeNil)
				// The empty Data must still be released back.
				So(d.released, ShouldBeTrue)
			})

			Convey(`Append will add two 32-byte chunks.`, func() {
				// 32 + 32 exactly fills the 64-byte buffer without blocking.
				content := bytes.Repeat([]byte{0xAA}, 32)
				So(s.Append(data(tc.Now(), content...)), ShouldBeNil)
				So(s.Append(data(tc.Now(), content...)), ShouldBeNil)
			})

			Convey(`Append will add a large chunk when there are no other Data blocks.`, func() {
				// 128 bytes exceeds the 64-byte buffer, but is accepted since
				// the buffer is empty.
				d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 128)...)
				So(s.Append(d), ShouldBeNil)

				Convey(`Will use that data's timestamp as expiration time.`, func() {
					t, has := s.expireTime()
					So(has, ShouldBeTrue)
					So(t.Equal(tc.Now().Add(time.Second)), ShouldBeTrue)
				})
			})

			Convey(`Append will block if the chunk exceeds the buffer size.`, func() {
				signalC := make(chan struct{})
				s.c.onAppend = func(appended bool) {
					if !appended {
						// We're waiting.
						close(signalC)
					}
				}

				// Add one chunk so we don't hit the "only byte" condition.
				So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xAA}, 34)...)), ShouldBeNil)

				// Wait until we get the signal that Append() will block, then consume
				// some data and unblock Append().
				blocked := false
				go func() {
					<-signalC

					s.withParserLock(func() {
						tp.popData()
					})
					blocked = true
					s.signalDataConsumed()
				}()

				// Add one chunk so we don't hit the "only byte" condition.
				So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xBB}, 32)...)), ShouldBeNil)
				// NOTE(review): reading `blocked` here relies on the second
				// Append blocking until signalDataConsumed, which happens
				// after the write — confirm this ordering is guaranteed by
				// stream's internal synchronization.
				So(blocked, ShouldBeTrue)
			})

			Convey(`Append in an error state`, func() {
				terr := errors.New("test error")

				Convey(`Will return the error state.`, func() {
					s.setAppendError(terr)

					d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 32)...)
					So(s.Append(d), ShouldEqual, terr)
					So(d.released, ShouldBeTrue)
				})

				Convey(`Will block if the chunk exceeds buffer size, and return error state.`, func() {
					signalC := make(chan struct{})
					s.c.onAppend = func(appended bool) {
						if !appended {
							// Waiting, notify our goroutine that we're going to be waiting.
							close(signalC)
						}
					}

					// Add one chunk so we don't hit the "only byte" condition.
					So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0xAA}, 34)...)), ShouldBeNil)

					// Wait until we get the signal that Append() will block, then consume
					// some data and unblock Append().
					go func() {
						<-signalC
						s.setAppendError(terr)
					}()

					// Every subsequent Append — blocking, small, or empty —
					// must surface the latched error and release its Data.
					for _, sz := range []int{32, 1, 0} {
						d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, sz)...)
						So(s.Append(d), ShouldEqual, terr)
						So(d.released, ShouldBeTrue)
					}
				})
			})
		})

		Convey(`When building bundle entries`, func() {
			bb := &builder{
				size: 1024,
			}
			s := newStream(c)

			Convey(`Returns nil with no buffered data.`, func() {
				So(s.nextBundleEntry(bb, false), ShouldBeFalse)
				So(bb.bundle(), shouldHaveBundleEntries)
			})

			Convey(`With a single record, returns that entry.`, func() {
				tp.tags(tc.Now(), "a", "b")

				So(s.nextBundleEntry(bb, false), ShouldBeTrue)
				So(bb.bundle(), shouldHaveBundleEntries, "test:a:b")
			})

			Convey(`When split is allowed, returns nil.`, func() {
				tp.tags(tc.Now(), "a", "b")
				tp.setAllowSplit(true)
				tp.tags(tc.Now(), "c")

				// Data before the split toggle is emitted; data after it is
				// withheld until the split is permitted.
				So(s.nextBundleEntry(bb, false), ShouldBeTrue)
				So(bb.bundle(), shouldHaveBundleEntries, "test:a:b")
				So(s.nextBundleEntry(bb, false), ShouldBeFalse)

				So(s.nextBundleEntry(bb, true), ShouldBeTrue)
				So(bb.bundle(), shouldHaveBundleEntries, "test:a:b:c")
			})

			Convey(`When an error occurs during stream parsing, drains stream.`, func() {
				So(s.isDrained(), ShouldBeFalse)
				tp.tags(tc.Now(), "a")
				tp.addError(errTestInduced)
				tp.tags(tc.Now(), "b")

				// "a" is emitted; the induced error then drains the stream,
				// so "b" is never produced.
				So(s.nextBundleEntry(bb, false), ShouldBeTrue)
				So(s.isDrained(), ShouldBeTrue)
				So(bb.bundle(), shouldHaveBundleEntries, "+test:a")
				So(s.nextBundleEntry(bb, false), ShouldBeFalse)
			})

			Convey(`With only an error, returns no bundle entry.`, func() {
				So(s.isDrained(), ShouldBeFalse)
				tp.addError(errTestInduced)
				tp.tags(tc.Now(), "a")
				tp.tags(tc.Now(), "b")

				So(s.nextBundleEntry(bb, false), ShouldBeFalse)
				So(bb.bundle(), shouldHaveBundleEntries)
				So(s.isDrained(), ShouldBeTrue)
			})
		})
	})
}
367
// TestStreamSmoke tests a Stream in an actual multi-goroutine workflow: one
// appender, 32 bundle consumers, and one collector verify that all 512
// appended entries survive the round trip exactly once each.
func TestStreamSmoke(t *testing.T) {
	Convey(`When running a smoke test`, t, func() {
		tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC))
		tp := testParser{}
		c := streamConfig{
			name:   "test",
			parser: &tp,
			template: protocol.ButlerLogBundle_Entry{
				Desc: &protocol.LogStreamDescriptor{
					Prefix: "test-prefix",
					Name:   "test",
				},
			},
		}
		s := newStream(c)

		// Appender goroutine, constantly appends data.
		//
		// This will be inherently throttled by the nextBundle consumption.
		go func() {
			defer s.Close()

			for i := 0; i < 512; i++ {
				// Each entry's content is its decimal index, recovered below
				// via logEntryName + strconv.Atoi.
				s.Append(data(tc.Now(), []byte(fmt.Sprintf("%d", i))...))
				tc.Add(time.Second)
			}
		}()

		// The consumer goroutines will consume bundles from the stream.
		consumerC := make(chan struct{})
		bundleC := make(chan *protocol.ButlerLogBundle)
		for i := 0; i < 32; i++ {
			go func() {
				defer func() {
					// Signal completion so the main goroutine can join us.
					consumerC <- struct{}{}
				}()

				b := (*builder)(nil)
				for !s.isDrained() {
					if b == nil {
						b = &builder{
							size: 128,
						}
					}

					s.nextBundleEntry(b, false)
					if b.hasContent() {
						bundleC <- b.bundle()
						b = nil
					} else {
						// No content! Sleep for a second and check again.
						tc.Sleep(time.Second)
					}
				}
			}()
		}

		// Collect all bundles. gotIt is written only by this goroutine;
		// the main goroutine reads it only after <-collectDoneC, so access
		// is serialized by the channel.
		gotIt := map[int]struct{}{}
		collectDoneC := make(chan struct{})
		go func() {
			defer close(collectDoneC)

			for bundle := range bundleC {
				for _, be := range bundle.Entries {
					for _, le := range be.Logs {
						idx, _ := strconv.Atoi(logEntryName(le))
						gotIt[idx] = struct{}{}
					}
				}
			}
		}()

		// Wait for all consumers, then stop the collector.
		for i := 0; i < 32; i++ {
			<-consumerC
		}
		close(bundleC)

		// Did we get them all?
		<-collectDoneC
		for i := 0; i < 512; i++ {
			_, ok := gotIt[i]
			So(ok, ShouldBeTrue)
		}
	})
}
OLDNEW
« no previous file with comments | « client/internal/logdog/butler/bundler/stream.go ('k') | client/internal/logdog/butler/bundler/textParser.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698