Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package chunkstream | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "fmt" | |
| 10 "io" | |
| 11 ) | |
| 12 | |
| 13 // Reader is an io.Reader implementation bound to a view of a chunk Buffer. | |
| 14 // A Reader attempts to present the Buffer's ordered Chunk instances as a | |
| 15 // contiguous stream of bytes. | |
| 16 // | |
| 17 // A Reader is tightly coupled to the state of its Buffer. If data is Consumed | |
| 18 // from its source Buffer, the Reader becomes invalid, and any further accesses | |
| 19 // to its methods are undefined. | |
| 20 // | |
| 21 // In addition to an io.Reader and io.ByteReader interface, Reader offers a | |
| 22 // series of utility functions to efficiently traverse the underlying chunk | |
| 23 // space. | |
| 24 type Reader struct { | |
|
iannucci
2015/11/05 01:10:08
rename to View
dnj
2015/11/13 23:22:04
Done.
| |
| 25 cur *chunkNode | |
| 26 cidx int | |
|
iannucci
2015/11/05 01:10:08
wat r this? comment pls?
dnj
2015/11/13 23:22:04
Done.
| |
| 27 size int64 | |
| 28 consumed int64 | |
| 29 | |
| 30 // gen is the generation from which | |
| 31 b *Buffer | |
| 32 gen int64 | |
| 33 walkBuf bytes.Buffer | |
| 34 } | |
| 35 | |
| 36 var _ interface { | |
| 37 io.Reader | |
| 38 io.ByteReader | |
| 39 } = (*Reader)(nil) | |
| 40 | |
| 41 func (r *Reader) Read(b []byte) (int, error) { | |
| 42 total := int64(0) | |
| 43 err := error(nil) | |
| 44 for len(b) > 0 { | |
| 45 chunk := r.chunkBytes() | |
| 46 if len(chunk) == 0 { | |
| 47 err = io.EOF | |
| 48 break | |
| 49 } | |
| 50 | |
| 51 amount := copy(b, chunk) | |
| 52 total += int64(amount) | |
| 53 b = b[amount:] | |
| 54 r.Skip(int64(amount)) | |
| 55 } | |
| 56 if r.Remaining() == 0 { | |
| 57 err = io.EOF | |
| 58 } | |
| 59 return int(total), err | |
| 60 } | |
| 61 | |
| 62 // ReadByte implements io.ByteReader, reading a single byte from the buffer. | |
| 63 func (r *Reader) ReadByte() (byte, error) { | |
| 64 chunk := r.chunkBytes() | |
| 65 if len(chunk) == 0 { | |
| 66 return 0, io.EOF | |
| 67 } | |
| 68 r.Skip(1) | |
| 69 return chunk[0], nil | |
| 70 } | |
| 71 | |
| 72 // Remaining returns the number of bytes remaining in the Reader view. | |
| 73 func (r *Reader) Remaining() int64 { | |
| 74 return r.size | |
| 75 } | |
| 76 | |
| 77 // Consumed returns the number of bytes that have been skipped via Skip or | |
| 78 // higher-level calls. | |
| 79 func (r *Reader) Consumed() int64 { | |
| 80 return r.consumed | |
| 81 } | |
| 82 | |
| 83 // Skip advances the Reader's view forwards a fixed number of bytes. | |
| 84 func (r *Reader) Skip(count int64) int64 { | |
|
iannucci
2015/11/05 01:10:08
why return anything?
dnj
2015/11/13 23:22:04
No good reason.
| |
| 85 start := r.consumed | |
| 86 for count > 0 { | |
| 87 if r.cur == nil { | |
| 88 panic("cannot skip past end buffer") | |
| 89 } | |
| 90 | |
| 91 amount := r.chunkRemaining() | |
| 92 if count < int64(amount) { | |
| 93 amount = int(count) | |
| 94 r.cidx += amount | |
| 95 } else { | |
| 96 // Finished consuming this chunk, move on to the next. | |
| 97 r.cur = r.cur.next | |
| 98 r.cidx = 0 | |
| 99 } | |
| 100 | |
| 101 count -= int64(amount) | |
| 102 r.consumed += int64(amount) | |
| 103 r.size -= int64(amount) | |
| 104 } | |
| 105 return (r.consumed - start) | |
| 106 } | |
| 107 | |
| 108 // Index scans the Reader for the specified needle bytes. If they are | |
| 109 // found, their index in the Reader is returned. Otherwise, Index returns | |
| 110 // -1. | |
| 111 // | |
| 112 // The Reader is not modified during the search. | |
| 113 func (r *Reader) Index(needle []byte) int64 { | |
| 114 if r.Remaining() == 0 { | |
| 115 return -1 | |
| 116 } | |
| 117 if len(needle) == 0 { | |
| 118 return 0 | |
| 119 } | |
| 120 return r.Clone().indexDestructive(needle) | |
| 121 } | |
| 122 | |
| 123 // indexDestructive implements Index by actively mutating the Reader. | |
| 124 func (r *Reader) indexDestructive(needle []byte) int64 { | |
|
iannucci
2015/11/05 01:10:08
What about making this public like `SkipTo(needle
dnj
2015/11/13 23:22:04
I did clean the internal interface up a bit, but I
| |
| 125 tbuf := make([]byte, 2*len(needle)) | |
| 126 initialConsumed := r.consumed | |
| 127 idx := int(0) | |
| 128 for { | |
| 129 data := r.chunkBytes() | |
| 130 if len(data) == 0 { | |
| 131 return -1 | |
| 132 } | |
| 133 | |
| 134 // Scan the current chunk for needle. Note that if the current c hunk is too | |
| 135 // small to hold needle, this is a no-op. | |
| 136 if idx = bytes.Index(data, needle); idx >= 0 { | |
| 137 break | |
| 138 } | |
| 139 if len(data) > len(needle) { | |
| 140 // The needle is definitely not in this space. | |
| 141 r.Skip(int64(len(data) - len(needle))) | |
| 142 } | |
| 143 | |
| 144 // needle isn't in the current chunk; however, it may begin at t he end of | |
| 145 // the current chunk and complete in future chunks. | |
| 146 // | |
| 147 // We will scan a space twice the size of the needle, as otherwi se, this | |
| 148 // would end up scanning for one possibility, incrementing by on e, and | |
| 149 // repeating via 'for' loop iterations. | |
| 150 // | |
| 151 // Afterwards, we advance only the size of the needle, as we don 't want to | |
| 152 // preclude the needle starting after our last scan range. | |
| 153 // | |
| 154 // For example, to find needle "NDL": | |
| 155 // | |
| 156 // AAAAND|L|AAAA | |
| 157 // |------|^- [NDLAAA], 0 | |
| 158 // | |
| 159 // AAAAN|D|NDL|AAAA | |
| 160 // |------| [ANDNDL], 3 | |
| 161 // | |
| 162 // AAAA|A|A|NDL | |
| 163 // |-------| [AAAAND], -1, consume 3 => A|NDL| | |
| 164 // | |
| 165 // | |
| 166 // Note that we perform the read with a cloned Reader so we don' t | |
| 167 // actually consume this data. | |
| 168 pr := r.Clone() | |
| 169 amt, _ := pr.Read(tbuf) | |
| 170 if amt < len(needle) { | |
| 171 // All remaining buffers cannot hold the needle. | |
| 172 return -1 | |
| 173 } | |
| 174 | |
| 175 if idx = bytes.Index(tbuf[:amt], needle); idx >= 0 { | |
| 176 break | |
| 177 } | |
| 178 r.Skip(int64(len(needle))) | |
| 179 } | |
| 180 return r.consumed - initialConsumed + int64(idx) | |
| 181 } | |
| 182 | |
| 183 // Clone returns a copy of the Reader view. | |
| 184 // | |
| 185 // The clone is bound to the same underlying Buffer as the source. | |
| 186 func (r *Reader) Clone() *Reader { | |
| 187 return r.CloneLimit(r.size) | |
| 188 } | |
| 189 | |
| 190 // CloneLimit returns a copy of the Reader view, optionally truncating it. | |
| 191 // | |
| 192 // The clone is bound to the same underlying Buffer as the source. | |
| 193 func (r *Reader) CloneLimit(limit int64) *Reader { | |
| 194 c := *r | |
| 195 if c.size > limit { | |
| 196 c.size = limit | |
| 197 } | |
| 198 return &c | |
| 199 } | |
| 200 | |
| 201 func (r *Reader) chunkRemaining() int { | |
| 202 r.checkGen() | |
| 203 | |
| 204 if r.cur == nil { | |
| 205 return 0 | |
| 206 } | |
| 207 result := r.cur.Len() - r.cidx | |
| 208 if int64(result) > r.size { | |
| 209 result = int(r.size) | |
| 210 } | |
| 211 return result | |
| 212 } | |
| 213 | |
| 214 func (r *Reader) chunkBytes() []byte { | |
| 215 r.checkGen() | |
| 216 | |
| 217 if r.cur == nil { | |
| 218 return nil | |
| 219 } | |
| 220 data := r.cur.Bytes()[r.cidx:] | |
| 221 remaining := r.Remaining() | |
| 222 if int64(len(data)) > remaining { | |
| 223 data = data[:remaining] | |
| 224 } | |
| 225 return data | |
| 226 } | |
| 227 | |
| 228 func (r *Reader) checkGen() { | |
| 229 if r.b.gen != r.gen { | |
| 230 panic(fmt.Errorf("generation mismatch (%v != %v)", r.b.gen, r.ge n)) | |
| 231 } | |
| 232 } | |
| OLD | NEW |