| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bundler |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "errors" |
| 10 "fmt" |
| 11 "strconv" |
| 12 "testing" |
| 13 "time" |
| 14 |
| 15 "github.com/luci/luci-go/common/clock/testclock" |
| 16 "github.com/luci/luci-go/common/logdog/protocol" |
| 17 . "github.com/smartystreets/goconvey/convey" |
| 18 ) |
| 19 |
| 20 type testParserCommand struct { |
| 21 // data is the data content of this command. |
| 22 data []byte |
| 23 // ts is the timestamp, which is valid if this is a data command. |
| 24 ts time.Time |
| 25 // truncate causes this command to only return if configured to truncate
. |
| 26 truncateToggle *bool |
| 27 // closed causes this command to only return if configured to truncate a
nd closed. |
| 28 closedToggle *bool |
| 29 // err, if not nil, is returned when this command is encountered. |
| 30 err error |
| 31 } |
| 32 |
| 33 var errTestInduced = errors.New("test error") |
| 34 |
| 35 // testParser is a parser implementation that allows specifically-configured |
| 36 // data to be emitted. It consumes commands, some of which alter its behavior |
| 37 // and others of which present data. The resulting state affects how it emits |
| 38 // LogEntry records via nextEntry. |
| 39 type testParser struct { |
| 40 commands []*testParserCommand |
| 41 |
| 42 appendedData []byte |
| 43 truncateOn bool |
| 44 closedOn bool |
| 45 err error |
| 46 } |
| 47 |
| 48 func (p *testParser) addCommand(r *testParserCommand) { |
| 49 p.commands = append(p.commands, r) |
| 50 } |
| 51 |
| 52 func (p *testParser) nextCommand(pop bool) *testParserCommand { |
| 53 if len(p.commands) == 0 { |
| 54 return nil |
| 55 } |
| 56 cmd := p.commands[0] |
| 57 if pop { |
| 58 p.commands = p.commands[1:] |
| 59 } |
| 60 return cmd |
| 61 } |
| 62 |
| 63 func (p *testParser) popData() (r *testParserCommand) { |
| 64 for i, cmd := range p.commands { |
| 65 if cmd.data != nil { |
| 66 p.commands = p.commands[i+1:] |
| 67 return cmd |
| 68 } |
| 69 } |
| 70 return nil |
| 71 } |
| 72 |
| 73 func (p *testParser) tags(ts time.Time, commands ...string) { |
| 74 for _, c := range commands { |
| 75 p.addTag(c, ts) |
| 76 } |
| 77 } |
| 78 |
| 79 func (p *testParser) addError(err error) { |
| 80 p.addCommand(&testParserCommand{ |
| 81 err: err, |
| 82 }) |
| 83 } |
| 84 |
| 85 func (p *testParser) addTag(tag string, ts time.Time) { |
| 86 p.addData([]byte(tag), ts) |
| 87 } |
| 88 |
| 89 func (p *testParser) addData(d []byte, ts time.Time) { |
| 90 p.addCommand(&testParserCommand{ |
| 91 data: d, |
| 92 ts: ts, |
| 93 }) |
| 94 } |
| 95 |
| 96 func (p *testParser) setTrunc(value bool) { |
| 97 p.addCommand(&testParserCommand{ |
| 98 truncateToggle: &value, |
| 99 }) |
| 100 } |
| 101 |
| 102 func (p *testParser) setClosed(value bool) { |
| 103 p.addCommand(&testParserCommand{ |
| 104 closedToggle: &value, |
| 105 }) |
| 106 } |
| 107 |
| 108 func (p *testParser) appendData(d Data) { |
| 109 p.addData(d.Bytes(), d.Timestamp()) |
| 110 } |
| 111 |
| 112 func (p *testParser) nextEntry(c *constraints) (*protocol.LogEntry, error) { |
| 113 // Process records until we hit data or run out. |
| 114 for p.err == nil { |
| 115 rec := p.nextCommand(false) |
| 116 if rec == nil { |
| 117 return nil, p.err |
| 118 } |
| 119 |
| 120 // If this is a data record, process. |
| 121 if rec.data != nil { |
| 122 break |
| 123 } |
| 124 |
| 125 // Ingest commands, repeat. |
| 126 if rec.err != nil { |
| 127 p.err = rec.err |
| 128 break |
| 129 } |
| 130 |
| 131 if rec.truncateToggle != nil { |
| 132 p.truncateOn = *rec.truncateToggle |
| 133 } |
| 134 if rec.closedToggle != nil { |
| 135 p.closedOn = *rec.closedToggle |
| 136 } |
| 137 p.nextCommand(true) |
| 138 } |
| 139 |
| 140 if p.err != nil { |
| 141 return nil, p.err |
| 142 } |
| 143 |
| 144 // This is a data record. If we're configured to not yield it, leave it
and |
| 145 // return nil. |
| 146 if p.truncateOn && (!c.truncate || (p.closedOn && !c.closed)) { |
| 147 return nil, nil |
| 148 } |
| 149 |
| 150 // Consume this record. |
| 151 rec := p.nextCommand(true) |
| 152 return &protocol.LogEntry{ |
| 153 Content: &protocol.LogEntry_Text{Text: &protocol.Text{ |
| 154 Lines: []*protocol.Text_Line{ |
| 155 {Value: string(rec.data)}, |
| 156 }, |
| 157 }}, |
| 158 }, nil |
| 159 } |
| 160 |
| 161 func (p *testParser) bufferedBytes() (r int64) { |
| 162 for _, rec := range p.commands { |
| 163 r += int64(len(rec.data)) |
| 164 } |
| 165 return |
| 166 } |
| 167 |
| 168 func (p *testParser) firstChunkTime() (time.Time, bool) { |
| 169 for _, c := range p.commands { |
| 170 if c.data != nil { |
| 171 return c.ts, true |
| 172 } |
| 173 } |
| 174 return time.Time{}, false |
| 175 } |
| 176 |
| 177 func TestStream(t *testing.T) { |
| 178 Convey(`A testing stream config`, t, func() { |
| 179 tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
| 180 tp := testParser{} |
| 181 c := streamConfig{ |
| 182 name: "test", |
| 183 parser: &tp, |
| 184 template: protocol.ButlerLogBundle_Entry{ |
| 185 Desc: &protocol.LogStreamDescriptor{ |
| 186 Prefix: "test-prefix", |
| 187 Name: "test", |
| 188 }, |
| 189 }, |
| 190 } |
| 191 |
| 192 Convey(`With a 64-byte maximum buffer and 1 second maximum durat
ion`, func() { |
| 193 c.maximumBufferedBytes = 64 |
| 194 c.maximumBufferDuration = time.Second |
| 195 s := newStream(c) |
| 196 |
| 197 Convey(`Is not drained by default`, func() { |
| 198 So(s.isDrained(), ShouldBeFalse) |
| 199 |
| 200 Convey(`When closed, is drained.`, func() { |
| 201 s.Close() |
| 202 So(s.isDrained(), ShouldBeTrue) |
| 203 |
| 204 Convey(`When closed again, is still drai
ned.`, func() { |
| 205 s.Close() |
| 206 So(s.isDrained(), ShouldBeTrue) |
| 207 }) |
| 208 }) |
| 209 }) |
| 210 |
| 211 Convey(`With no data, has no expiration time.`, func() { |
| 212 _, has := s.expireTime() |
| 213 So(has, ShouldBeFalse) |
| 214 }) |
| 215 |
| 216 Convey(`Append will ignore a 0-byte chunk.`, func() { |
| 217 d := data(tc.Now()) |
| 218 So(s.Append(d), ShouldBeNil) |
| 219 So(d.released, ShouldBeTrue) |
| 220 }) |
| 221 |
| 222 Convey(`Append will add two 32-byte chunks.`, func() { |
| 223 content := bytes.Repeat([]byte{0xAA}, 32) |
| 224 So(s.Append(data(tc.Now(), content...)), ShouldB
eNil) |
| 225 So(s.Append(data(tc.Now(), content...)), ShouldB
eNil) |
| 226 }) |
| 227 |
| 228 Convey(`Append will add a large chunk when there are no
other Data blocks.`, func() { |
| 229 d := data(tc.Now(), bytes.Repeat([]byte{0xAA}, 1
28)...) |
| 230 So(s.Append(d), ShouldBeNil) |
| 231 |
| 232 Convey(`Will use that data's timestamp as expira
tion time.`, func() { |
| 233 t, has := s.expireTime() |
| 234 So(has, ShouldBeTrue) |
| 235 So(t.Equal(tc.Now().Add(time.Second)), S
houldBeTrue) |
| 236 }) |
| 237 }) |
| 238 |
| 239 Convey(`Append will block if the chunk exceeds the buffe
r size.`, func() { |
| 240 signalC := make(chan struct{}) |
| 241 s.c.onAppend = func(appended bool) { |
| 242 if !appended { |
| 243 // We're waiting. |
| 244 close(signalC) |
| 245 } |
| 246 } |
| 247 |
| 248 // Add one chunk so we don't hit the "only byte"
condition. |
| 249 So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0
xAA}, 34)...)), ShouldBeNil) |
| 250 |
| 251 // Wait until we get the signal that Append() wi
ll block, then consume |
| 252 // some data and unblock Append(). |
| 253 blocked := false |
| 254 go func() { |
| 255 <-signalC |
| 256 |
| 257 s.withParserLock(func() { |
| 258 tp.popData() |
| 259 }) |
| 260 blocked = true |
| 261 s.signalDataConsumed() |
| 262 }() |
| 263 |
| 264 // Add one chunk so we don't hit the "only byte"
condition. |
| 265 So(s.Append(data(tc.Now(), bytes.Repeat([]byte{0
xBB}, 32)...)), ShouldBeNil) |
| 266 So(blocked, ShouldBeTrue) |
| 267 }) |
| 268 |
| 269 Convey(`Append in an error state`, func() { |
| 270 terr := errors.New("test error") |
| 271 |
| 272 Convey(`Will return the error state.`, func() { |
| 273 s.setAppendError(terr) |
| 274 |
| 275 d := data(tc.Now(), bytes.Repeat([]byte{
0xAA}, 32)...) |
| 276 So(s.Append(d), ShouldEqual, terr) |
| 277 So(d.released, ShouldBeTrue) |
| 278 }) |
| 279 |
| 280 Convey(`Will block if the chunk exceeds buffer s
ize, and return error state.`, func() { |
| 281 signalC := make(chan struct{}) |
| 282 s.c.onAppend = func(appended bool) { |
| 283 if !appended { |
| 284 // Waiting, notify our g
oroutine that we're going to be waiting. |
| 285 close(signalC) |
| 286 } |
| 287 } |
| 288 |
| 289 // Add one chunk so we don't hit the "on
ly byte" condition. |
| 290 So(s.Append(data(tc.Now(), bytes.Repeat(
[]byte{0xAA}, 34)...)), ShouldBeNil) |
| 291 |
| 292 // Wait until we get the signal that App
end() will block, then consume |
| 293 // some data and unblock Append(). |
| 294 go func() { |
| 295 <-signalC |
| 296 s.setAppendError(terr) |
| 297 }() |
| 298 |
| 299 // Add one chunk so we don't hit the "on
ly byte" condition. |
| 300 for _, sz := range []int{32, 1, 0} { |
| 301 d := data(tc.Now(), bytes.Repeat
([]byte{0xAA}, sz)...) |
| 302 So(s.Append(d), ShouldEqual, ter
r) |
| 303 So(d.released, ShouldBeTrue) |
| 304 } |
| 305 }) |
| 306 }) |
| 307 }) |
| 308 |
| 309 Convey(`When building bundle entries`, func() { |
| 310 bb := &builder{ |
| 311 size: 1024, |
| 312 } |
| 313 s := newStream(c) |
| 314 |
| 315 Convey(`Returns nil with no buffered data.`, func() { |
| 316 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 317 So(bb.bundle(), shouldHaveBundleEntries) |
| 318 }) |
| 319 |
| 320 Convey(`With a single record, returns that entry.`, func
() { |
| 321 tp.tags(tc.Now(), "a", "b") |
| 322 |
| 323 So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
| 324 So(bb.bundle(), shouldHaveBundleEntries, "test:a
:b") |
| 325 }) |
| 326 |
| 327 Convey(`When truncate is required, returns nil.`, func()
{ |
| 328 tp.tags(tc.Now(), "a", "b") |
| 329 tp.setTrunc(true) |
| 330 tp.tags(tc.Now(), "c") |
| 331 |
| 332 So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
| 333 So(bb.bundle(), shouldHaveBundleEntries, "test:a
:b") |
| 334 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 335 |
| 336 So(s.nextBundleEntry(bb, true), ShouldBeTrue) |
| 337 So(bb.bundle(), shouldHaveBundleEntries, "test:a
:b:c") |
| 338 }) |
| 339 |
| 340 Convey(`When an error occurs during stream parsing, drai
ns stream.`, func() { |
| 341 So(s.isDrained(), ShouldBeFalse) |
| 342 tp.tags(tc.Now(), "a") |
| 343 tp.addError(errTestInduced) |
| 344 tp.tags(tc.Now(), "b") |
| 345 |
| 346 So(s.nextBundleEntry(bb, false), ShouldBeTrue) |
| 347 So(s.isDrained(), ShouldBeTrue) |
| 348 So(bb.bundle(), shouldHaveBundleEntries, "+test:
a") |
| 349 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 350 }) |
| 351 |
| 352 Convey(`With only an error, returns no bundle entry.`, f
unc() { |
| 353 So(s.isDrained(), ShouldBeFalse) |
| 354 tp.addError(errTestInduced) |
| 355 tp.tags(tc.Now(), "a") |
| 356 tp.tags(tc.Now(), "b") |
| 357 |
| 358 So(s.nextBundleEntry(bb, false), ShouldBeFalse) |
| 359 So(bb.bundle(), shouldHaveBundleEntries) |
| 360 So(s.isDrained(), ShouldBeTrue) |
| 361 }) |
| 362 }) |
| 363 }) |
| 364 } |
| 365 |
| 366 // TestStreamSmoke tests a Stream in an actual multi-goroutine workflow. |
| 367 func TestStreamSmoke(t *testing.T) { |
| 368 Convey(`When running a smoke test`, t, func() { |
| 369 tc := testclock.New(time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)) |
| 370 tp := testParser{} |
| 371 c := streamConfig{ |
| 372 name: "test", |
| 373 parser: &tp, |
| 374 template: protocol.ButlerLogBundle_Entry{ |
| 375 Desc: &protocol.LogStreamDescriptor{ |
| 376 Prefix: "test-prefix", |
| 377 Name: "test", |
| 378 }, |
| 379 }, |
| 380 } |
| 381 s := newStream(c) |
| 382 |
| 383 // Appender goroutine, constantly appends data. |
| 384 // |
| 385 // This will be inherently throttled by the nextBundle consumpti
on. |
| 386 go func() { |
| 387 defer s.Close() |
| 388 |
| 389 for i := 0; i < 512; i++ { |
| 390 s.Append(data(tc.Now(), []byte(fmt.Sprintf("%d",
i))...)) |
| 391 tc.Add(time.Second) |
| 392 } |
| 393 }() |
| 394 |
| 395 // The consumer goroutine will consume bundles from the stream. |
| 396 consumerC := make(chan struct{}) |
| 397 bundleC := make(chan *protocol.ButlerLogBundle) |
| 398 for i := 0; i < 32; i++ { |
| 399 go func() { |
| 400 defer func() { |
| 401 consumerC <- struct{}{} |
| 402 }() |
| 403 |
| 404 b := (*builder)(nil) |
| 405 for !s.isDrained() { |
| 406 if b == nil { |
| 407 b = &builder{ |
| 408 size: 128, |
| 409 } |
| 410 } |
| 411 |
| 412 s.nextBundleEntry(b, false) |
| 413 if b.hasContent() { |
| 414 bundleC <- b.bundle() |
| 415 b = nil |
| 416 } else { |
| 417 // No content! Sleep for a secon
d and check again. |
| 418 tc.Sleep(time.Second) |
| 419 } |
| 420 } |
| 421 }() |
| 422 } |
| 423 |
| 424 // Collect all bundles. |
| 425 gotIt := map[int]struct{}{} |
| 426 collectDoneC := make(chan struct{}) |
| 427 go func() { |
| 428 defer close(collectDoneC) |
| 429 |
| 430 for bundle := range bundleC { |
| 431 for _, be := range bundle.Entries { |
| 432 for _, le := range be.Logs { |
| 433 idx, _ := strconv.Atoi(logEntryN
ame(le)) |
| 434 gotIt[idx] = struct{}{} |
| 435 } |
| 436 } |
| 437 } |
| 438 }() |
| 439 |
| 440 for i := 0; i < 32; i++ { |
| 441 <-consumerC |
| 442 } |
| 443 close(bundleC) |
| 444 |
| 445 // Did we get them all? |
| 446 <-collectDoneC |
| 447 for i := 0; i < 512; i++ { |
| 448 _, ok := gotIt[i] |
| 449 So(ok, ShouldBeTrue) |
| 450 } |
| 451 }) |
| 452 } |
| OLD | NEW |