Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "io" | |
| 9 "time" | 10 "time" |
| 10 | 11 |
| 11 "github.com/luci/luci-go/common/auth" | 12 "github.com/luci/luci-go/common/auth" |
| 12 "github.com/luci/luci-go/common/clock" | 13 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/parallel" | 17 "github.com/luci/luci-go/common/parallel" |
| 17 "github.com/luci/luci-go/server/internal/logdog/collector" | 18 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 109 | 110 |
| 110 coll := collector.Collector{ | 111 coll := collector.Collector{ |
| 111 Coordinator: coord, | 112 Coordinator: coord, |
| 112 Storage: st, | 113 Storage: st, |
| 113 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), | 114 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), |
| 114 } | 115 } |
| 115 defer coll.Close() | 116 defer coll.Close() |
| 116 | 117 |
| 117 // Execute our main subscription pull loop. It will run until the suppli ed | 118 // Execute our main subscription pull loop. It will run until the suppli ed |
| 118 // Context is cancelled. | 119 // Context is cancelled. |
| 119 » psIterator, err := psSub.Pull(c) | 120 » psIterator, err := psSub.Pull(shutdownCtx) |
| 120 if err != nil { | 121 if err != nil { |
| 121 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ") | 122 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ") |
| 122 return err | 123 return err |
| 123 } | 124 } |
| 124 » defer func() { | 125 » defer psIterator.Stop() |
| 125 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop ...") | |
| 126 » » psIterator.Stop() | |
| 127 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.") | |
| 128 » }() | |
| 129 | 126 |
| 130 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) { | 127 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) { |
| 131 // Loop until shut down. | 128 // Loop until shut down. |
| 132 » » for shutdownCtx.Err() == nil { | 129 » » for { |
| 133 msg, err := psIterator.Next() | 130 msg, err := psIterator.Next() |
| 134 » » » if err != nil { | 131 » » » switch err { |
|
dnj
2016/04/11 17:20:04
This was a bugfix ported from Archivist in respons
| |
| 132 » » » case nil: | |
| 133 » » » » taskC <- func() error { | |
| 134 » » » » » c := log.SetField(c, "messageID", msg.ID ) | |
| 135 » » » » » msg.Done(a.processMessage(c, &coll, msg) ) | |
| 136 » » » » » return nil | |
| 137 » » » » } | |
| 138 | |
| 139 » » » case io.EOF, context.Canceled, context.DeadlineExceeded: | |
| 140 » » » » return | |
| 141 | |
| 142 » » » default: | |
| 135 log.Fields{ | 143 log.Fields{ |
| 136 log.ErrorKey: err, | 144 log.ErrorKey: err, |
| 137 "delay": pubsubPullErrorDelay, | 145 "delay": pubsubPullErrorDelay, |
| 138 }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...") | 146 }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...") |
| 139 clock.Sleep(c, pubsubPullErrorDelay) | 147 clock.Sleep(c, pubsubPullErrorDelay) |
| 140 continue | |
| 141 } | |
| 142 | |
| 143 taskC <- func() error { | |
| 144 c := log.SetField(c, "messageID", msg.ID) | |
| 145 msg.Done(a.processMessage(c, &coll, msg)) | |
| 146 return nil | |
| 147 } | 148 } |
| 148 } | 149 } |
| 149 })) | 150 })) |
| 150 | 151 |
| 151 log.Debugf(c, "Collector finished.") | 152 log.Debugf(c, "Collector finished.") |
| 152 return nil | 153 return nil |
| 153 } | 154 } |
| 154 | 155 |
| 155 // processMessage returns true if the message should be ACK'd (deleted from | 156 // processMessage returns true if the message should be ACK'd (deleted from |
| 156 // Pub/Sub) or false if the message should not be ACK'd. | 157 // Pub/Sub) or false if the message should not be ACK'd. |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.") | 191 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.") |
| 191 return true | 192 return true |
| 192 } | 193 } |
| 193 } | 194 } |
| 194 | 195 |
| 195 // Entry point. | 196 // Entry point. |
| 196 func main() { | 197 func main() { |
| 197 a := application{} | 198 a := application{} |
| 198 a.Run(context.Background(), a.runCollector) | 199 a.Run(context.Background(), a.runCollector) |
| 199 } | 200 } |
| OLD | NEW |