Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(43)

Unified Diff: client/internal/logdog/butler/bundler/textParser.go

Issue 1412063008: logdog: Add bundler library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Enhanced doc.go. Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: client/internal/logdog/butler/bundler/textParser.go
diff --git a/client/internal/logdog/butler/bundler/textParser.go b/client/internal/logdog/butler/bundler/textParser.go
new file mode 100644
index 0000000000000000000000000000000000000000..eb0cf459381eac71fbe8f8deb87ac1de312f7995
--- /dev/null
+++ b/client/internal/logdog/butler/bundler/textParser.go
@@ -0,0 +1,137 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bundler
+
+import (
+ "bytes"
+ "time"
+ "unicode/utf8"
+
+ "github.com/luci/luci-go/common/logdog/protocol"
+)
+
+const (
+ posixNewline = "\n"
+ windowsNewline = "\r\n"
+)
+
+var (
+ posixNewlineBytes = []byte(posixNewline)
+)
+
+// textParser is a parser implementation for the LogDog TEXT stream type.
+type textParser struct {
+ baseParser
+
+ sequence int64
+ buf bytes.Buffer
+}
+
+var _ parser = (*textParser)(nil)
+
+func (p *textParser) nextEntry(c *constraints) (*protocol.LogEntry, error) {
+ limit := int64(c.limit)
+ ts := time.Time{}
+ txt := protocol.Text{}
+ lineCount := 0
+ for limit > 0 {
+ br := p.ReaderLimit(limit)
+ if br.Remaining() == 0 {
+ // Exceeded either limit or available buffer data.
+ break
+ }
+
+ // Use the timestamp of the first data chunk.
+ if len(txt.Lines) == 0 {
+ ts, _ = p.firstChunkTime()
+ } else if ct, _ := p.firstChunkTime(); !ct.Equal(ts) {
+ // New timestamp, so need new LogEntry.
+ break
+ }
+
+ // Find the index of our delimiter.
+ //
+ // We do this using a cross-platform approach that works on POSIX systems,
+ // Mac (>=OSX), and Windows: we scan for "\n", then look backwards to see if
+ // it was preceded by "\r" (for Windows-style newlines, "\r\n").
+ idx := br.Index(posixNewlineBytes)
+
+ newline := ""
+ if idx >= 0 {
+ br = br.CloneLimit(idx)
+ newline = posixNewline
+ } else if !c.truncate {
+ // No delimiter within our limit, and we're not asked to truncate, so
+ // we're done.
+ break
+ }
+
+ // Load the exportable data into our buffer.
+ p.buf.Reset()
+ p.buf.ReadFrom(br)
+
+ // Does our exportable buffer end with "\r"? If so, treat it as a possible
+ // Windows newline sequence.
+ if p.buf.Len() > 0 && p.buf.Bytes()[p.buf.Len()-1] == byte('\r') {
+ truncate := false
+ if newline != "" {
+ // "\n" => "\r\n"
+ newline = windowsNewline
+ truncate = true
+ } else {
+ // If we're closed and this is the last byte in the stream, it is a
+ // dangling "\r" and we should include it. Otherwise, leave it for the
+ // next round.
iannucci 2015/11/13 07:16:38 what about weird stuff where the last bundle doesn
dnj 2015/11/14 00:30:36 "closed" means the input stream is closed. The bun
+ truncate = !(c.closed && int64(p.buf.Len()) == p.Len())
+ }
+
+ if truncate {
+ p.buf.Truncate(p.buf.Len() - 1)
+ }
+ }
+
+ partial := (idx < 0)
+ if !partial {
+ lineCount++
+ }
+
+ // If we didn't have a delimiter, make sure we don't terminate in the middle
+ // of a UTF8 character.
+ if partial {
+ count := 0
+ lidx := -1
+ b := p.buf.Bytes()
+ for len(b) > 0 {
+ r, sz := utf8.DecodeRune(b)
+ count += sz
+ if r != utf8.RuneError {
+ lidx = count
+ }
+ b = b[sz:]
+ }
+ if lidx < 0 {
+ break
+ }
+ p.buf.Truncate(lidx)
+ }
+
+ txt.Lines = append(txt.Lines, &protocol.Text_Line{
+ Value: p.buf.String(),
+ Delimiter: newline,
+ })
+ p.Consume(int64(p.buf.Len() + len(newline)))
+ limit -= int64(p.buf.Len() + len(newline))
+ }
+
+ if len(txt.Lines) == 0 {
+ return nil, nil
+ }
+ le := p.baseLogEntry(ts)
+ le.Sequence = uint64(p.sequence)
+ le.Content = &protocol.LogEntry_Text{Text: &txt}
+
+ p.sequence += int64(lineCount)
+ return le, nil
+}

Powered by Google App Engine
This is Rietveld 408576698