Chromium Code Reviews| Index: client/internal/logdog/butler/bundler/textParser.go |
| diff --git a/client/internal/logdog/butler/bundler/textParser.go b/client/internal/logdog/butler/bundler/textParser.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..eb0cf459381eac71fbe8f8deb87ac1de312f7995 |
| --- /dev/null |
| +++ b/client/internal/logdog/butler/bundler/textParser.go |
| @@ -0,0 +1,137 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package bundler |
| + |
| +import ( |
| + "bytes" |
| + "time" |
| + "unicode/utf8" |
| + |
| + "github.com/luci/luci-go/common/logdog/protocol" |
| +) |
| + |
const (
	// posixNewline is the single-byte line delimiter "\n". nextEntry scans
	// for it to find line boundaries.
	posixNewline = "\n"
	// windowsNewline is the two-byte line delimiter "\r\n". nextEntry reports
	// it when a "\n" is immediately preceded by "\r".
	windowsNewline = "\r\n"
)

var (
	// posixNewlineBytes is posixNewline as a byte slice, built once so the
	// delimiter search does not convert the string on every iteration.
	posixNewlineBytes = []byte(posixNewline)
)
| + |
// textParser is a parser implementation for the LogDog TEXT stream type.
//
// It splits buffered stream data into lines and packs them into LogEntry
// messages (see nextEntry).
type textParser struct {
	// baseParser is embedded; presumably it supplies the buffered-data helpers
	// that nextEntry calls on the receiver (ReaderLimit, firstChunkTime, Len,
	// Consume, baseLogEntry) -- confirm against its definition.
	baseParser

	// sequence is the value assigned to the next LogEntry's Sequence field.
	// nextEntry advances it by the number of complete (delimited) lines in
	// each entry it returns.
	sequence int64
	// buf is scratch space holding the line currently being examined. It is
	// reset for every line, so it carries no state between lines.
	buf bytes.Buffer
}

// Compile-time check that *textParser satisfies the parser interface.
var _ parser = (*textParser)(nil)
| + |
| +func (p *textParser) nextEntry(c *constraints) (*protocol.LogEntry, error) { |
| + limit := int64(c.limit) |
| + ts := time.Time{} |
| + txt := protocol.Text{} |
| + lineCount := 0 |
| + for limit > 0 { |
| + br := p.ReaderLimit(limit) |
| + if br.Remaining() == 0 { |
| + // Exceeded either limit or available buffer data. |
| + break |
| + } |
| + |
| + // Use the timestamp of the first data chunk. |
| + if len(txt.Lines) == 0 { |
| + ts, _ = p.firstChunkTime() |
| + } else if ct, _ := p.firstChunkTime(); !ct.Equal(ts) { |
| + // New timestamp, so need new LogEntry. |
| + break |
| + } |
| + |
| + // Find the index of our delimiter. |
| + // |
| + // We do this using a cross-platform approach that works on POSIX systems, |
| + // Mac (>=OSX), and Windows: we scan for "\n", then look backwards to see if |
| + // it was preceded by "\r" (for Windows-style newlines, "\r\n"). |
| + idx := br.Index(posixNewlineBytes) |
| + |
| + newline := "" |
| + if idx >= 0 { |
| + br = br.CloneLimit(idx) |
| + newline = posixNewline |
| + } else if !c.truncate { |
| + // No delimiter within our limit, and we're not asked to truncate, so |
| + // we're done. |
| + break |
| + } |
| + |
| + // Load the exportable data into our buffer. |
| + p.buf.Reset() |
| + p.buf.ReadFrom(br) |
| + |
| + // Does our exportable buffer end with "\r"? If so, treat it as a possible |
| + // Windows newline sequence. |
| + if p.buf.Len() > 0 && p.buf.Bytes()[p.buf.Len()-1] == byte('\r') { |
| + truncate := false |
| + if newline != "" { |
| + // "\n" => "\r\n" |
| + newline = windowsNewline |
| + truncate = true |
| + } else { |
| + // If we're closed and this is the last byte in the stream, it is a |
| + // dangling "\r" and we should include it. Otherwise, leave it for the |
| + // next round. |
|
iannucci
2015/11/13 07:16:38
what about weird stuff where the last bundle doesn
dnj
2015/11/14 00:30:36
"closed" means the input stream is closed. The bun
|
| + truncate = !(c.closed && int64(p.buf.Len()) == p.Len()) |
| + } |
| + |
| + if truncate { |
| + p.buf.Truncate(p.buf.Len() - 1) |
| + } |
| + } |
| + |
| + partial := (idx < 0) |
| + if !partial { |
| + lineCount++ |
| + } |
| + |
| + // If we didn't have a delimiter, make sure we don't terminate in the middle |
| + // of a UTF8 character. |
| + if partial { |
| + count := 0 |
| + lidx := -1 |
| + b := p.buf.Bytes() |
| + for len(b) > 0 { |
| + r, sz := utf8.DecodeRune(b) |
| + count += sz |
| + if r != utf8.RuneError { |
| + lidx = count |
| + } |
| + b = b[sz:] |
| + } |
| + if lidx < 0 { |
| + break |
| + } |
| + p.buf.Truncate(lidx) |
| + } |
| + |
| + txt.Lines = append(txt.Lines, &protocol.Text_Line{ |
| + Value: p.buf.String(), |
| + Delimiter: newline, |
| + }) |
| + p.Consume(int64(p.buf.Len() + len(newline))) |
| + limit -= int64(p.buf.Len() + len(newline)) |
| + } |
| + |
| + if len(txt.Lines) == 0 { |
| + return nil, nil |
| + } |
| + le := p.baseLogEntry(ts) |
| + le.Sequence = uint64(p.sequence) |
| + le.Content = &protocol.LogEntry_Text{Text: &txt} |
| + |
| + p.sequence += int64(lineCount) |
| + return le, nil |
| +} |