| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "flag" |
| 9 "fmt" |
| 10 "os" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/common/auth" |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer" |
| 17 "github.com/luci/luci-go/common/gcloud/pubsub/subscriber" |
| 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/parallel" |
| 20 "github.com/luci/luci-go/server/internal/logdog/collector" |
| 21 "github.com/luci/luci-go/server/internal/logdog/service" |
| 22 "golang.org/x/net/context" |
| 23 ) |
| 24 |
| 25 var ( |
| 26 errInvalidConfig = errors.New("invalid configuration") |
| 27 ) |
| 28 |
| 29 // application is the Collector application state. |
| 30 type application struct { |
| 31 *service.Service |
| 32 |
| 33 // shutdownCtx is a Context that will be cancelled if our application |
| 34 // receives a shutdown signal. |
| 35 shutdownCtx context.Context |
| 36 } |
| 37 |
| 38 // run is the main execution function. |
| 39 func (a *application) runCollector() error { |
| 40 cfg := a.Config() |
| 41 ccfg := cfg.GetCollector() |
| 42 if ccfg == nil { |
| 43 return errors.New("no collector configuration") |
| 44 } |
| 45 |
| 46 pscfg := cfg.GetTransport().GetPubsub() |
| 47 if pscfg == nil { |
| 48 return errors.New("missing Pub/Sub configuration") |
| 49 } |
| 50 |
| 51 // Our Subscription must be a valid one. |
| 52 sub := pubsub.Subscription(pscfg.Subscription) |
| 53 if err := sub.Validate(); err != nil { |
| 54 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) |
| 55 } |
| 56 |
| 57 // New PubSub instance with the authenticated client. |
| 58 psClient, err := a.AuthenticatedClient(func(o *auth.Options) { |
| 59 o.Scopes = pubsub.SubscriberScopes |
| 60 }) |
| 61 if err != nil { |
| 62 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.") |
| 63 return err |
| 64 } |
| 65 |
| 66 // Create a retrying Pub/Sub client. |
| 67 ps := &pubsub.Retry{ |
| 68 Connection: pubsub.NewConnection(psClient), |
| 69 Callback: func(err error, d time.Duration) { |
| 70 log.Fields{ |
| 71 log.ErrorKey: err, |
| 72 "delay": d, |
| 73 }.Warningf(a, "Transient error encountered; retrying..."
) |
| 74 }, |
| 75 } |
| 76 |
| 77 exists, err := ps.SubExists(a, sub) |
| 78 if err != nil { |
| 79 log.Fields{ |
| 80 log.ErrorKey: err, |
| 81 "subscription": pscfg.Subscription, |
| 82 }.Errorf(a, "Could not confirm Pub/Sub subscription.") |
| 83 return errInvalidConfig |
| 84 } |
| 85 if !exists { |
| 86 log.Fields{ |
| 87 "subscription": pscfg.Subscription, |
| 88 }.Errorf(a, "Subscription does not exist.") |
| 89 return errInvalidConfig |
| 90 } |
| 91 log.Fields{ |
| 92 "subscription": sub, |
| 93 }.Infof(a, "Successfully validated Pub/Sub subscription.") |
| 94 |
| 95 // Initialize our Storage. |
| 96 s, err := a.Storage() |
| 97 if err != nil { |
| 98 log.WithError(err).Errorf(a, "Failed to get storage instance.") |
| 99 return err |
| 100 } |
| 101 defer s.Close() |
| 102 |
| 103 // Application shutdown will now operate by cancelling the Collector's |
| 104 // shutdown Context. |
| 105 shutdownCtx, shutdownFunc := context.WithCancel(a) |
| 106 a.SetShutdownFunc(shutdownFunc) |
| 107 defer a.SetShutdownFunc(nil) |
| 108 |
| 109 // Start an ACK buffer so that we can batch ACKs. |
| 110 ab := ackbuffer.New(a, ackbuffer.Config{ |
| 111 Ack: ackbuffer.NewACK(ps, sub, 0), |
| 112 }) |
| 113 defer ab.CloseAndFlush() |
| 114 |
| 115 // Initialize our Collector service object. |
| 116 coll := collector.New(collector.Options{ |
| 117 Coordinator: a.Coordinator(), |
| 118 Storage: s, |
| 119 StreamStateCacheExpire: ccfg.StateCacheExpiration.Duration(), |
| 120 Sem: make(parallel.Semaphore, int(ccfg.Workers)), |
| 121 }) |
| 122 |
| 123 // Execute our main Subscriber loop. It will run until the supplied Cont
ext |
| 124 // is cancelled. |
| 125 engine := subscriber.Subscriber{ |
| 126 S: subscriber.NewSource(ps, sub, 0), |
| 127 A: ab, |
| 128 |
| 129 PullWorkers: int(ccfg.TransportWorkers), |
| 130 HandlerWorkers: int(ccfg.Workers), |
| 131 } |
| 132 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { |
| 133 ctx := log.SetFields(a, log.Fields{ |
| 134 "messageID": msg.ID, |
| 135 "size": len(msg.Data), |
| 136 "ackID": msg.AckID, |
| 137 }) |
| 138 |
| 139 if err := coll.Process(ctx, msg.Data); err != nil { |
| 140 if errors.IsTransient(err) { |
| 141 // Do not consume |
| 142 log.Fields{ |
| 143 log.ErrorKey: err, |
| 144 "msgID": msg.ID, |
| 145 "size": len(msg.Data), |
| 146 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S
ub message.") |
| 147 return false |
| 148 } |
| 149 |
| 150 log.Fields{ |
| 151 log.ErrorKey: err, |
| 152 "msgID": msg.ID, |
| 153 "size": len(msg.Data), |
| 154 }.Errorf(ctx, "Error ingesting Pub/Sub message.") |
| 155 } |
| 156 return true |
| 157 }) |
| 158 |
| 159 log.Debugf(a, "Collector finished.") |
| 160 return nil |
| 161 } |
| 162 |
| 163 // mainImpl is the Main implementaion, and returns the application return code |
| 164 // as an integer. |
| 165 func mainImpl() int { |
| 166 a := application{ |
| 167 Service: service.New(context.Background()), |
| 168 } |
| 169 |
| 170 fs := flag.FlagSet{} |
| 171 a.AddFlags(&fs) |
| 172 |
| 173 if err := fs.Parse(os.Args[1:]); err != nil { |
| 174 log.Errorf(log.SetError(a, err), "Failed to parse command-line."
) |
| 175 return 1 |
| 176 } |
| 177 |
| 178 // Run our configured application instance. |
| 179 var rc int |
| 180 if err := a.Run(a.runCollector); err != nil { |
| 181 log.Errorf(log.SetError(a, err), "Application execution failed."
) |
| 182 return 1 |
| 183 } |
| 184 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.") |
| 185 return 0 |
| 186 } |
| 187 |
| 188 // Entry point. |
| 189 func main() { |
| 190 os.Exit(mainImpl()) |
| 191 } |
| OLD | NEW |