Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(254)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package main
6
7 import (
8 "flag"
9 "fmt"
10 "os"
11 "time"
12
13 "github.com/luci/luci-go/common/auth"
14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/pubsub"
16 "github.com/luci/luci-go/common/gcloud/pubsub/ackbuffer"
17 "github.com/luci/luci-go/common/gcloud/pubsub/subscriber"
18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/parallel"
20 "github.com/luci/luci-go/server/internal/logdog/collector"
21 "github.com/luci/luci-go/server/internal/logdog/service"
22 "golang.org/x/net/context"
23 )
24
// errInvalidConfig is the sentinel error returned when the supplied
// configuration cannot be used to run the Collector.
var errInvalidConfig = errors.New("invalid configuration")
28
29 // application is the Collector application state.
30 type application struct {
31 *service.Service
32
33 // shutdownCtx is a Context that will be cancelled if our application
34 // receives a shutdown signal.
35 shutdownCtx context.Context
36 }
37
38 // run is the main execution function.
39 func (a *application) runCollector() error {
40 cfg := a.Config()
41 ccfg := cfg.GetCollector()
42 if ccfg == nil {
43 return errors.New("no collector configuration")
44 }
45
46 pscfg := cfg.GetTransport().GetPubsub()
47 if pscfg == nil {
48 return errors.New("missing Pub/Sub configuration")
49 }
50
51 // Our Subscription must be a valid one.
52 sub := pubsub.Subscription(pscfg.Subscription)
53 if err := sub.Validate(); err != nil {
54 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er r)
55 }
56
57 // New PubSub instance with the authenticated client.
58 psClient, err := a.AuthenticatedClient(func(o *auth.Options) {
59 o.Scopes = pubsub.SubscriberScopes
60 })
61 if err != nil {
62 log.WithError(err).Errorf(a, "Failed to create Pub/Sub client.")
63 return err
64 }
65
66 // Create a retrying Pub/Sub client.
67 ps := &pubsub.Retry{
68 Connection: pubsub.NewConnection(psClient),
69 Callback: func(err error, d time.Duration) {
70 log.Fields{
71 log.ErrorKey: err,
72 "delay": d,
73 }.Warningf(a, "Transient error encountered; retrying..." )
74 },
75 }
76
77 exists, err := ps.SubExists(a, sub)
78 if err != nil {
79 log.Fields{
80 log.ErrorKey: err,
81 "subscription": pscfg.Subscription,
82 }.Errorf(a, "Could not confirm Pub/Sub subscription.")
83 return errInvalidConfig
84 }
85 if !exists {
86 log.Fields{
87 "subscription": pscfg.Subscription,
88 }.Errorf(a, "Subscription does not exist.")
89 return errInvalidConfig
90 }
91 log.Fields{
92 "subscription": sub,
93 }.Infof(a, "Successfully validated Pub/Sub subscription.")
94
95 // Initialize our Storage.
96 s, err := a.Storage()
97 if err != nil {
98 log.WithError(err).Errorf(a, "Failed to get storage instance.")
99 return err
100 }
101 defer s.Close()
102
103 // Application shutdown will now operate by cancelling the Collector's
104 // shutdown Context.
105 shutdownCtx, shutdownFunc := context.WithCancel(a)
106 a.SetShutdownFunc(shutdownFunc)
107 defer a.SetShutdownFunc(nil)
108
109 // Start an ACK buffer so that we can batch ACKs.
110 ab := ackbuffer.New(a, ackbuffer.Config{
111 Ack: ackbuffer.NewACK(ps, sub, 0),
112 })
113 defer ab.CloseAndFlush()
114
115 // Initialize our Collector service object.
116 coll := collector.New(collector.Options{
117 Coordinator: a.Coordinator(),
118 Storage: s,
119 StreamStateCacheExpire: ccfg.StateCacheExpiration.Duration(),
120 Sem: make(parallel.Semaphore, int(ccfg.Workers)),
121 })
122
123 // Execute our main Subscriber loop. It will run until the supplied Cont ext
124 // is cancelled.
125 engine := subscriber.Subscriber{
126 S: subscriber.NewSource(ps, sub, 0),
127 A: ab,
128
129 PullWorkers: int(ccfg.TransportWorkers),
130 HandlerWorkers: int(ccfg.Workers),
131 }
132 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool {
133 ctx := log.SetFields(a, log.Fields{
134 "messageID": msg.ID,
135 "size": len(msg.Data),
136 "ackID": msg.AckID,
137 })
138
139 if err := coll.Process(ctx, msg.Data); err != nil {
140 if errors.IsTransient(err) {
141 // Do not consume
142 log.Fields{
143 log.ErrorKey: err,
144 "msgID": msg.ID,
145 "size": len(msg.Data),
146 }.Warningf(ctx, "TRANSIENT error ingesting Pub/S ub message.")
147 return false
148 }
149
150 log.Fields{
151 log.ErrorKey: err,
152 "msgID": msg.ID,
153 "size": len(msg.Data),
154 }.Errorf(ctx, "Error ingesting Pub/Sub message.")
155 }
156 return true
157 })
158
159 log.Debugf(a, "Collector finished.")
160 return nil
161 }
162
163 // mainImpl is the Main implementaion, and returns the application return code
164 // as an integer.
165 func mainImpl() int {
166 a := application{
167 Service: service.New(context.Background()),
168 }
169
170 fs := flag.FlagSet{}
171 a.AddFlags(&fs)
172
173 if err := fs.Parse(os.Args[1:]); err != nil {
174 log.Errorf(log.SetError(a, err), "Failed to parse command-line." )
175 return 1
176 }
177
178 // Run our configured application instance.
179 var rc int
180 if err := a.Run(a.runCollector); err != nil {
181 log.Errorf(log.SetError(a, err), "Application execution failed." )
182 return 1
183 }
184 log.Infof(log.SetField(a, "returnCode", rc), "Terminating.")
185 return 0
186 }
187
188 // Entry point.
189 func main() {
190 os.Exit(mainImpl())
191 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698