Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(60)

Unified Diff: common/chunkstream/reader.go

Issue 1413923013: Add `chunk` segmented data library. (Closed) Base URL: https://github.com/luci/luci-go@logdog-review-streamserver
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/chunkstream/reader.go
diff --git a/common/chunkstream/reader.go b/common/chunkstream/reader.go
new file mode 100644
index 0000000000000000000000000000000000000000..633fe8f8b9e5feaf6369d05469c5e75391fe2a64
--- /dev/null
+++ b/common/chunkstream/reader.go
@@ -0,0 +1,232 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package chunkstream
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+)
+
+// Reader is an io.Reader implementation bound to a view of a chunk Buffer.
+// A Reader attempts to present the Buffer's ordered Chunk instances as a
+// contiguous stream of bytes.
+//
+// A Reader is tightly coupled to the state of its Buffer. If data is Consumed
+// from its source Buffer, the Reader becomes invalid, and any further accesses
+// to its methods are undefined.
+//
+// In addition to an io.Reader and io.ByteReader interface, Reader offers a
+// series of utility functions to efficiently traverse the underlying chunk
+// space.
+type Reader struct {
iannucci 2015/11/05 01:10:08 rename to View
dnj 2015/11/13 23:22:04 Done.
+ cur *chunkNode // chunk node the view is currently positioned in; nil when there is no current chunk
+ cidx int // byte offset into cur's data at which the view currently sits
iannucci 2015/11/05 01:10:08 wat r this? comment pls?
dnj 2015/11/13 23:22:04 Done.
+ size int64 // bytes still available in the view; reported by Remaining
+ consumed int64 // bytes advanced past so far; reported by Consumed
+
+ // b is the source Buffer; gen is compared against b.gen by checkGen, which panics on a mismatch.
+ b *Buffer
+ gen int64
+ walkBuf bytes.Buffer // NOTE(review): not referenced by any method visible in this diff
+}
+
+// Compile-time assertions that *Reader satisfies io.Reader and io.ByteReader.
+var (
+	_ io.Reader     = (*Reader)(nil)
+	_ io.ByteReader = (*Reader)(nil)
+)
+
+func (r *Reader) Read(b []byte) (int, error) {
+ total := int64(0)
+ err := error(nil)
+ for len(b) > 0 {
+ chunk := r.chunkBytes()
+ if len(chunk) == 0 {
+ err = io.EOF
+ break
+ }
+
+ amount := copy(b, chunk)
+ total += int64(amount)
+ b = b[amount:]
+ r.Skip(int64(amount))
+ }
+ if r.Remaining() == 0 {
+ err = io.EOF
+ }
+ return int(total), err
+}
+
+// ReadByte implements io.ByteReader. It returns the next byte in the view and
+// advances past it, or io.EOF when no chunk data is available.
+func (r *Reader) ReadByte() (byte, error) {
+	if data := r.chunkBytes(); len(data) > 0 {
+		c := data[0]
+		r.Skip(1)
+		return c, nil
+	}
+	return 0, io.EOF
+}
+
+// Remaining returns the number of bytes left in the Reader view (the current value of r.size).
+func (r *Reader) Remaining() int64 {
+ return r.size
+}
+
+// Consumed returns the running total of bytes this Reader has advanced past,
+// as accumulated by Skip (which Read and ReadByte call).
+func (r *Reader) Consumed() int64 {
+ return r.consumed
+}
+
+// Skip advances the view by count bytes and returns how many bytes it advanced; it panics if no chunk remains while count > 0.
+func (r *Reader) Skip(count int64) int64 {
iannucci 2015/11/05 01:10:08 why return anything?
dnj 2015/11/13 23:22:04 No good reason.
+ start := r.consumed // remembered so the return value covers only this call
+ for count > 0 {
+ if r.cur == nil {
+ panic("cannot skip past end buffer")
+ }
+
+ amount := r.chunkRemaining() // bytes left in the current chunk, capped by r.size
+ if count < int64(amount) {
+ amount = int(count) // the skip ends inside the current chunk
+ r.cidx += amount
+ } else {
+ // The skip covers everything chunkRemaining reported for this chunk; step to the next one.
+ r.cur = r.cur.next
+ r.cidx = 0
+ }
+
+ count -= int64(amount)
+ r.consumed += int64(amount)
+ r.size -= int64(amount)
+ }
+ return (r.consumed - start)
+}
+
+// Index searches the Reader's remaining bytes for needle and returns the
+// offset of its first occurrence relative to the current position, or -1 if
+// it does not occur.
+//
+// An empty view always yields -1; otherwise an empty needle yields 0.
+//
+// The search runs on a clone, so the Reader itself is left unmodified.
+func (r *Reader) Index(needle []byte) int64 {
+	switch {
+	case r.Remaining() == 0:
+		return -1
+	case len(needle) == 0:
+		return 0
+	}
+	return r.Clone().indexDestructive(needle)
+}
+
+// indexDestructive implements Index by advancing r itself; it returns needle's offset from r's starting position, or -1.
+func (r *Reader) indexDestructive(needle []byte) int64 {
iannucci 2015/11/05 01:10:08 What about making this public like `SkipTo(needle
dnj 2015/11/13 23:22:04 I did clean the internal interface up a bit, but I
+ tbuf := make([]byte, 2*len(needle)) // scratch window for matches that straddle chunk boundaries
+ initialConsumed := r.consumed // baseline so the result is relative to the starting position
+ idx := int(0)
+ for {
+ data := r.chunkBytes()
+ if len(data) == 0 {
+ return -1
+ }
+
+ // Scan the current chunk for needle. Note that if the current chunk is too
+ // small to hold needle, bytes.Index simply reports no match.
+ if idx = bytes.Index(data, needle); idx >= 0 {
+ break
+ }
+ if len(data) > len(needle) {
+ // No match starts in this prefix; keep the chunk's last len(needle) bytes for the scan below.
+ r.Skip(int64(len(data) - len(needle)))
+ }
+
+ // needle isn't in the current chunk; however, it may begin at the end of
+ // the current chunk and complete in future chunks.
+ //
+ // We will scan a space twice the size of the needle, as otherwise, this
+ // would end up scanning for one possibility, incrementing by one, and
+ // repeating via 'for' loop iterations.
+ //
+ // Afterwards, we advance only the size of the needle, as we don't want to
+ // preclude the needle starting after our last scan range.
+ //
+ // For example, to find needle "NDL":
+ //
+ // AAAAND|L|AAAA
+ // |------|^- [NDLAAA], 0
+ //
+ // AAAAN|D|NDL|AAAA
+ // |------| [ANDNDL], 3
+ //
+ // AAAA|A|A|NDL
+ // |-------| [AAAAND], -1, consume 3 => A|NDL|
+ //
+ // NOTE(review): these windows assume a skip of len(data)-len(needle)+1; the Skip above advances one byte less.
+ //
+ // Note that we perform the read with a cloned Reader so we don't
+ // actually consume this data.
+ pr := r.Clone()
+ amt, _ := pr.Read(tbuf) // error ignored: a short read is handled through amt below
+ if amt < len(needle) {
+ // Fewer than len(needle) bytes remain in the view, so no match is possible.
+ return -1
+ }
+
+ if idx = bytes.Index(tbuf[:amt], needle); idx >= 0 {
+ break
+ }
+ r.Skip(int64(len(needle)))
+ }
+ return r.consumed - initialConsumed + int64(idx)
+}
+
+// Clone returns an untruncated copy of the Reader view: same position, same
+// number of bytes remaining.
+//
+// The copy shares the source's underlying Buffer.
+func (r *Reader) Clone() *Reader {
+	return r.CloneLimit(r.Remaining())
+}
+
+// CloneLimit returns a copy of the Reader view whose remaining size is capped
+// at limit bytes. A limit at or above the current remaining size leaves the
+// copy untruncated. A negative limit is treated as zero, yielding an empty
+// view rather than one with a negative size.
+//
+// The clone is bound to the same underlying Buffer as the source.
+func (r *Reader) CloneLimit(limit int64) *Reader {
+	// A negative size would later be used as a slice bound when reading chunk
+	// data, which panics at runtime; clamp it here instead.
+	if limit < 0 {
+		limit = 0
+	}
+
+	c := *r
+	if c.size > limit {
+		c.size = limit
+	}
+	return &c
+}
+
+// chunkRemaining reports how many bytes of the current chunk are still
+// readable through this view: the bytes after cidx, capped at the view's
+// remaining size. It returns 0 when there is no current chunk.
+//
+// It panics (via checkGen) on a generation mismatch with the Buffer.
+func (r *Reader) chunkRemaining() int {
+	r.checkGen()
+
+	if r.cur == nil {
+		return 0
+	}
+	left := r.cur.Len() - r.cidx
+	if r.size < int64(left) {
+		return int(r.size)
+	}
+	return left
+}
+
+// chunkBytes returns the readable portion of the current chunk: its bytes
+// from cidx onward, truncated to the view's remaining size. It returns nil
+// when there is no current chunk.
+//
+// It panics (via checkGen) on a generation mismatch with the Buffer.
+func (r *Reader) chunkBytes() []byte {
+	r.checkGen()
+
+	if r.cur == nil {
+		return nil
+	}
+	buf := r.cur.Bytes()[r.cidx:]
+	if limit := r.Remaining(); limit < int64(len(buf)) {
+		buf = buf[:limit]
+	}
+	return buf
+}
+
+// checkGen panics if the Buffer's generation differs from the generation
+// recorded in this Reader.
+func (r *Reader) checkGen() {
+	if bufGen, viewGen := r.b.gen, r.gen; bufGen != viewGen {
+		panic(fmt.Errorf("generation mismatch (%v != %v)", bufGen, viewGen))
+	}
+}

Powered by Google App Engine
This is Rietveld 408576698