| Index: server/internal/logdog/collector/collector.go
|
| diff --git a/server/internal/logdog/collector/collector.go b/server/internal/logdog/collector/collector.go
|
| index 90d055278688ecebb61f36fd5f52fd5b9a23a7ad..8884c9550ed1042b3e56556d7a85ec68747e181e 100644
|
| --- a/server/internal/logdog/collector/collector.go
|
| +++ b/server/internal/logdog/collector/collector.go
|
| @@ -163,6 +163,17 @@ type bundleEntryHandler struct {
|
| // processLogStream processes an individual set of log messages belonging to the
|
| // same log stream.
|
| func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error {
|
| + // If this bundle has neither log entries nor a terminal index, it is junk and
|
| + // must be discarded.
|
| + //
|
| + // This is more important than a basic optimization, as it enforces that no
|
| + // zero-entry log streams can be ingested. Either some entries exist, or there
|
| + // is a promise of a terminal entry.
|
| + if len(h.be.Logs) == 0 && !h.be.Terminal {
|
| + log.Warningf(ctx, "Bundle entry is non-terminal and contains no logs; discarding.")
|
| + return nil
|
| + }
|
| +
|
| if err := h.be.Desc.Validate(true); err != nil {
|
| log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
|
| return err
|
|
|