Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Unified Diff: server/cmd/logdog_collector/main.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/cmd/logdog_collector/main.go
diff --git a/server/cmd/logdog_collector/main.go b/server/cmd/logdog_collector/main.go
index 9ea89b23c4264bf9b6f7acef50d9925341352c25..73769e7bc460ee08c9e0bccd711fde687f5a342b 100644
--- a/server/cmd/logdog_collector/main.go
+++ b/server/cmd/logdog_collector/main.go
@@ -6,6 +6,7 @@ package main
import (
"fmt"
+ "io"
"time"
"github.com/luci/luci-go/common/auth"
@@ -116,34 +117,34 @@ func (a *application) runCollector(c context.Context) error {
// Execute our main subscription pull loop. It will run until the supplied
// Context is cancelled.
- psIterator, err := psSub.Pull(c)
+ psIterator, err := psSub.Pull(shutdownCtx)
if err != nil {
log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator.")
return err
}
- defer func() {
- log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop...")
- psIterator.Stop()
- log.Debugf(c, "Pub/Sub subscription iterator has stopped.")
- }()
+ defer psIterator.Stop()
parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
// Loop until shut down.
- for shutdownCtx.Err() == nil {
+ for {
msg, err := psIterator.Next()
- if err != nil {
+ switch err {
dnj 2016/04/11 17:20:04 This was a bugfix ported from Archivist in respons
+ case nil:
+ taskC <- func() error {
+ c := log.SetField(c, "messageID", msg.ID)
+ msg.Done(a.processMessage(c, &coll, msg))
+ return nil
+ }
+
+ case io.EOF, context.Canceled, context.DeadlineExceeded:
+ return
+
+ default:
log.Fields{
log.ErrorKey: err,
"delay": pubsubPullErrorDelay,
}.Errorf(c, "Failed to fetch Pub/Sub message, retry after delay...")
clock.Sleep(c, pubsubPullErrorDelay)
- continue
- }
-
- taskC <- func() error {
- c := log.SetField(c, "messageID", msg.ID)
- msg.Done(a.processMessage(c, &coll, msg))
- return nil
}
}
}))

Powered by Google App Engine
This is Rietveld 408576698