Chromium Code Reviews| Index: server/internal/logdog/collector/collector.go |
| diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9f0f2ed0d7dc20e1286838d8a9b24fbe572abde9 |
| --- /dev/null |
| +++ b/server/internal/logdog/collector/collector.go |
| @@ -0,0 +1,308 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package collector |
| + |
| +import ( |
| + "bytes" |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logdog/butlerproto" |
| + "github.com/luci/luci-go/common/logdog/types" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/parallel" |
| + "github.com/luci/luci-go/common/proto/logdog/logpb" |
| + cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient" |
| + "github.com/luci/luci-go/server/logdog/storage" |
| + "golang.org/x/net/context" |
| +) |
| + |
const (
	// DefaultStreamStateCacheExpire is the default expiration value used for
	// cached stream state entries when Options.StreamStateCacheExpire is zero.
	DefaultStreamStateCacheExpire = 10 * time.Minute
)
| + |
// Options is the set of options to use for collection.
type Options struct {
	// Coordinator is the coordinator service.
	Coordinator CoordinatorClient
	// Storage is the backing store to use.
	Storage storage.Storage
	// Sem is a semaphore used to throttle the number of simultaneous ingest
	// tasks that are executed.
	Sem parallel.Semaphore

	// StreamStateCacheExpire is the maximum amount of time that a cached stream
	// state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
	StreamStateCacheExpire time.Duration
}
| + |
// Collector is a stateful collection service.
type Collector interface {
	// Process ingests (decodes, validates, and loads into Storage) an encoded
	// ButlerLogBundle message. It is goroutine-safe, but may throttle based on
	// the configured semaphore.
	//
	// If a transient error occurs during ingest, Process will return an
	// errors.Transient error.
	Process(context.Context, []byte) error
}
| + |
// collector is an implementation of Collector.
type collector struct {
	*Options

	// streamState caches stream state lookups/registrations made against the
	// configured Coordinator.
	streamState *streamStateCache
}
| + |
| +// New instantiates a new Collector instance. |
| +func New(o Options) Collector { |
| + return &collector{ |
| + Options: &o, |
| + streamState: newStreamStateCache(streamStateCacheOptions{ |
| + coordinator: o.Coordinator, |
| + expiration: o.StreamStateCacheExpire, |
| + }), |
| + } |
| +} |
| + |
// Process implements Collector. It decodes a single encoded ButlerLogBundle
// message and dispatches each of its bundle entries to processLogStream in
// parallel.
//
// If a transient error occurs during ingest, Process will return an error so
// the message can be retried. If no error occurred, or if there was a
// non-transient problem with the input data, no error will be returned and
// the message is discarded.
func (c *collector) Process(ctx context.Context, msg []byte) error {
	// Decode and sanity-check the raw message. Malformed input can never
	// succeed on retry, so these failures are logged and dropped (nil return).
	pr := butlerproto.Reader{}
	if err := pr.Read(bytes.NewReader(msg)); err != nil {
		log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
		return nil
	}
	if pr.Metadata.ProtoVersion != logpb.Version {
		log.Fields{
			"messageProtoVersion": pr.Metadata.ProtoVersion,
			"currentProtoVersion": logpb.Version,
		}.Errorf(ctx, "Unknown protobuf version.")
		return nil
	}
	if pr.Bundle == nil {
		log.Errorf(ctx, "Protocol message did not contain a Butler bundle.")
		return nil
	}

	// An empty bundle is a no-op.
	if len(pr.Bundle.Entries) == 0 {
		return nil
	}

	// lw is the read-only template state shared by all bundle entry tasks;
	// each task copies it and fills in its own entry ("be").
	lw := logWork{
		md: pr.Metadata,
		b:  pr.Bundle,
	}

	// Handle each bundle entry in parallel. We will use a separate work pool
	// here so that top-level bundle dispatch can't deadlock the processing tasks.
	//
	// If we don't have a semaphore, this will also be unbounded, since cap(nil)
	// is 0.
	err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) {
		for _, be := range pr.Bundle.Entries {
			lw := lw // shadow: per-iteration copy so each closure gets its own entry.
			lw.be = be
			taskC <- func() error {
				return c.processLogStream(ctx, &lw)
			}
		}
	})
	if err != nil {
		if hasTransientError(err) {
			// Transient: return the error so the message will be redelivered
			// and retried.
			log.Fields{
				log.ErrorKey: err,
			}.Warningf(ctx, "Transient error encountered during processing.")
			return err
		}

		// Non-transient: retrying cannot help, so log and discard the message.
		log.Fields{
			log.ErrorKey: err,
		}.Errorf(ctx, "Non-transient error encountered during processing; discarding.")
	}
	return nil
}
| + |
// logWork is a cumulative set of read-only state passed around by value for log
// processing. It is copied (and additional fields filled in) as work fans out
// from the bundle to per-stream and per-entry tasks.
type logWork struct {
	// md is the metadata associated with the overall message.
	md *logpb.ButlerMetadata
	// b is the Butler bundle.
	b *logpb.ButlerLogBundle
	// be is the Bundle entry.
	be *logpb.ButlerLogBundle_Entry
	// path is the constructed path of the stream being processed.
	path types.StreamPath
	// le is the LogEntry in the bundle entry.
	le *logpb.LogEntry
}
| + |
// processLogStream processes an individual set of log messages belonging to the
// same log stream.
//
// Validation failures (invalid descriptor, missing or mismatched secret,
// archived/purged stream) are logged and dropped without error, since they can
// never succeed on retry. Failures that may be transient (state lookup,
// storage writes, Coordinator updates) are returned to the caller.
func (c *collector) processLogStream(ctx context.Context, lw *logWork) error {
	if err := lw.be.Desc.Validate(true); err != nil {
		log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
		return nil
	}
	lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.be.Desc.Name))
	ctx = log.SetField(ctx, "path", lw.path)

	if len(lw.be.Secret) == 0 {
		log.Errorf(ctx, "Missing secret.")
		return nil
	}

	// Fetch our cached/remote state. This will replace our state object with the
	// fetched state, so any future calls will need to re-set the Secret value.
	// TODO: Use timeout?
	state, err := c.streamState.getOrRegister(ctx, &cc.State{
		Path:         lw.path,
		Secret:       types.StreamSecret(lw.be.Secret),
		ProtoVersion: lw.md.ProtoVersion,
		Descriptor:   lw.be.Desc,
	})
	if err != nil {
		log.WithError(err).Errorf(ctx, "Failed to get/register current stream state.")
		return err
	}

	// Does the log stream's secret match the expected secret?
	if !bytes.Equal(lw.be.Secret, []byte(state.secret)) {
		log.Errorf(log.SetFields(ctx, log.Fields{
			"secret":         lw.be.Secret,
			"expectedSecret": state.secret,
		}), "Log entry has incorrect secret.")
		return nil
	}

	if state.archived {
		log.Infof(ctx, "Skipping message bundle for archived stream.")
		return nil
	}
	if state.purged {
		log.Infof(ctx, "Skipping message bundle for purged stream.")
		return nil
	}

	// Update our terminal index if we have one.
	//
	// Note that even if our cached value is marked terminal, we could have failed
	// to push the terminal index to the Coordinator, so we will not refrain from
	// pushing every terminal index encountered regardless of cache state.
	if lw.be.Terminal {
		tidx := types.MessageIndex(lw.be.TerminalIndex)
		log.Fields{
			"value": tidx,
		}.Debugf(ctx, "Bundle includes a terminal index.")

		if state.terminalIndex < 0 {
			state.terminalIndex = tidx
		} else if state.terminalIndex != tidx {
			log.Fields{
				"cachedIndex": state.terminalIndex,
				"bundleIndex": tidx,
			}.Warningf(ctx, "Cached terminal index disagrees with state.")
		}
	}

	// In parallel, load the log entries into Storage. Throttle this with our
	// ingest semaphore.
	return parallel.Run(c.Sem, func(taskC chan<- func() error) {
		for i, le := range lw.be.Logs {
			i, le := i, le // shadow: per-iteration copies for the closure below.

			// Store this LogEntry
			taskC <- func() error {
				if err := le.Validate(lw.be.Desc); err != nil {
					log.Fields{
						log.ErrorKey: err,
						"index":      i,
					}.Warningf(ctx, "Discarding invalid log entry.")
					return nil
				}

				// Entries past a known terminal index can never be valid.
				if state.terminalIndex >= 0 && types.MessageIndex(le.StreamIndex) > state.terminalIndex {
					log.Fields{
						"index":         le.StreamIndex,
						"terminalIndex": state.terminalIndex,
					}.Warningf(ctx, "Stream is terminated before log entry; discarding.")
					return nil
				}

				// Copy the work template so parallel tasks don't share "le".
				lw := *lw
				lw.le = le
				return c.processLogEntry(ctx, &lw)
			}
		}

		// If our bundle entry is terminal, we have an additional task of reporting
		// this to the Coordinator.
		if lw.be.Terminal {
			taskC <- func() error {
				// Sentinel task: Update the terminal bundle state.
				// Work on a copy so the cached state isn't mutated directly.
				state := *state
				state.terminalIndex = types.MessageIndex(lw.be.TerminalIndex)

				log.Fields{
					"terminalIndex": state.terminalIndex,
				}.Infof(ctx, "Received terminal log; updating Coordinator state.")
				// Re-set the secret, since the fetched state may not carry it.
				state.secret = types.StreamSecret(lw.be.Secret)

				if err := c.streamState.setTerminalIndex(ctx, &state); err != nil {
					log.WithError(err).Errorf(ctx, "Failed to set stream terminal index.")
					return err
				}
				return nil
			}
		}
	})
}
| + |
| +func (c *collector) processLogEntry(ctx context.Context, lw *logWork) error { |
| + data, err := proto.Marshal(lw.le) |
| + if err != nil { |
| + log.WithError(err).Errorf(ctx, "Failed to marshal log entry.") |
| + return err |
| + } |
| + |
| + // Post the log to storage. |
| + err = c.Storage.Put(&storage.PutRequest{ |
| + Path: lw.path, |
| + Index: types.MessageIndex(lw.le.StreamIndex), |
| + Value: data, |
| + }) |
| + // If the log entry already exists, consider the "put" successful. |
| + if err != nil && err != storage.ErrExists { |
| + log.WithError(err).Errorf(ctx, "Failed to load log entry into Storage.") |
| + return err |
| + } |
| + return nil |
| +} |
| + |
| +// wrapMultiErrorTransient wraps an error in a TransientError wrapper. |
| +// |
| +// If the error is nil, it will return nil. If the error is already transient, |
| +// it will be directly returned. If the error is a MultiError, its sub-errors |
| +// will be evaluated and wrapped in a TransientError if any of its sub-errors |
| +// are transient errors. |
| +func hasTransientError(err error) bool { |
| + if merr, ok := err.(errors.MultiError); ok { |
| + for _, e := range merr { |
| + if hasTransientError(e) { |
| + return true |
| + } |
| + } |
| + return false |
| + } |
| + |
| + return errors.IsTransient(err) |
| +} |