Chromium Code Reviews| Index: server/cmd/logdog_collector/main.go |
| diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go |
| index 9ea89b23c4264bf9b6f7acef50d9925341352c25..73769e7bc460ee08c9e0bccd711fde687f5a342b 100644 |
| --- a/server/cmd/logdog_collector/main.go |
| +++ b/server/cmd/logdog_collector/main.go |
| @@ -6,6 +6,7 @@ package main |
| import ( |
| "fmt" |
| + "io" |
| "time" |
| "github.com/luci/luci-go/common/auth" |
| @@ -116,34 +117,34 @@ func (a *application) runCollector(c context.Context) error { |
| // Execute our main subscription pull loop. It will run until the supplied |
| // Context is cancelled. |
| - psIterator, err := psSub.Pull(c) |
| + psIterator, err := psSub.Pull(shutdownCtx) |
| if err != nil { |
| log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.") |
| return err |
| } |
| - defer func() { |
| - log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop...") |
| - psIterator.Stop() |
| - log.Debugf(c, "Pub/Sub subscription iterator has stopped.") |
| - }() |
| + defer psIterator.Stop() |
| parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) { |
| // Loop until shut down. |
| - for shutdownCtx.Err() == nil { |
| + for { |
| msg, err := psIterator.Next() |
| - if err != nil { |
| + switch err { |
|
dnj
2016/04/11 17:20:04
This was a bugfix ported from Archivist in respons
|
| + case nil: |
| + taskC <- func() error { |
| + c := log.SetField(c, "messageID", msg.ID) |
| + msg.Done(a.processMessage(c, &coll, msg)) |
| + return nil |
| + } |
| + |
| + case io.EOF, context.Canceled, context.DeadlineExceeded: |
| + return |
| + |
| + default: |
| log.Fields{ |
| log.ErrorKey: err, |
| "delay": pubsubPullErrorDelay, |
| }.Errorf(c, "Failed to fetch Pub/Sub message, retry after delay...") |
| clock.Sleep(c, pubsubPullErrorDelay) |
| - continue |
| - } |
| - |
| - taskC <- func() error { |
| - c := log.SetField(c, "messageID", msg.ID) |
| - msg.Done(a.processMessage(c, &coll, msg)) |
| - return nil |
| } |
| } |
| })) |