| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package collector |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "errors" |
| 10 "fmt" |
| 11 "sort" |
| 12 "strings" |
| 13 "sync" |
| 14 "time" |
| 15 |
| 16 "github.com/golang/protobuf/proto" |
| 17 "github.com/luci/luci-go/common/clock" |
| 18 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 19 "github.com/luci/luci-go/common/logdog/types" |
| 20 "github.com/luci/luci-go/common/proto/google" |
| 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator
" |
| 23 "github.com/luci/luci-go/server/logdog/storage" |
| 24 "golang.org/x/net/context" |
| 25 ) |
| 26 |
| 27 var testSecret = bytes.Repeat([]byte{0x55}, types.StreamSecretLength) |
| 28 |
| 29 // testCoordinator is an implementation of Coordinator that can be used for |
| 30 // testing. |
| 31 type testCoordinator struct { |
| 32 sync.Mutex |
| 33 |
| 34 // errC is a channel that error status will be read from if not nil. |
| 35 errC chan error |
| 36 |
| 37 // state is the latest tracked stream state. |
| 38 state map[string]*cc.LogStreamState |
| 39 } |
| 40 |
| 41 var _ cc.Coordinator = (*testCoordinator)(nil) |
| 42 |
| 43 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { |
| 44 c.Lock() |
| 45 defer c.Unlock() |
| 46 |
| 47 // Update our state. |
| 48 if c.state == nil { |
| 49 c.state = make(map[string]*cc.LogStreamState) |
| 50 } |
| 51 if sp := c.state[string(s.Path)]; sp != nil { |
| 52 return *sp |
| 53 } |
| 54 c.state[string(s.Path)] = &s |
| 55 return s |
| 56 } |
| 57 |
| 58 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta
te, d *logpb.LogStreamDescriptor) ( |
| 59 *cc.LogStreamState, error) { |
| 60 if err := c.enter(); err != nil { |
| 61 return nil, err |
| 62 } |
| 63 |
| 64 // Enforce that the submitted stream is not terminal. |
| 65 rs := *s |
| 66 rs.TerminalIndex = -1 |
| 67 |
| 68 // Update our state. |
| 69 sp := c.register(rs) |
| 70 return &sp, nil |
| 71 } |
| 72 |
| 73 func (c *testCoordinator) TerminateStream(ctx context.Context, st *cc.LogStreamS
tate) error { |
| 74 if err := c.enter(); err != nil { |
| 75 return err |
| 76 } |
| 77 |
| 78 if st.TerminalIndex < 0 { |
| 79 return errors.New("submitted stream is not terminal") |
| 80 } |
| 81 |
| 82 c.Lock() |
| 83 defer c.Unlock() |
| 84 |
| 85 // Update our state. |
| 86 cachedState, ok := c.state[string(st.Path)] |
| 87 if !ok { |
| 88 return fmt.Errorf("no such stream: %s", st.Path) |
| 89 } |
| 90 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter
minalIndex { |
| 91 return fmt.Errorf("incompatible terminal indexes: %d != %d", st.
TerminalIndex, cachedState.TerminalIndex) |
| 92 } |
| 93 |
| 94 cachedState.TerminalIndex = st.TerminalIndex |
| 95 return nil |
| 96 } |
| 97 |
| 98 // enter is an entry point for client goroutines. It offers the opportunity |
| 99 // to trap executing goroutines within client calls. |
| 100 // |
| 101 // This must NOT be called while the lock is held, else it could lead to |
| 102 // deadlock if multiple goroutines are trapped. |
| 103 func (c *testCoordinator) enter() error { |
| 104 if c.errC != nil { |
| 105 return <-c.errC |
| 106 } |
| 107 return nil |
| 108 } |
| 109 |
| 110 func (c *testCoordinator) stream(name string) (int, bool) { |
| 111 c.Lock() |
| 112 defer c.Unlock() |
| 113 |
| 114 sp, ok := c.state[name] |
| 115 if !ok { |
| 116 return 0, false |
| 117 } |
| 118 return int(sp.TerminalIndex), true |
| 119 } |
| 120 |
| 121 // testStorage is a testing storage instance that returns errors. |
| 122 type testStorage struct { |
| 123 storage.Storage |
| 124 err func() error |
| 125 } |
| 126 |
| 127 func (s *testStorage) Put(r *storage.PutRequest) error { |
| 128 if s.err != nil { |
| 129 if err := s.err(); err != nil { |
| 130 return err |
| 131 } |
| 132 } |
| 133 return s.Storage.Put(r) |
| 134 } |
| 135 |
| 136 // bundleBuilder is a set of utility functions to help test cases construct |
| 137 // specific logpb.ButlerLogBundle layouts. |
| 138 type bundleBuilder struct { |
| 139 context.Context |
| 140 |
| 141 base time.Time |
| 142 entries []*logpb.ButlerLogBundle_Entry |
| 143 } |
| 144 |
| 145 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
| 146 if b.base.IsZero() { |
| 147 b.base = clock.Now(b) |
| 148 } |
| 149 |
| 150 b.entries = append(b.entries, be) |
| 151 } |
| 152 |
| 153 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp
b.ButlerLogBundle_Entry { |
| 154 p, n := types.StreamPath(name).Split() |
| 155 be := logpb.ButlerLogBundle_Entry{ |
| 156 Secret: testSecret, |
| 157 Desc: &logpb.LogStreamDescriptor{ |
| 158 Prefix: string(p), |
| 159 Name: string(n), |
| 160 ContentType: "application/test-message", |
| 161 StreamType: logpb.StreamType_TEXT, |
| 162 Timestamp: google.NewTimestamp(clock.Now(b)), |
| 163 }, |
| 164 } |
| 165 |
| 166 if len(idxs) > 0 { |
| 167 be.Logs = make([]*logpb.LogEntry, len(idxs)) |
| 168 for i, idx := range idxs { |
| 169 be.Logs[i] = b.logEntry(idx) |
| 170 } |
| 171 if tidx >= 0 { |
| 172 be.Terminal = true |
| 173 be.TerminalIndex = uint64(tidx) |
| 174 } |
| 175 } |
| 176 |
| 177 return &be |
| 178 } |
| 179 |
| 180 func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) { |
| 181 b.addBundleEntry(b.genBundleEntry(name, term, idxs...)) |
| 182 } |
| 183 |
| 184 func (b *bundleBuilder) addFullStream(name string, count int) { |
| 185 idxs := make([]int, count) |
| 186 for i := range idxs { |
| 187 idxs[i] = i |
| 188 } |
| 189 b.addStreamEntries(name, count-1, idxs...) |
| 190 } |
| 191 |
| 192 func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry { |
| 193 return &logpb.LogEntry{ |
| 194 StreamIndex: uint64(idx), |
| 195 Sequence: uint64(idx), |
| 196 Content: &logpb.LogEntry_Text{ |
| 197 Text: &logpb.Text{ |
| 198 Lines: []*logpb.Text_Line{ |
| 199 { |
| 200 Value: fmt.Sprintf("Line #%d
", idx), |
| 201 Delimiter: "\n", |
| 202 }, |
| 203 }, |
| 204 }, |
| 205 }, |
| 206 } |
| 207 } |
| 208 |
| 209 func (b *bundleBuilder) bundle() []byte { |
| 210 bytes := b.bundleWithEntries(b.entries...) |
| 211 b.entries = nil |
| 212 return bytes |
| 213 } |
| 214 |
| 215 func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []b
yte { |
| 216 bundle := logpb.ButlerLogBundle{ |
| 217 Source: "test stream", |
| 218 Timestamp: google.NewTimestamp(clock.Now(b)), |
| 219 Entries: e, |
| 220 } |
| 221 |
| 222 buf := bytes.Buffer{} |
| 223 w := butlerproto.Writer{Compress: true} |
| 224 if err := w.Write(&buf, &bundle); err != nil { |
| 225 panic(err) |
| 226 } |
| 227 return buf.Bytes() |
| 228 } |
| 229 |
| 230 type indexRange struct { |
| 231 start int |
| 232 end int |
| 233 } |
| 234 |
| 235 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
.end) } |
| 236 |
| 237 // shouldHaveRegisteredStream asserts that a testCoordinator has |
| 238 // registered a stream (string) and its terminal index (int). |
| 239 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
ing { |
| 240 tcc := actual.(*testCoordinator) |
| 241 name := expected[0].(string) |
| 242 tidx := expected[1].(int) |
| 243 |
| 244 cur, ok := tcc.stream(name) |
| 245 if !ok { |
| 246 return fmt.Sprintf("stream %q is not registered", name) |
| 247 } |
| 248 if tidx >= 0 && cur < 0 { |
| 249 return fmt.Sprintf("stream %q is expected to be terminated, but
isn't.", name) |
| 250 } |
| 251 if cur >= 0 && tidx < 0 { |
| 252 return fmt.Sprintf("stream %q is NOT expected to be terminated,
but it is.", name) |
| 253 } |
| 254 return "" |
| 255 } |
| 256 |
| 257 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not |
| 258 // registered a stream (string). |
| 259 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
string { |
| 260 tcc := actual.(*testCoordinator) |
| 261 name := expected[0].(string) |
| 262 |
| 263 if _, ok := tcc.stream(name); ok { |
| 264 return fmt.Sprintf("stream %q is registered, but it should NOT b
e.", name) |
| 265 } |
| 266 return "" |
| 267 } |
| 268 |
| 269 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous |
| 270 // stream records in it. |
| 271 // |
| 272 // actual is the storage.Storage instance. expected is a stream name (string) |
| 273 // followed by a a series of records to assert. This can either be a specific |
| 274 // integer index or an intexRange marking a closed range of indices. |
| 275 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
{ |
| 276 st := actual.(storage.Storage) |
| 277 name := expected[0].(string) |
| 278 |
| 279 // Load all entries for this stream. |
| 280 req := storage.GetRequest{ |
| 281 Path: types.StreamPath(name), |
| 282 } |
| 283 |
| 284 entries := make(map[int]*logpb.LogEntry) |
| 285 var ierr error |
| 286 err := st.Get(&req, func(idx types.MessageIndex, d []byte) bool { |
| 287 le := logpb.LogEntry{} |
| 288 if ierr = proto.Unmarshal(d, &le); ierr != nil { |
| 289 return false |
| 290 } |
| 291 entries[int(idx)] = &le |
| 292 return true |
| 293 }) |
| 294 if ierr != nil { |
| 295 err = ierr |
| 296 } |
| 297 if err != nil && err != storage.ErrDoesNotExist { |
| 298 return fmt.Sprintf("error: %v", err) |
| 299 } |
| 300 |
| 301 assertLogEntry := func(i int) string { |
| 302 le := entries[i] |
| 303 if le == nil { |
| 304 return fmt.Sprintf("%d", i) |
| 305 } |
| 306 delete(entries, i) |
| 307 |
| 308 if le.StreamIndex != uint64(i) { |
| 309 return fmt.Sprintf("*%d", i) |
| 310 } |
| 311 return "" |
| 312 } |
| 313 |
| 314 var failed []string |
| 315 for _, exp := range expected[1:] { |
| 316 switch e := exp.(type) { |
| 317 case int: |
| 318 if err := assertLogEntry(e); err != "" { |
| 319 failed = append(failed, err) |
| 320 } |
| 321 |
| 322 case indexRange: |
| 323 var errs []string |
| 324 for i := e.start; i <= e.end; i++ { |
| 325 if err := assertLogEntry(i); err != "" { |
| 326 errs = append(errs, err) |
| 327 } |
| 328 } |
| 329 if len(errs) > 0 { |
| 330 failed = append(failed, fmt.Sprintf("%s{%s}", e.
String(), strings.Join(errs, ","))) |
| 331 } |
| 332 |
| 333 default: |
| 334 panic(fmt.Errorf("unknown expected type %T", e)) |
| 335 } |
| 336 } |
| 337 |
| 338 // Extras? |
| 339 if len(entries) > 0 { |
| 340 idxs := make([]int, 0, len(entries)) |
| 341 for i := range entries { |
| 342 idxs = append(idxs, i) |
| 343 } |
| 344 sort.Ints(idxs) |
| 345 |
| 346 extra := make([]string, len(idxs)) |
| 347 for i, idx := range idxs { |
| 348 extra[i] = fmt.Sprintf("%d", idx) |
| 349 } |
| 350 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex
tra, ","))) |
| 351 } |
| 352 |
| 353 if len(failed) > 0 { |
| 354 return strings.Join(failed, ", ") |
| 355 } |
| 356 return "" |
| 357 } |
| OLD | NEW |