Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(64)

Side by Side Diff: server/internal/logdog/collector/collector.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebased, updated from comments. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "bytes"
9 "time"
10
11 "github.com/golang/protobuf/proto"
12 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/logdog/butlerproto"
14 "github.com/luci/luci-go/common/logdog/types"
15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
19 "github.com/luci/luci-go/server/logdog/storage"
20 "golang.org/x/net/context"
21 )
22
23 const (
24 // DefaultStreamStateCacheExpire is the default expiration value.
25 DefaultStreamStateCacheExpire = 10 * time.Minute
26 )
27
28 // Options is the set of options to use for collection.
29 type Options struct {
30 // Coordinator is the coordinator service.
31 Coordinator CoordinatorClient
32 // Storage is the backing store to use.
33 Storage storage.Storage
34 // Sem is a semaphore used to throttle the number of simultaneous ingest
35 // tasks that are executed.
36 Sem parallel.Semaphore
37
38 // StreamStateCacheExpire is the maximum amount of time that a cahced st ream
martiniss 2016/01/27 22:19:43 typo
dnj 2016/01/29 20:46:51 Done.
39 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
40 StreamStateCacheExpire time.Duration
41 }
42
43 // Collector is a stateful collection service.
44 type Collector interface {
45 // Process ingests an encoded ButlerLogBundle message. It is goroutine-s afe,
martiniss 2016/01/27 22:19:43 What does ingest mean? Maybe I just don't know the
dnj 2016/01/29 20:46:51 Ingest as in like, eating something.
46 // but may throttle based on the configured semaphore.
47 //
48 // If a transient error occurs during ingest, Process will return an
49 // errors.Transient error.
50 Process(context.Context, []byte) error
51 }
52
53 // collector is an implementation of Collector.
54 type collector struct {
55 *Options
56
57 // streamState is the stream state lookup and caching.
58 streamState *streamStateCache
59 }
60
61 // New instantiates a new Collector instance.
62 func New(o Options) Collector {
63 return &collector{
64 Options: &o,
65 streamState: newStreamStateCache(streamStateCacheOptions{
66 coordinator: o.Coordinator,
67 expiration: o.StreamStateCacheExpire,
68 }),
69 }
70 }
71
72 // Process ingests an encoded ButlerLogBundle message.
martiniss 2016/01/27 22:19:43 Why not just say it implements the interface?
dnj 2016/01/29 20:46:52 Done.
73 //
74 // If a transient error occurs during ingest, Process will return an error.
75 // If no error occurred, or if there was an error with the input data, no error
76 // will be returned.
77 func (c *collector) Process(ctx context.Context, msg []byte) error {
78 pr := butlerproto.Reader{}
79 if err := pr.Read(bytes.NewReader(msg)); err != nil {
80 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
81 return nil
82 }
83 if pr.Metadata.ProtoVersion != logpb.Version {
84 log.Fields{
85 "messageProtoVersion": pr.Metadata.ProtoVersion,
86 "currentProtoVersion": logpb.Version,
87 }.Errorf(ctx, "Unknown protobuf version.")
88 return nil
89 }
90 if pr.Bundle == nil {
91 log.Errorf(ctx, "Protocol message did not contain a Butler bundl e.")
92 return nil
93 }
94
95 // Handle each individual stream in parallel.
martiniss 2016/01/27 22:19:43 This doesn't seem related to the next line of code
iannucci 2016/01/28 01:15:35 duplicate comment w/ below?
dnj 2016/01/29 20:46:51 Done.
96 if len(pr.Bundle.Entries) == 0 {
97 return nil
98 }
99
100 lw := logWork{
101 md: pr.Metadata,
102 b: pr.Bundle,
103 }
104
105 // Handle each bundle entry in parallel. We will use a separate work poo l
106 // here so that top-level bundle dispatch can't deadlock the processing tasks.
107 //
108 // If we don't have a semaphore, this will also be unbounded, since cap( nil)
109 // is 0.
110 err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) {
111 for _, be := range pr.Bundle.Entries {
112 lw := lw
113 lw.be = be
114 taskC <- func() error {
115 return c.processLogStream(ctx, &lw)
116 }
117 }
118 })
119 if err != nil {
120 if hasTransientError(err) {
121 log.Fields{
122 log.ErrorKey: err,
123 }.Warningf(ctx, "Transient error encountered during proc essing.")
124 return err
125 }
126
127 log.Fields{
128 log.ErrorKey: err,
129 }.Errorf(ctx, "Non-transient error encountered during processing ; discarding.")
130 }
131 return nil
martiniss 2016/01/27 22:19:43 wait why don't we return an error if there was one
dnj 2016/01/29 20:46:52 I actually mixed transient and non-transient code
132 }
133
134 // logWork is a cumulative set of read-only state passed around by value for log
135 // processing.
136 type logWork struct {
137 // md is the metadata associated with the overall message.
138 md *logpb.ButlerMetadata
139 // b is the Butler bundle.
140 b *logpb.ButlerLogBundle
141 // be is the Bundle entry.
142 be *logpb.ButlerLogBundle_Entry
143 // path is the constructed path of the stream being processed.
144 path types.StreamPath
145 // le is the LogEntry in the bundle entry.
146 le *logpb.LogEntry
147 }
148
149 // processLogStream processes an individual set of log messages belonging to the
150 // same log stream.
151 func (c *collector) processLogStream(ctx context.Context, lw *logWork) error {
152 if err := lw.be.Desc.Validate(true); err != nil {
153 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.")
154 return nil
155 }
156 lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.b e.Desc.Name))
157 ctx = log.SetField(ctx, "path", lw.path)
158
159 if len(lw.be.Secret) == 0 {
160 log.Errorf(ctx, "Missing secret.")
161 return nil
162 }
163
164 // Fetch our cached/remote state. This will replace our state object wit h the
165 // fetched state, so any future calls will need to re-set the Secret val ue.
166 // TODO: Use timeout?
167 state, err := c.streamState.getOrRegister(ctx, &cc.State{
168 Path: lw.path,
169 Secret: types.StreamSecret(lw.be.Secret),
170 ProtoVersion: lw.md.ProtoVersion,
171 Descriptor: lw.be.Desc,
172 })
173 if err != nil {
174 log.WithError(err).Errorf(ctx, "Failed to get/register current s tream state.")
175 return err
176 }
177
178 // Does the log stream's secret match the expected secret?
179 if !bytes.Equal(lw.be.Secret, []byte(state.secret)) {
180 log.Errorf(log.SetFields(ctx, log.Fields{
181 "secret": lw.be.Secret,
182 "expectedSecret": state.secret,
183 }), "Log entry has incorrect secret.")
184 return nil
185 }
186
187 if state.archived {
188 log.Infof(ctx, "Skipping message bundle for archived stream.")
189 return nil
190 }
191 if state.purged {
192 log.Infof(ctx, "Skipping message bundle for purged stream.")
193 return nil
194 }
195
196 // Update our terminal index if we have one.
197 //
198 // Note that even if our cached value is marked terminal, we could have failed
199 // to push the terminal index to the Coordinator, so we will not refrain from
200 // pushing every terminal index encountered regardless of cache state.
201 if lw.be.Terminal {
202 tidx := types.MessageIndex(lw.be.TerminalIndex)
203 log.Fields{
204 "value": tidx,
205 }.Debugf(ctx, "Bundle includes a terminal index.")
206
207 if state.terminalIndex < 0 {
208 state.terminalIndex = tidx
209 } else if state.terminalIndex != tidx {
210 log.Fields{
211 "cachedIndex": state.terminalIndex,
212 "bundleIndex": tidx,
213 }.Warningf(ctx, "Cached terminal index disagrees with st ate.")
214 }
215 }
216
217 // In parallel, load the log entries into Storage. Throttle this with ou r
218 // ingest semaphore.
219 return parallel.Run(c.Sem, func(taskC chan<- func() error) {
220 for i, le := range lw.be.Logs {
221 i, le := i, le
222
223 // Store this LogEntry
224 taskC <- func() error {
225 if err := le.Validate(lw.be.Desc); err != nil {
226 log.Fields{
227 log.ErrorKey: err,
228 "index": i,
229 }.Warningf(ctx, "Discarding invalid log entry.")
230 return nil
231 }
232
233 if state.terminalIndex >= 0 && types.MessageInde x(le.StreamIndex) > state.terminalIndex {
234 log.Fields{
235 "index": le.StreamIndex,
236 "terminalIndex": state.terminalI ndex,
237 }.Warningf(ctx, "Stream is terminated be fore log entry; discarding.")
238 return nil
239 }
240
241 lw := *lw
242 lw.le = le
243 return c.processLogEntry(ctx, &lw)
244 }
245 }
246
247 // If our bundle entry is terminal, we have an additional task o f reporting
248 // this to the Coordinator.
249 if lw.be.Terminal {
250 taskC <- func() error {
251 // Sentinel task: Update the terminal bundle sta te.
252 state := *state
253 state.terminalIndex = types.MessageIndex(lw.be.T erminalIndex)
254
255 log.Fields{
256 "terminalIndex": state.terminalIndex,
257 }.Infof(ctx, "Received terminal log; updating Co ordinator state.")
258 state.secret = types.StreamSecret(lw.be.Secret)
259
260 if err := c.streamState.setTerminalIndex(ctx, &s tate); err != nil {
261 log.WithError(err).Errorf(ctx, "Failed t o set stream terminal index.")
262 return err
263 }
264 return nil
265 }
266 }
267 })
268 }
269
270 func (c *collector) processLogEntry(ctx context.Context, lw *logWork) error {
271 data, err := proto.Marshal(lw.le)
272 if err != nil {
273 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.")
274 return err
275 }
276
277 // Post the log to storage.
278 err = c.Storage.Put(&storage.PutRequest{
279 Path: lw.path,
280 Index: types.MessageIndex(lw.le.StreamIndex),
281 Value: data,
282 })
283 // If the log entry already exists, consider the "put" successful.
284 if err != nil && err != storage.ErrExists {
285 log.WithError(err).Errorf(ctx, "Failed to load log entry into St orage.")
286 return err
287 }
288 return nil
289 }
290
291 // wrapMultiErrorTransient wraps an error in a TransientError wrapper.
292 //
293 // If the error is nil, it will return nil. If the error is already transient,
294 // it will be directly returned. If the error is a MultiError, its sub-errors
295 // will be evaluated and wrapped in a TransientError if any of its sub-errors
296 // are transient errors.
297 func hasTransientError(err error) bool {
298 if merr, ok := err.(errors.MultiError); ok {
299 for _, e := range merr {
300 if hasTransientError(e) {
301 return true
302 }
303 }
304 return false
305 }
306
307 return errors.IsTransient(err)
308 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698