| Index: server/cmd/logdog_collector/main.go
|
| diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
|
| index 9ea89b23c4264bf9b6f7acef50d9925341352c25..683e8dcbab0a5c01dec94b1caf3f1e987286b6c7 100644
|
| --- a/server/cmd/logdog_collector/main.go
|
| +++ b/server/cmd/logdog_collector/main.go
|
| @@ -6,6 +6,7 @@ package main
|
|
|
| import (
|
| "fmt"
|
| + "io"
|
| "time"
|
|
|
| "github.com/luci/luci-go/common/auth"
|
| @@ -97,11 +98,6 @@ func (a *application) runCollector(c context.Context) error {
|
| }
|
| defer st.Close()
|
|
|
| - // Application shutdown will now operate by cancelling the Collector's
|
| - // shutdown Context.
|
| - shutdownCtx, shutdownFunc := context.WithCancel(c)
|
| - a.SetShutdownFunc(shutdownFunc)
|
| -
|
| // Initialize our Collector service object using a caching Coordinator
|
| // interface.
|
| coord := coordinator.NewCoordinator(a.Coordinator())
|
| @@ -121,29 +117,33 @@ func (a *application) runCollector(c context.Context) error {
|
| log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.")
|
| return err
|
| }
|
| - defer func() {
|
| - log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop...")
|
| - psIterator.Stop()
|
| - log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
|
| - }()
|
| + defer psIterator.Stop()
|
| +
|
| + // Application shutdown will now operate by cancelling the Collector's
|
| + // shutdown Context.
|
| + a.SetShutdownFunc(psIterator.Stop)
|
|
|
| parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
|
| // Loop until shut down.
|
| - for shutdownCtx.Err() == nil {
|
| + for {
|
| msg, err := psIterator.Next()
|
| - if err != nil {
|
| + switch err {
|
| + case nil:
|
| + taskC <- func() error {
|
| + c := log.SetField(c, "messageID", msg.ID)
|
| + msg.Done(a.processMessage(c, &coll, msg))
|
| + return nil
|
| + }
|
| +
|
| + case io.EOF, context.Canceled, context.DeadlineExceeded:
|
| + return
|
| +
|
| + default:
|
| log.Fields{
|
| log.ErrorKey: err,
|
| "delay": pubsubPullErrorDelay,
|
| }.Errorf(c, "Failed to fetch Pub/Sub message, retry after delay...")
|
| clock.Sleep(c, pubsubPullErrorDelay)
|
| - continue
|
| - }
|
| -
|
| - taskC <- func() error {
|
| - c := log.SetField(c, "messageID", msg.ID)
|
| - msg.Done(a.processMessage(c, &coll, msg))
|
| - return nil
|
| }
|
| }
|
| }))
|
|
|