Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1311)

Side by Side Diff: server/internal/logdog/collector/collector.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Comments, rebase. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/cmd/logdog_collector/run.sh ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "bytes"
9 "time"
10
11 "github.com/golang/protobuf/proto"
12 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/logdog/butlerproto"
14 "github.com/luci/luci-go/common/logdog/types"
15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
19 "github.com/luci/luci-go/server/logdog/storage"
20 "golang.org/x/net/context"
21 )
22
// Collector is a stateful object responsible for ingesting LogDog logs,
// registering them with a Coordinator, and stowing them in short-term storage
// for streaming and processing.
type Collector struct {
	// Coordinator is used to interface with the Coordinator client.
	//
	// On production systems, this should be wrapped with a caching client (see
	// the stateCache sub-package) to avoid overwhelming the server.
	Coordinator coordinator.Coordinator

	// Storage is the backing store to use.
	Storage storage.Storage

	// Sem is a semaphore used to throttle the number of simultaneous ingest
	// tasks that are executed. If nil, ingest parallelism is unbounded
	// (cap(nil) is 0, which the work pool treats as "no limit").
	Sem parallel.Semaphore

	// StreamStateCacheExpire is the maximum amount of time that a cached stream
	// state entry is valid. If zero, DefaultStreamStateCacheExpire will be used.
	StreamStateCacheExpire time.Duration
}
43
44 // Process ingests an encoded ButlerLogBundle message, registering it with
45 // the LogDog Coordinator and stowing it in a temporary Storage for streaming
46 // retrieval.
47 //
48 // If a transient error occurs during ingest, Process will return an error.
49 // If no error occurred, or if there was an error with the input data, no error
50 // will be returned.
51 func (c *Collector) Process(ctx context.Context, msg []byte) error {
52 pr := butlerproto.Reader{}
53 if err := pr.Read(bytes.NewReader(msg)); err != nil {
54 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.")
55 return nil
56 }
57 if pr.Metadata.ProtoVersion != logpb.Version {
58 log.Fields{
59 "messageProtoVersion": pr.Metadata.ProtoVersion,
60 "currentProtoVersion": logpb.Version,
61 }.Errorf(ctx, "Unknown protobuf version.")
62 return nil
63 }
64 if pr.Bundle == nil {
65 log.Errorf(ctx, "Protocol message did not contain a Butler bundl e.")
66 return nil
67 }
68
69 // If there are no entries, there is nothing to do.
70 if len(pr.Bundle.Entries) == 0 {
71 return nil
72 }
73
74 lw := logWork{
75 md: pr.Metadata,
76 b: pr.Bundle,
77 }
78
79 // Handle each bundle entry in parallel. We will use a separate work poo l
80 // here so that top-level bundle dispatch can't deadlock the processing tasks.
81 //
82 // If we don't have a semaphore, this will also be unbounded, since cap( nil)
83 // is 0.
84 err := parallel.WorkPool(cap(c.Sem), func(taskC chan<- func() error) {
85 for _, be := range pr.Bundle.Entries {
86 lw := lw
87 lw.be = be
88 taskC <- func() error {
89 return c.processLogStream(ctx, &lw)
90 }
91 }
92 })
93 if err != nil {
94 if hasTransientError(err) && !errors.IsTransient(err) {
95 // err has a nested transient error; propagate that to t op.
96 err = errors.WrapTransient(err)
97 }
98 return err
99 }
100 return nil
101 }
102
// logWork is a cumulative set of read-only state passed around by value for log
// processing. It is built up incrementally: Process fills md/b/be, then
// processLogStream fills path, and finally a per-entry copy carries le.
type logWork struct {
	// md is the metadata associated with the overall message.
	md *logpb.ButlerMetadata
	// b is the Butler bundle.
	b *logpb.ButlerLogBundle
	// be is the Bundle entry currently being processed.
	be *logpb.ButlerLogBundle_Entry
	// path is the constructed path (prefix + name) of the stream being
	// processed.
	path types.StreamPath
	// le is the individual LogEntry in the bundle entry, set only for
	// per-entry processing (processLogEntry).
	le *logpb.LogEntry
}
117
// processLogStream processes an individual set of log messages belonging to the
// same log stream.
//
// Bad input (invalid descriptor, missing or mismatched secret, archived or
// purged stream) is logged and dropped by returning nil; only errors that are
// worth surfacing to the caller (registration or storage failures) propagate.
func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error {
	// Drop the bundle entry outright if its descriptor fails validation.
	if err := lw.be.Desc.Validate(true); err != nil {
		log.Errorf(log.SetError(ctx, err), "Invalid log stream descriptor.")
		return nil
	}
	lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.be.Desc.Name))
	ctx = log.SetField(ctx, "path", lw.path)

	if len(lw.be.Secret) == 0 {
		log.Errorf(ctx, "Missing secret.")
		return nil
	}

	// Fetch our cached/remote state. This will replace our state object with the
	// fetched state, so any future calls will need to re-set the Secret value.
	// TODO: Use timeout?
	state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamState{
		Path:         lw.path,
		Secret:       types.StreamSecret(lw.be.Secret),
		ProtoVersion: lw.md.ProtoVersion,
	}, lw.be.Desc)
	if err != nil {
		log.WithError(err).Errorf(ctx, "Failed to get/register current stream state.")
		return err
	}

	// Does the log stream's secret match the expected secret?
	if !bytes.Equal(lw.be.Secret, []byte(state.Secret)) {
		log.Errorf(log.SetFields(ctx, log.Fields{
			"secret":         lw.be.Secret,
			"expectedSecret": state.Secret,
		}), "Log entry has incorrect secret.")
		return nil
	}

	// Archived/purged streams accept no further data; drop the bundle.
	if state.Archived {
		log.Infof(ctx, "Skipping message bundle for archived stream.")
		return nil
	}
	if state.Purged {
		log.Infof(ctx, "Skipping message bundle for purged stream.")
		return nil
	}

	// Update our terminal index if we have one.
	//
	// Note that even if our cached value is marked terminal, we could have failed
	// to push the terminal index to the Coordinator, so we will not refrain from
	// pushing every terminal index encountered regardless of cache state.
	if lw.be.Terminal {
		tidx := types.MessageIndex(lw.be.TerminalIndex)
		log.Fields{
			"value": tidx,
		}.Debugf(ctx, "Bundle includes a terminal index.")

		if state.TerminalIndex < 0 {
			state.TerminalIndex = tidx
		} else if state.TerminalIndex != tidx {
			log.Fields{
				"cachedIndex": state.TerminalIndex,
				"bundleIndex": tidx,
			}.Warningf(ctx, "Cached terminal index disagrees with state.")
		}
	}

	// In parallel, load the log entries into Storage. Throttle this with our
	// ingest semaphore.
	return parallel.Run(c.Sem, func(taskC chan<- func() error) {
		for i, le := range lw.be.Logs {
			// Capture per-iteration copies for the closure below.
			i, le := i, le

			// Store this LogEntry
			taskC <- func() error {
				// Discard entries that fail validation against the descriptor.
				if err := le.Validate(lw.be.Desc); err != nil {
					log.Fields{
						log.ErrorKey: err,
						"index":      i,
					}.Warningf(ctx, "Discarding invalid log entry.")
					return nil
				}

				// Discard entries past the stream's known terminal index.
				if state.TerminalIndex >= 0 && types.MessageIndex(le.StreamIndex) > state.TerminalIndex {
					log.Fields{
						"index":         le.StreamIndex,
						"terminalIndex": state.TerminalIndex,
					}.Warningf(ctx, "Stream is terminated before log entry; discarding.")
					return nil
				}

				// Copy the shared work state so each task gets its own "le" slot.
				lw := *lw
				lw.le = le
				return c.processLogEntry(ctx, &lw)
			}
		}

		// If our bundle entry is terminal, we have an additional task of reporting
		// this to the Coordinator.
		if lw.be.Terminal {
			taskC <- func() error {
				// Sentinel task: Update the terminal bundle state. Work on a copy
				// so the concurrent entry tasks reading "state" are unaffected.
				state := *state
				state.TerminalIndex = types.MessageIndex(lw.be.TerminalIndex)

				log.Fields{
					"terminalIndex": state.TerminalIndex,
				}.Infof(ctx, "Received terminal log; updating Coordinator state.")

				if err := c.Coordinator.TerminateStream(ctx, &state); err != nil {
					log.WithError(err).Errorf(ctx, "Failed to set stream terminal index.")
					return err
				}
				return nil
			}
		}
	})
}
236
237 func (c *Collector) processLogEntry(ctx context.Context, lw *logWork) error {
238 data, err := proto.Marshal(lw.le)
239 if err != nil {
240 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.")
241 return err
242 }
243
244 // Post the log to storage.
245 err = c.Storage.Put(&storage.PutRequest{
246 Path: lw.path,
247 Index: types.MessageIndex(lw.le.StreamIndex),
248 Value: data,
249 })
250
251 // If the log entry already exists, consider the "put" successful.
252 //
253 // All Storage errors are considered transient, as they are safe and
254 // data-agnostic.
255 if err != nil && err != storage.ErrExists {
256 log.WithError(err).Errorf(ctx, "Failed to load log entry into St orage.")
257 return errors.WrapTransient(err)
258 }
259 return nil
260 }
261
262 // wrapMultiErrorTransient wraps an error in a TransientError wrapper.
263 //
264 // If the error is nil, it will return nil. If the error is already transient,
265 // it will be directly returned. If the error is a MultiError, its sub-errors
266 // will be evaluated and wrapped in a TransientError if any of its sub-errors
267 // are transient errors.
268 func hasTransientError(err error) bool {
269 if merr, ok := err.(errors.MultiError); ok {
270 for _, e := range merr {
271 if hasTransientError(e) {
272 return true
273 }
274 }
275 return false
276 }
277
278 return errors.IsTransient(err)
279 }
OLDNEW
« no previous file with comments | « server/cmd/logdog_collector/run.sh ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698