| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "errors" |
| 10 "fmt" |
| 11 "strconv" |
| 12 "testing" |
| 13 "time" |
| 14 |
| 15 "github.com/luci/luci-go/common/clock/testclock" |
| 16 "github.com/luci/luci-go/common/logdog/protocol" |
| 17 . "github.com/smartystreets/goconvey/convey" |
| 18 ) |
| 19 |
| 20 type testParserCommand struct { |
| 21 // data is the data content of this command. |
| 22 data []byte |
| 23 // ts is the timestamp, which is valid if this is a data command. |
| 24 ts time.Time |
| 25 // splitToggle, if not null, causes this command to set the "allowSplit" |
| 26 // parser constraint. |
| 27 splitToggle *bool |
| 28 // closedToggle, if not null, causes this command to set the "closed" |
| 29 // parser constraint. |
| 30 closedToggle *bool |
| 31 // err, if not nil, is returned when this command is encountered. |
| 32 err error |
| 33 } |
| 34 |
| 35 var errTestInduced = errors.New("test error") |
| 36 |
| 37 // testParser is a parser implementation that allows specifically-configured |
| 38 // data to be emitted. It consumes commands, some of which alter its behavior |
| 39 // and others of which present data. The resulting state affects how it emits |
| 40 // LogEntry records via nextEntry. |
| 41 type testParser struct { |
| 42 commands []*testParserCommand |
| 43 |
| 44 appendedData []byte |
| 45 truncateOn bool |
| 46 closedOn bool |
| 47 err error |
| 48 } |
| 49 |
| 50 func (p *testParser) addCommand(r *testParserCommand) { |
| 51 p.commands = append(p.commands, r) |
| 52 } |
| 53 |
| 54 func (p *testParser) nextCommand(pop bool) *testParserCommand { |
| 55 if len(p.commands) == 0 { |
| 56 return nil |
| 57 } |
| 58 cmd := p.commands[0] |
| 59 if pop { |
| 60 p.commands = p.commands[1:] |
| 61 } |
| 62 return cmd |
| 63 } |
| 64 |
| 65 func (p *testParser) popData() (r *testParserCommand) { |
| 66 for i, cmd := range p.commands { |
| 67 if cmd.data != nil { |
| 68 p.commands = p.commands[i+1:] |
| 69 return cmd |
| 70 } |
| 71 } |
| 72 return nil |
| 73 } |
| 74 |
| 75 func (p *testParser) tags(ts time.Time, commands ...string) { |
| 76 for _, c := range commands { |
| 77 p.addTag(c, ts) |
| 78 } |
| 79 } |
| 80 |
| 81 func (p *testParser) addError(err error) { |
| 82 p.addCommand(&testParserCommand{ |
| 83 err: err, |
| 84 }) |
| 85 } |
| 86 |
| 87 func (p *testParser) addTag(tag string, ts time.Time) { |
| 88 p.addData([]byte(tag), ts) |
| 89 } |
| 90 |
| 91 func (p *testParser) addData(d []byte, ts time.Time) { |
| 92 p.addCommand(&testParserCommand{ |
| 93 data: d, |
| 94 ts: ts, |
| 95 }) |
| 96 } |
| 97 |
| 98 func (p *testParser) setAllowSplit(value bool) { |
| 99 p.addCommand(&testParserCommand{ |
| 100 splitToggle: &value, |
| 101 }) |
| 102 } |
| 103 |
| 104 func (p *testParser) setClosed(value bool) { |
| 105 p.addCommand(&testParserCommand{ |
| 106 closedToggle: &value, |
| 107 }) |
| 108 } |
| 109 |
| 110 func (p *testParser) appendData(d Data) { |
| 111 p.addData(d.Bytes(), d.Timestamp()) |
| 112 } |
| 113 |
| 114 func (p *testParser) nextEntry(c *constraints) (*protocol.LogEntry, error) { |
| 115 // Process records until we hit data or run out. |
| 116 for p.err == nil { |
| 117 rec := p.nextCommand(false) |
| 118 if rec == nil { |
| 119 return nil, p.err |
| 120 } |
| 121 |
| 122 // If this is a data record, process. |
| 123 if rec.data != nil { |
| 124 break |
| 125 } |
| 126 |
| 127 // Ingest commands, repeat. |
| 128 if rec.err != nil { |
| 129 p.err = rec.err |
| 130 break |
| 131 } |
| 132 |
| 133 if rec.splitToggle != nil { |
| 134 p.truncateOn = *rec.splitToggle |
| 135 } |
| 136 if rec.closedToggle != nil { |
| 137 p.closedOn = *rec.closedToggle |
| 138 } |
| 139 p.nextCommand(true) |
| 140 } |
| 141 |
| 142 if p.err != nil { |
| 143 return nil, p.err |
| 144 } |
| 145 |
| 146 // This is a data record. If we're configured to not yield it, leave it
and |
| 147 // return nil. |
| 148 if p.truncateOn && (!c.allowSplit || (p.closedOn && !c.closed)) { |
| 149 return nil, nil |
| 150 } |
| 151 |
| 152 // Consume this record. |
| 153 rec := p.nextCommand(true) |
| 154 return &protocol.LogEntry{ |
| 155 Content: &protocol.LogEntry_Text{Text: &protocol.Text{ |
| 156 Lines: []*protocol.Text_Line{ |
| 157 {Value: string(rec.data)}, |
| 158 }, |
| 159 }}, |
| 160 }, nil |
| 161 } |
| 162 |
| 163 func (p *testParser) bufferedBytes() (r int64) { |
| 164 for _, rec := range p.commands { |
| 165 r += int64(len(rec.data)) |
| 166 } |
| 167 return |
| 168 } |
| 169 |
| 170 func (p *testParser) firstChunkTime() (time.Time, bool) { |
| 171 for _, c := range p.commands { |
| 172 if c.data != nil { |
| 173 return c.ts, true |
| 174 } |
| 175 } |
| 176 return time.Time{}, false |
| 177 } |
| 178 |
| 179 func TestStream(t *testing.T) { |
| 180 Convey(`A testing stream config`, t, func() { |
| 181 tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
| 182 tp := testParser{} |
| 183 c := streamConfig{ |
| 184 name: "test", |
| 185 parser: &tp, |
| 186 template: protocol.ButlerLogBundle_Entry{ |
| 187 Desc: &protocol.LogStreamDescriptor{ |
| 188 Prefix: "test-prefix", |
| 189 Name: "test", |
| 190 }, |
| 191 }, |
| 192 } |
| 193 |
| 194 Convey(`With a 64-byte maximum buffer and 1 second maximum durat
ion`, func() { |
| 195 c.maximumBufferedBytes = 64 |
| 196 c.maximumBufferDuration = time.Second |
| 197 s := newStream(c) |
| 198 |
| 199 Convey(`Is not drained by default`, func() { |
| 200 So(s.isDrained(), ShouldBeFalse) |
| 201 |
| 202 Convey(`When closed, is drained.`, func() { |
| 203 s.Close() |
| 204 So(s.isDrained(), ShouldBeTrue) |
| 205 |
| 206 Convey(`When closed again, is still drai
ned.`, func() { |
| 207 s.Close() |
| 208 So(s.isDrained(), ShouldBeTrue) |
| 209 }) |
| 210 }) |
| 211 }) |
| 212 |
| 213 Convey(`With no data, has no expiration time.`, func() { |
| 214 _, has := s.expireTime() |
| 215 So(has, ShouldBeFalse) |
| 216 }) |
| 217 |
| 218 Convey(`Append will ignore a 0-byte chunk.`, func() { |
| 219 d := data(tc.Now()) |
| 220 So(s.Append(d), ShouldBeNil) |
| 221 So(d.released, ShouldBeTrue) |
| 222 }) |
| 223 |
| 224 Convey(`Append will add two 32-byte chunks.`, func() { |
| 225 content := bytes.Repeat([]byte{0xAA}, 32) |
| 226 So(s.Append(data(tc.Now(), content...)), ShouldB
eNil) |
| 227 So(s.Append(data(tc.Now(), content...)), ShouldB
eNil) |
| 228 }) |
| 229 |
| 230 Convey(`Append will add a large chunk when there are no
other Data blocks.`, func() { |
| 231 d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 1
28)...) |
| 232 So(s.Append(d), ShouldBeNil) |
| 233 |
| 234 Convey(`Will use that data's timestamp as expira
tion time.`, func() { |
| 235 t, has := s.expireTime() |
| 236 So(has, ShouldBeTrue) |
| 237 So(t.Equal(tc.Now().Add(time.Second)), S
houldBeTrue) |
| 238 }) |
| 239 }) |
| 240 |
| 241 Convey(`Append will block if the chunk exceeds the buffe
r size.`, func() { |
| 242 signalC := make(chan struct{}) |
| 243 s.c.onAppend = func(appended bool) { |
| 244 if !appended { |
| 245 // We're waiting. |
| 246 close(signalC) |
| 247 } |
| 248 } |
| 249 |
| 250 // Add one chunk so we don't hit the "only byte"
condition. |
| 251 So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0
xAA}, 34)...)), ShouldBeNil) |
| 252 |
| 253 // Wait until we get the signal that Append() wi
ll block, then consume |
| 254 // some data and unblock Append(). |
| 255 blocked := false |
| 256 go func() { |
| 257 <-signalC |
| 258 |
| 259 s.withParserLock(func() { |
| 260 tp.popData() |
| 261 }) |
| 262 blocked = true |
| 263 s.signalDataConsumed() |
| 264 }() |
| 265 |
| 266 // Add one chunk so we don't hit the "only byte"
condition. |
| 267 So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0
xBB}, 32)...)), ShouldBeNil) |
| 268 So(blocked, ShouldBeTrue) |
| 269 }) |
| 270 |
| 271 Convey(`Append in an error state`, func() { |
| 272 terr := errors.New("test error") |
| 273 |
| 274 Convey(`Will return the error state.`, func() { |
| 275 s.setAppendError(terr) |
| 276 |
| 277 d := data(tc.Now(), bytes.Repeat([]byte{
0xAA}, 32)...) |
| 278 So(s.Append(d), ShouldEqual, terr) |
| 279 So(d.released, ShouldBeTrue) |
| 280 }) |
| 281 |
| 282 Convey(`Will block if the chunk exceeds buffer s
ize, and return error state.`, func() { |
| 283 signalC := make(chan struct{}) |
| 284 s.c.onAppend = func(appended bool) { |
| 285 if !appended { |
| 286 // Waiting, notify our g
oroutine that we're going to be waiting. |
| 287 close(signalC) |
| 288 } |
| 289 } |
| 290 |
| 291 // Add one chunk so we don't hit the "on
ly byte" condition. |
| 292 So(s.Append(data(tc.Now(), bytes.Repeat(
[]byte{0xAA}, 34)...)), ShouldBeNil) |
| 293 |
| 294 // Wait until we get the signal that App
end() will block, then consume |
| 295 // some data and unblock Append(). |
| 296 go func() { |
| 297 <-signalC |
| 298 s.setAppendError(terr) |
| 299 }() |
| 300 |
| 301 // Add one chunk so we don't hit the "on
ly byte" condition. |
| 302 for _, sz := range []int{32, 1, 0} { |
| 303 d := data(tc.Now(), bytes.Repeat
([]byte{0xAA}, sz)...) |
| 304 So(s.Append(d), ShouldEqual, ter
r) |
| 305 So(d.released, ShouldBeTrue) |
| 306 } |
| 307 }) |
| 308 }) |
| 309 }) |
| 310 |
| 311 Convey(`When building bundle entries`, func() { |
| 312 bb := &builder{ |
| 313 size: 1024, |
| 314 } |
| 315 s := newStream(c) |
| 316 |
| 317 Convey(`Returns nil with no buffered data.`, func() { |
| 318 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 319 So(bb.bundle(), shouldHaveBundleEntries) |
| 320 }) |
| 321 |
| 322 Convey(`With a single record, returns that entry.`, func
() { |
| 323 tp.tags(tc.Now(), "a", "b") |
| 324 |
| 325 So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
| 326 So(bb.bundle(), shouldHaveBundleEntries, "test:a
:b") |
| 327 }) |
| 328 |
| 329 Convey(`When split is allowed, returns nil.`, func() { |
| 330 tp.tags(tc.Now(), "a", "b") |
| 331 tp.setAllowSplit(true) |
| 332 tp.tags(tc.Now(), "c") |
| 333 |
| 334 So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
| 335 So(bb.bundle(), shouldHaveBundleEntries, "test:a
:b") |
| 336 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 337 |
| 338 So(s.nextBundleEntry(bb, true), ShouldBeTrue) |
| 339 So(bb.bundle(), shouldHaveBundleEntries, "test:a
:b:c") |
| 340 }) |
| 341 |
| 342 Convey(`When an error occurs during stream parsing, drai
ns stream.`, func() { |
| 343 So(s.isDrained(), ShouldBeFalse) |
| 344 tp.tags(tc.Now(), "a") |
| 345 tp.addError(errTestInduced) |
| 346 tp.tags(tc.Now(), "b") |
| 347 |
| 348 So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
| 349 So(s.isDrained(), ShouldBeTrue) |
| 350 So(bb.bundle(), shouldHaveBundleEntries, "+test:
a") |
| 351 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 352 }) |
| 353 |
| 354 Convey(`With only an error, returns no bundle entry.`, f
unc() { |
| 355 So(s.isDrained(), ShouldBeFalse) |
| 356 tp.addError(errTestInduced) |
| 357 tp.tags(tc.Now(), "a") |
| 358 tp.tags(tc.Now(), "b") |
| 359 |
| 360 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 361 So(bb.bundle(), shouldHaveBundleEntries) |
| 362 So(s.isDrained(), ShouldBeTrue) |
| 363 }) |
| 364 }) |
| 365 }) |
| 366 } |
| 367 |
| 368 // TestStreamSmoke tests a Stream in an actual multi-goroutine workflow. |
| 369 func TestStreamSmoke(t *testing.T) { |
| 370 Convey(`When running a smoke test`, t, func() { |
| 371 tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
| 372 tp := testParser{} |
| 373 c := streamConfig{ |
| 374 name: "test", |
| 375 parser: &tp, |
| 376 template: protocol.ButlerLogBundle_Entry{ |
| 377 Desc: &protocol.LogStreamDescriptor{ |
| 378 Prefix: "test-prefix", |
| 379 Name: "test", |
| 380 }, |
| 381 }, |
| 382 } |
| 383 s := newStream(c) |
| 384 |
| 385 // Appender goroutine, constantly appends data. |
| 386 // |
| 387 // This will be inherently throttled by the nextBundle consumpti
on. |
| 388 go func() { |
| 389 defer s.Close() |
| 390 |
| 391 for i := 0; i < 512; i++ { |
| 392 s.Append(data(tc.Now(), []byte(fmt.Sprintf("%d",
i))...)) |
| 393 tc.Add(time.Second) |
| 394 } |
| 395 }() |
| 396 |
| 397 // The consumer goroutine will consume bundles from the stream. |
| 398 consumerC := make(chan struct{}) |
| 399 bundleC := make(chan *protocol.ButlerLogBundle) |
| 400 for i := 0; i < 32; i++ { |
| 401 go func() { |
| 402 defer func() { |
| 403 consumerC <- struct{}{} |
| 404 }() |
| 405 |
| 406 b := (*builder)(nil) |
| 407 for !s.isDrained() { |
| 408 if b == nil { |
| 409 b = &builder{ |
| 410 size: 128, |
| 411 } |
| 412 } |
| 413 |
| 414 s.nextBundleEntry(b, false) |
| 415 if b.hasContent() { |
| 416 bundleC <- b.bundle() |
| 417 b = nil |
| 418 } else { |
| 419 // No content! Sleep for a secon
d and check again. |
| 420 tc.Sleep(time.Second) |
| 421 } |
| 422 } |
| 423 }() |
| 424 } |
| 425 |
| 426 // Collect all bundles. |
| 427 gotIt := map[int]struct{}{} |
| 428 collectDoneC := make(chan struct{}) |
| 429 go func() { |
| 430 defer close(collectDoneC) |
| 431 |
| 432 for bundle := range bundleC { |
| 433 for _, be := range bundle.Entries { |
| 434 for _, le := range be.Logs { |
| 435 idx, _ := strconv.Atoi(logEntryN
ame(le)) |
| 436 gotIt[idx] = struct{}{} |
| 437 } |
| 438 } |
| 439 } |
| 440 }() |
| 441 |
| 442 for i := 0; i < 32; i++ { |
| 443 <-consumerC |
| 444 } |
| 445 close(bundleC) |
| 446 |
| 447 // Did we get them all? |
| 448 <-collectDoneC |
| 449 for i := 0; i < 512; i++ { |
| 450 _, ok := gotIt[i] |
| 451 So(ok, ShouldBeTrue) |
| 452 } |
| 453 }) |
| 454 } |
| OLD | NEW |