Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Unified Diff: server/cmd/logdog_collector/main.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/cmd/logdog_archivist/task.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/cmd/logdog_collector/main.go
diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
index 9ea89b23c4264bf9b6f7acef50d9925341352c25..683e8dcbab0a5c01dec94b1caf3f1e987286b6c7 100644
--- a/server/cmd/logdog_collector/main.go
+++ b/server/cmd/logdog_collector/main.go
@@ -6,6 +6,7 @@ package main
import (
"fmt"
+ "io"
"time"
"github.com/luci/luci-go/common/auth"
@@ -97,11 +98,6 @@ func (a *application) runCollector(c context.Context) error {
}
defer st.Close()
- // Application shutdown will now operate by cancelling the Collector's
- // shutdown Context.
- shutdownCtx, shutdownFunc := context.WithCancel(c)
- a.SetShutdownFunc(shutdownFunc)
-
// Initialize our Collector service object using a caching Coordinator
// interface.
coord := coordinator.NewCoordinator(a.Coordinator())
@@ -121,29 +117,33 @@ func (a *application) runCollector(c context.Context) error {
log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.")
return err
}
- defer func() {
- log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop...")
- psIterator.Stop()
- log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
- }()
+ defer psIterator.Stop()
+
+ // Application shutdown will now operate by cancelling the Collector's
+ // shutdown Context.
+ a.SetShutdownFunc(psIterator.Stop)
parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
// Loop until shut down.
- for shutdownCtx.Err() == nil {
+ for {
msg, err := psIterator.Next()
- if err != nil {
+ switch err {
+ case nil:
+ taskC <- func() error {
+ c := log.SetField(c, "messageID", msg.ID)
+ msg.Done(a.processMessage(c, &coll, msg))
+ return nil
+ }
+
+ case io.EOF, context.Canceled, context.DeadlineExceeded:
+ return
+
+ default:
log.Fields{
log.ErrorKey: err,
"delay": pubsubPullErrorDelay,
}.Errorf(c, "Failed to fetch Pub/Sub message, retry after delay...")
clock.Sleep(c, pubsubPullErrorDelay)
- continue
- }
-
- taskC <- func() error {
- c := log.SetField(c, "messageID", msg.ID)
- msg.Done(a.processMessage(c, &coll, msg))
- return nil
}
}
}))
« no previous file with comments | « server/cmd/logdog_archivist/task.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698