Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Unified Diff: client/internal/logdog/butler/bundler/datagramParser.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Enhanced doc.go. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/datagramParser.go
diff --git a/client/internal/logdog/butler/bundler/datagramParser.go b/client/internal/logdog/butler/bundler/datagramParser.go
new file mode 100644
index 0000000000000000000000000000000000000000..f9c5ccd462e671f5b846da77b430bc80f0900c22
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/datagramParser.go
@@ -0,0 +1,128 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/luci/luci-go/common/logdog/protocol"
+ "github.com/luci/luci-go/common/recordio"
+)
+
+// datagramParser is a parser implementation for the LogDog DATAGRAb stream
iannucci 2015/11/13 07:16:38 DATAGRAB is actually the codename of a secret gove
dnj 2015/11/14 00:30:36 A not-so-crafty code word!
+// type.
+type datagramParser struct {
+ baseParser
+
+ // maxSize is the maximum allowed datagram size. Datagrams larger than this
+ // will result in a processing error.
+ maxSize int64
+
+ // seq is the current datagram sequence number.
+ seq int64
+ // remaining is the amount of data remaining in a datagram that has previously
+ // been emitted partially.
+ //
+ // This will be zero if we're not continuing a partial datagram.
+ remaining int64
+ // size is the size of the current partial datagram.
+ //
+ // This value is only valid if we're continuing a partial datagram (i.e., if
+ // remaining is non-zero).
+ size int64
+}
+
+var _ parser = (*datagramParser)(nil)
+
+func (s *datagramParser) nextEntry(c *constraints) (*protocol.LogEntry, error) {
+ // Use the current Buffer timestamp.
+ ts, has := s.firstChunkTime()
+ if !has {
+ // No chunks, so no data.
+ return nil, nil
+ }
+
+ // If remaining is zero, we don't have a buffered size header.
+ //
+ // Note that zero-size datagrams will store zero here on load; however, such
+ // datagrams will never fail to emit a LogEntry, so s.remaining will have been
+ // reset to zero by the next call.
+ if s.remaining == 0 {
+ br := s.Reader()
+
+ // Read the next datagram size header.
+ rio := recordio.NewReader(br, s.maxSize)
+ size, _, err := rio.ReadFrame()
+ if err != nil {
+ switch err {
+ case io.EOF:
+ // Not enough data for a size header.
+ return nil, nil
+
+ case recordio.ErrFrameTooLarge:
+ return nil, recordio.ErrFrameTooLarge
+
+ default:
+ // Non-EOF errors should not be possible.
+ panic(fmt.Errorf("unexpected error encountered: %v", err))
+ }
+ }
+
+ s.size = size
+ s.remaining = size
+
+ // Don't need to read the size header again.
+ s.Consume(br.Consumed())
+ }
+
+ // If we read this, will it be partial?
+ emitCount := s.remaining
+ partial := false
+ if emitCount > int64(c.limit) {
+ partial = true
+ emitCount = int64(c.limit)
+ }
+
+ br := s.ReaderLimit(s.remaining)
+ if r := br.Remaining(); r < emitCount {
+ // Not enough buffered data to complete the datagram in one round.
+ partial = true
+ emitCount = r
+ }
+ if s.remaining > 0 && emitCount == 0 {
+ // The datagram has data, but we can't emit any of it. No point in issuing
+ // a zero-size partial datagram.
+ return nil, nil
+ }
+
+ // We're not willing to emit a partial datagram unless we're allowed to
+ // truncate.
iannucci 2015/11/13 07:16:38 erm... does truncate actually mean truncate? I tho
dnj 2015/11/14 00:30:36 Yeah "truncate" says, "you may truncate this data
+ if partial && !c.truncate {
+ return nil, nil
+ }
+
+ dg := protocol.Datagram{
+ Partial: partial,
+ Size: uint64(s.size),
+ }
+ if emitCount > 0 {
+ dg.Data = make([]byte, emitCount)
+ br.Read(dg.Data)
+ s.Consume(emitCount)
+ }
+
+ le := s.baseLogEntry(ts)
+ le.Sequence = uint64(s.seq)
+ le.Content = &protocol.LogEntry_Datagram{Datagram: &dg}
+
+ if !partial {
+ s.seq++
+ s.remaining = 0
+ } else {
+ s.remaining -= emitCount
+ }
+ return le, nil
+}

Powered by Google App Engine
This is Rietveld 408576698