Chromium Code Reviews| Index: common/chunkstream/reader.go |
| diff --git a/common/chunkstream/reader.go b/common/chunkstream/reader.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..633fe8f8b9e5feaf6369d05469c5e75391fe2a64 |
| --- /dev/null |
| +++ b/common/chunkstream/reader.go |
| @@ -0,0 +1,232 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package chunkstream |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "io" |
| +) |
| + |
| +// Reader is an io.Reader implementation bound to a view of a chunk Buffer. |
| +// A Reader attempts to present the Buffer's ordered Chunk instances as a |
| +// contiguous stream of bytes. |
| +// |
| +// A Reader is tightly coupled to the state of its Buffer. If data is Consumed |
| +// from its source Buffer, the Reader becomes invalid, and any further accesses |
| +// to its methods are undefined. |
| +// |
| +// In addition to an io.Reader and io.ByteReader interface, Reader offers a |
| +// series of utility functions to efficiently traverse the underlying chunk |
| +// space. |
| +type Reader struct { |
|
iannucci
2015/11/05 01:10:08
rename to View
dnj
2015/11/13 23:22:04
Done.
|
| + cur *chunkNode |
| + cidx int |
|
iannucci
2015/11/05 01:10:08
wat r this? comment pls?
dnj
2015/11/13 23:22:04
Done.
|
| + size int64 |
| + consumed int64 |
| + |
| + // gen is the generation from which |
| + b *Buffer |
| + gen int64 |
| + walkBuf bytes.Buffer |
| +} |
| + |
| +var _ interface { |
| + io.Reader |
| + io.ByteReader |
| +} = (*Reader)(nil) |
| + |
| +func (r *Reader) Read(b []byte) (int, error) { |
| + total := int64(0) |
| + err := error(nil) |
| + for len(b) > 0 { |
| + chunk := r.chunkBytes() |
| + if len(chunk) == 0 { |
| + err = io.EOF |
| + break |
| + } |
| + |
| + amount := copy(b, chunk) |
| + total += int64(amount) |
| + b = b[amount:] |
| + r.Skip(int64(amount)) |
| + } |
| + if r.Remaining() == 0 { |
| + err = io.EOF |
| + } |
| + return int(total), err |
| +} |
| + |
| +// ReadByte implements io.ByteReader, reading a single byte from the buffer. |
| +func (r *Reader) ReadByte() (byte, error) { |
| + chunk := r.chunkBytes() |
| + if len(chunk) == 0 { |
| + return 0, io.EOF |
| + } |
| + r.Skip(1) |
| + return chunk[0], nil |
| +} |
| + |
| +// Remaining returns the number of bytes remaining in the Reader view. |
| +func (r *Reader) Remaining() int64 { |
| + return r.size |
| +} |
| + |
| +// Consumed returns the number of bytes that have been skipped via Skip or |
| +// higher-level calls. |
| +func (r *Reader) Consumed() int64 { |
| + return r.consumed |
| +} |
| + |
| +// Skip advances the Reader's view forwards a fixed number of bytes. |
| +func (r *Reader) Skip(count int64) int64 { |
|
iannucci
2015/11/05 01:10:08
why return anything?
dnj
2015/11/13 23:22:04
No good reason.
|
| + start := r.consumed |
| + for count > 0 { |
| + if r.cur == nil { |
| + panic("cannot skip past end buffer") |
| + } |
| + |
| + amount := r.chunkRemaining() |
| + if count < int64(amount) { |
| + amount = int(count) |
| + r.cidx += amount |
| + } else { |
| + // Finished consuming this chunk, move on to the next. |
| + r.cur = r.cur.next |
| + r.cidx = 0 |
| + } |
| + |
| + count -= int64(amount) |
| + r.consumed += int64(amount) |
| + r.size -= int64(amount) |
| + } |
| + return (r.consumed - start) |
| +} |
| + |
| +// Index scans the Reader for the specified needle bytes. If they are |
| +// found, their index in the Reader is returned. Otherwise, Index returns |
| +// -1. |
| +// |
| +// The Reader is not modified during the search. |
| +func (r *Reader) Index(needle []byte) int64 { |
| + if r.Remaining() == 0 { |
| + return -1 |
| + } |
| + if len(needle) == 0 { |
| + return 0 |
| + } |
| + return r.Clone().indexDestructive(needle) |
| +} |
| + |
| +// indexDestructive implements Index by actively mutating the Reader. |
| +func (r *Reader) indexDestructive(needle []byte) int64 { |
|
iannucci
2015/11/05 01:10:08
What about making this public like `SkipTo(needle
dnj
2015/11/13 23:22:04
I did clean the internal interface up a bit, but I
|
| + tbuf := make([]byte, 2*len(needle)) |
| + initialConsumed := r.consumed |
| + idx := int(0) |
| + for { |
| + data := r.chunkBytes() |
| + if len(data) == 0 { |
| + return -1 |
| + } |
| + |
| + // Scan the current chunk for needle. Note that if the current chunk is too |
| + // small to hold needle, this is a no-op. |
| + if idx = bytes.Index(data, needle); idx >= 0 { |
| + break |
| + } |
| + if len(data) > len(needle) { |
| + // The needle is definitely not in this space. |
| + r.Skip(int64(len(data) - len(needle))) |
| + } |
| + |
| + // needle isn't in the current chunk; however, it may begin at the end of |
| + // the current chunk and complete in future chunks. |
| + // |
| + // We will scan a space twice the size of the needle, as otherwise, this |
| + // would end up scanning for one possibility, incrementing by one, and |
| + // repeating via 'for' loop iterations. |
| + // |
| + // Afterwards, we advance only the size of the needle, as we don't want to |
| + // preclude the needle starting after our last scan range. |
| + // |
| + // For example, to find needle "NDL": |
| + // |
| + // AAAAND|L|AAAA |
| + // |------|^- [NDLAAA], 0 |
| + // |
| + // AAAAN|D|NDL|AAAA |
| + // |------| [ANDNDL], 3 |
| + // |
| + // AAAA|A|A|NDL |
| + // |-------| [AAAAND], -1, consume 3 => A|NDL| |
| + // |
| + // |
| + // Note that we perform the read with a cloned Reader so we don't |
| + // actually consume this data. |
| + pr := r.Clone() |
| + amt, _ := pr.Read(tbuf) |
| + if amt < len(needle) { |
| + // All remaining buffers cannot hold the needle. |
| + return -1 |
| + } |
| + |
| + if idx = bytes.Index(tbuf[:amt], needle); idx >= 0 { |
| + break |
| + } |
| + r.Skip(int64(len(needle))) |
| + } |
| + return r.consumed - initialConsumed + int64(idx) |
| +} |
| + |
| +// Clone returns a copy of the Reader view. |
| +// |
| +// The clone is bound to the same underlying Buffer as the source. |
| +func (r *Reader) Clone() *Reader { |
| + return r.CloneLimit(r.size) |
| +} |
| + |
| +// CloneLimit returns a copy of the Reader view, optionally truncating it. |
| +// |
| +// The clone is bound to the same underlying Buffer as the source. |
| +func (r *Reader) CloneLimit(limit int64) *Reader { |
| + c := *r |
| + if c.size > limit { |
| + c.size = limit |
| + } |
| + return &c |
| +} |
| + |
| +func (r *Reader) chunkRemaining() int { |
| + r.checkGen() |
| + |
| + if r.cur == nil { |
| + return 0 |
| + } |
| + result := r.cur.Len() - r.cidx |
| + if int64(result) > r.size { |
| + result = int(r.size) |
| + } |
| + return result |
| +} |
| + |
| +func (r *Reader) chunkBytes() []byte { |
| + r.checkGen() |
| + |
| + if r.cur == nil { |
| + return nil |
| + } |
| + data := r.cur.Bytes()[r.cidx:] |
| + remaining := r.Remaining() |
| + if int64(len(data)) > remaining { |
| + data = data[:remaining] |
| + } |
| + return data |
| +} |
| + |
| +func (r *Reader) checkGen() { |
| + if r.b.gen != r.gen { |
| + panic(fmt.Errorf("generation mismatch (%v != %v)", r.b.gen, r.gen)) |
| + } |
| +} |