Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(590)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/cmd/logdog_collector/main.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes"
9 "encoding/hex"
8 "fmt" 10 "fmt"
11 "io"
9 12
10 "github.com/golang/protobuf/proto" 13 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
12 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/gcloud/gs" 16 "github.com/luci/luci-go/common/gcloud/gs"
14 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
15 log "github.com/luci/luci-go/common/logging" 18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/parallel"
16 "github.com/luci/luci-go/common/proto/logdog/logpb" 20 "github.com/luci/luci-go/common/proto/logdog/logpb"
17 "github.com/luci/luci-go/server/logdog/archive" 21 "github.com/luci/luci-go/server/logdog/archive"
18 "github.com/luci/luci-go/server/logdog/storage" 22 "github.com/luci/luci-go/server/logdog/storage"
19 "golang.org/x/net/context" 23 "golang.org/x/net/context"
20 ) 24 )
21 25
26 // Task is a single archive task.
27 type Task interface {
28 // UniqueID returns a task-unique value. Other tasks, and other retries of
29 // this task, should (try to) not reuse this ID.
30 UniqueID() string
31
32 // Task is the archive task to execute.
33 Task() *logdog.ArchiveTask
34
35 // AssertLease asserts that the lease for this Task is still held.
36 //
37 // On failure, it will return an error. If successful, the Archivist may
38 // assume that it holds the lease longer.
39 AssertLease(context.Context) error
40 }
41
22 // Archivist is a stateless configuration capable of archiving individual log 42 // Archivist is a stateless configuration capable of archiving individual log
23 // streams. 43 // streams.
24 type Archivist struct { 44 type Archivist struct {
25 // Service is the client to use to communicate with Coordinator's Servic es 45 // Service is the client to use to communicate with Coordinator's Servic es
26 // endpoint. 46 // endpoint.
27 Service logdog.ServicesClient 47 Service logdog.ServicesClient
28 48
29 » // Storage is the intermediate storage instance to use to pull log entri es for 49 » // Storage is the archival source Storage instance.
30 » // archival.
31 Storage storage.Storage 50 Storage storage.Storage
32
 33 // GSClient is the Google Storage client to use for archive generation. 51 // GSClient is the Google Storage client to use for archive generation.
34 GSClient gs.Client 52 GSClient gs.Client
35 53
36 // GSBase is the base Google Storage path. This includes the bucket name 54 // GSBase is the base Google Storage path. This includes the bucket name
37 // and any associated path. 55 // and any associated path.
38 GSBase gs.Path 56 GSBase gs.Path
57 // GSStagingBase is the base Google Storage path for archive staging. Th is
58 // includes the bucket name and any associated path.
59 GSStagingBase gs.Path
60
39 // PrefixIndexRange is the maximum number of stream indexes in between i ndex 61 // PrefixIndexRange is the maximum number of stream indexes in between i ndex
40 // entries. See archive.Manifest for more information. 62 // entries. See archive.Manifest for more information.
41 StreamIndexRange int 63 StreamIndexRange int
42 // PrefixIndexRange is the maximum number of prefix indexes in between i ndex 64 // PrefixIndexRange is the maximum number of prefix indexes in between i ndex
43 // entries. See archive.Manifest for more information. 65 // entries. See archive.Manifest for more information.
44 PrefixIndexRange int 66 PrefixIndexRange int
45 // ByteRange is the maximum number of stream data bytes in between index 67 // ByteRange is the maximum number of stream data bytes in between index
46 // entries. See archive.Manifest for more information. 68 // entries. See archive.Manifest for more information.
47 ByteRange int 69 ByteRange int
48 } 70 }
49 71
50 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used 72 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
 51 // during archival. This should be greater than the maximum LogEntry size. 73 // during archival. This should be greater than the maximum LogEntry size.
52 const storageBufferSize = types.MaxLogEntryDataSize * 64 74 const storageBufferSize = types.MaxLogEntryDataSize * 64
53 75
54 // ArchiveTask processes and executes a single log stream archive task. 76 // ArchiveTask processes and executes a single log stream archive task.
55 func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error {
56 var task logdog.ArchiveTask
57 if err := proto.Unmarshal(desc, &task); err != nil {
58 log.WithError(err).Errorf(c, "Failed to decode archive task.")
59 return err
60 }
61 return a.Archive(c, &task)
62 }
63
64 // Archive archives a single log stream. If unsuccessful, an error is returned.
65 // 77 //
66 // This error may be wrapped in errors.Transient if it is believed to have been 78 // It returns true on success (delete the task) and false on failure (don't
67 // caused by a transient failure. 79 // delete the task). The return value of true should only be used if the task
80 // is truly complete and acknowledged by the Coordinator.
68 // 81 //
69 // If the supplied Context is Done, operation may terminate before completion, 82 // If the supplied Context is Done, operation may terminate before completion,
70 // returning the Context's error. 83 // returning the Context's error.
71 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { 84 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
85 » delete, err := a.archiveTaskImpl(c, task)
86 » log.Fields{
87 » » log.ErrorKey: err,
88 » » "delete": delete,
89 » » "path": task.Task().Path,
90 » }.Infof(c, "Finished archive task.")
91 » return delete
92 }
93
94 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes
95 // an error. The error is useful for testing to assert that certain conditions
96 // were hit.
97 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
98 » at := task.Task()
99 » log.Fields{
100 » » "path": at.Path,
101 » }.Debugf(c, "Received archival task.")
102
72 // Load the log stream's current state. If it is already archived, we wi ll 103 // Load the log stream's current state. If it is already archived, we wi ll
73 // return an immediate success. 104 // return an immediate success.
74 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ 105 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
75 » » Path: t.Path, 106 » » Path: at.Path,
76 Desc: true, 107 Desc: true,
77 }) 108 })
78 switch { 109 switch {
79 case err != nil: 110 case err != nil:
80 log.WithError(err).Errorf(c, "Failed to load log stream.") 111 log.WithError(err).Errorf(c, "Failed to load log stream.")
81 » » return err 112 » » return false, err
113
82 case ls.State == nil: 114 case ls.State == nil:
83 » » return errors.New("missing state") 115 » » log.Errorf(c, "Log stream did not include state.")
116 » » return false, errors.New("log stream did not include state")
117
118 » case ls.State.Purged:
119 » » log.Warningf(c, "Log stream is purged. Discarding archival reque st.")
120 » » return true, errors.New("log stream is purged")
121
122 » case ls.State.Archived:
123 » » log.Infof(c, "Log stream is already archived. Discarding archiva l request.")
124 » » return true, errors.New("log stream is archived")
125
126 » case !bytes.Equal(ls.ArchivalKey, at.Key):
127 » » if len(ls.ArchivalKey) == 0 {
128 » » » // The log stream is not registering as "archive pending " state.
129 » » » //
130 » » » // This can happen if the eventually-consistent datastor e hasn't updated
131 » » » // its log stream state by the time this Pub/Sub task is received. In
132 » » » // this case, we will continue retrying the task until d atastore registers
133 » » » // that some key is associated with it.
134 » » » log.Fields{
135 » » » » "logStreamArchivalKey": hex.EncodeToString(ls.Ar chivalKey),
136 » » » » "requestArchivalKey": hex.EncodeToString(at.Ke y),
137 » » » }.Infof(c, "Archival request received before log stream has its key.")
138 » » » return false, errors.New("premature archival request")
139 » » }
140
141 » » // This can happen if a Pub/Sub message is dispatched during a t ransaction,
142 » » // but that specific transaction failed. In this case, the Pub/S ub message
143 » » // will have a key that doesn't match the key that was transacti onally
144 » » // encoded, and can be discarded.
145 » » log.Fields{
146 » » » "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKe y),
147 » » » "requestArchivalKey": hex.EncodeToString(at.Key),
148 » » }.Infof(c, "Superfluous archival request (keys do not match). Di scarding.")
149 » » return true, errors.New("superfluous archival request")
150
84 case ls.State.ProtoVersion != logpb.Version: 151 case ls.State.ProtoVersion != logpb.Version:
85 log.Fields{ 152 log.Fields{
86 "protoVersion": ls.State.ProtoVersion, 153 "protoVersion": ls.State.ProtoVersion,
87 "expectedVersion": logpb.Version, 154 "expectedVersion": logpb.Version,
88 }.Errorf(c, "Unsupported log stream protobuf version.") 155 }.Errorf(c, "Unsupported log stream protobuf version.")
89 » » return errors.New("unsupported protobuf version") 156 » » return false, errors.New("unsupported log stream protobuf versio n")
157
90 case ls.Desc == nil: 158 case ls.Desc == nil:
91 » » return errors.New("missing descriptor") 159 » » log.Errorf(c, "Log stream did not include a descriptor.")
92 160 » » return false, errors.New("log stream did not include a descripto r")
93 » case ls.State.Purged: 161 » }
94 » » log.Warningf(c, "Log stream is purged.") 162
95 » » return nil 163 » // If the archival request is younger than the settle delay, kick it bac k to
96 » case ls.State.Archived: 164 » // retry later.
97 » » log.Infof(c, "Log stream is already archived.") 165 » age := ls.Age.Duration()
98 » » return nil 166 » if age < at.SettleDelay.Duration() {
99 » } 167 » » log.Fields{
100 168 » » » "age": age,
101 » // Deserialize and validate the descriptor protobuf. 169 » » » "settleDelay": at.SettleDelay.Duration(),
102 » var desc logpb.LogStreamDescriptor 170 » » }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.")
103 » if err := proto.Unmarshal(ls.Desc, &desc); err != nil { 171 » » return false, errors.New("log stream is within settle delay")
172 » }
173
174 » // Are we required to archive a complete log stream?
175 » complete := (age <= at.CompletePeriod.Duration())
176 » if complete && ls.State.TerminalIndex < 0 {
177 » » log.Warningf(c, "Cannot archive complete stream with no terminal index.")
178 » » return false, errors.New("completeness required, but stream has no terminal index")
179 » }
180
181 » ar := logdog.ArchiveStreamRequest{
182 » » Path: at.Path,
183 » }
184
185 » // Archive to staging.
186 » //
187 » // If a non-transient failure occurs here, we will report it to the Arch ivist
188 » // under the assumption that it will continue occurring.
189 » //
190 » // We will handle error creating the plan and executing the plan in the same
191 » // switch statement below.
192 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID())
193 » if err != nil {
194 » » log.WithError(err).Errorf(c, "Failed to create staged archival p lan.")
195 » } else {
196 » » err = staged.stage(c, complete)
197 » }
198
199 » switch {
200 » case errors.IsTransient(err):
201 » » // If this is a transient error, exit immediately and do not del ete the
202 » » // archival task.
203 » » log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.")
204 » » return false, err
205
206 » case err != nil:
207 » » // This is a non-transient error, so we are confident that any f uture
208 » » // Archival will also encounter this error. We will mark this ar chival
209 » » // as an error and report it to the Coordinator.
210 » » log.WithError(err).Errorf(c, "Archival failed with non-transient error.")
211 » » ar.Error = err.Error()
212 » » if ar.Error == "" {
 213 » » » // This needs to be non-nil, so if our actual error has an empty string,
214 » » » // fill in a generic message.
215 » » » ar.Error = "archival error"
216 » » }
217
218 » default:
219 » » // In case something fails, clean up our staged archival (best e ffort).
220 » » defer staged.cleanup(c)
221
222 » » // Finalize the archival. First, extend our lease to confirm tha t we still
223 » » // hold it.
224 » » if err := task.AssertLease(c); err != nil {
225 » » » log.WithError(err).Errorf(c, "Failed to extend task leas e before finalizing.")
226 » » » return false, err
227 » » }
228
229 » » // Finalize the archival.
230 » » if err := staged.finalize(c, a.GSClient, &ar); err != nil {
231 » » » log.WithError(err).Errorf(c, "Failed to finalize archiva l.")
232 » » » return false, err
233 » » }
234 » }
235
236 » log.Fields{
237 » » "streamURL": ar.StreamUrl,
238 » » "indexURL": ar.IndexUrl,
239 » » "dataURL": ar.DataUrl,
240 » » "terminalIndex": ar.TerminalIndex,
241 » » "logEntryCount": ar.LogEntryCount,
242 » » "hadError": ar.Error,
243 » » "complete": ar.Complete(),
244 » }.Debugf(c, "Finished archival round. Reporting archive state.")
245
246 » // Extend the lease again to confirm that we still hold it.
247 » if err := task.AssertLease(c); err != nil {
248 » » log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.")
249 » » return false, err
250 » }
251
252 » if _, err := a.Service.ArchiveStream(c, &ar); err != nil {
253 » » log.WithError(err).Errorf(c, "Failed to report archive state.")
254 » » return false, err
255 » }
256
257 » // Archival is complete and acknowledged by Coordinator. Consume the arc hival
258 » // task.
259 » return true, nil
260 }
261
262 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
263 » *stagedArchival, error) {
264 » sa := stagedArchival{
265 » » Archivist: a,
266 » » path: path,
267
268 » » terminalIndex: ls.State.TerminalIndex,
269 » }
270
271 » // Deserialize and validate the descriptor protobuf. If this fails, it i s a
272 » // non-transient error.
273 » if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
104 log.Fields{ 274 log.Fields{
105 log.ErrorKey: err, 275 log.ErrorKey: err,
106 "protoVersion": ls.State.ProtoVersion, 276 "protoVersion": ls.State.ProtoVersion,
107 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") 277 }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
108 » » return err 278 » » return nil, err
109 » } 279 » }
110 280
111 » task := &archiveTask{ 281 » bext := sa.desc.BinaryFileExt
112 » » Archivist: a,
113 » » ArchiveTask: t,
114 » » ls: ls,
115 » » desc: &desc,
116 » }
117 » if err := task.archive(c); err != nil {
118 » » log.WithError(err).Errorf(c, "Failed to perform archival operati on.")
119 » » return err
120 » }
121 » log.Fields{
122 » » "streamURL": task.ar.StreamUrl,
123 » » "indexURL": task.ar.IndexUrl,
124 » » "dataURL": task.ar.DataUrl,
125 » » "terminalIndex": task.ar.TerminalIndex,
126 » » "complete": task.ar.Complete,
127 » }.Debugf(c, "Finished archive construction.")
128
129 » if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
130 » » log.WithError(err).Errorf(c, "Failed to mark log stream as archi ved.")
131 » » return err
132 » }
133 » return nil
134 }
135
136 // archiveTask is the set of parameters for a single archival.
137 type archiveTask struct {
138 » *Archivist
139 » *logdog.ArchiveTask
140
141 » // ls is the log stream state.
142 » ls *logdog.LoadStreamResponse
143 » // desc is the unmarshaled log stream descriptor.
144 » desc *logpb.LogStreamDescriptor
145
146 » // ar will be populated during archive construction.
147 » ar logdog.ArchiveStreamRequest
148 }
149
150 // archiveState performs the archival operation on a stream described by a
151 // Coordinator State. Upon success, the State will be updated with the result
152 // of the archival operation.
153 func (t *archiveTask) archive(c context.Context) (err error) {
154 » // Generate our archival object managers.
155 » bext := t.desc.BinaryFileExt
156 if bext == "" { 282 if bext == "" {
157 bext = "bin" 283 bext = "bin"
158 } 284 }
159 285
160 » path := t.Path 286 » // Construct our staged archival paths.
161 » var streamO, indexO, dataO *gsObject 287 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid)
162 » streamO, err = t.newGSObject(c, path, "logstream.entries") 288 » sa.index = a.makeStagingPaths(path, "logstream.index", uid)
163 » if err != nil { 289 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid)
164 » » log.WithError(err).Errorf(c, "Failed to create log object.") 290 » return &sa, nil
165 » » return 291 }
166 » } 292
167 293 // makeStagingPaths returns a stagingPaths instance for the given path and
168 » indexO, err = t.newGSObject(c, path, "logstream.index") 294 // file name. It incorporates a unique ID into the staging name to differentiate
169 » if err != nil { 295 // it from other staging paths for the same path/name.
170 » » log.WithError(err).Errorf(c, "Failed to create index object.") 296 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) st agingPaths {
171 » » return 297 » return stagingPaths{
172 » } 298 » » staged: a.GSStagingBase.Concat(string(path), uid, name),
173 299 » » final: a.GSBase.Concat(string(path), name),
174 » dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext)) 300 » }
175 » if err != nil { 301 }
176 » » log.WithError(err).Errorf(c, "Failed to create data object.") 302
177 » » return 303 type stagedArchival struct {
178 » } 304 » *Archivist
179 305
180 » // Load the URLs into our state. 306 » path types.StreamPath
181 » t.ar.StreamUrl = streamO.url 307 » desc logpb.LogStreamDescriptor
182 » t.ar.IndexUrl = indexO.url 308
183 » t.ar.DataUrl = dataO.url 309 » stream stagingPaths
184 310 » streamSize int64
311
312 » index stagingPaths
313 » indexSize int64
314
315 » data stagingPaths
316 » dataSize int64
317
318 » finalized bool
319 » terminalIndex int64
320 » logEntryCount int64
321 }
322
323 // stage executes the archival process, archiving to the staged storage paths.
324 //
325 // If stage fails, it may return a transient error.
326 func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
185 log.Fields{ 327 log.Fields{
186 » » "streamURL": t.ar.StreamUrl, 328 » » "streamURL": sa.stream.staged,
187 » » "indexURL": t.ar.IndexUrl, 329 » » "indexURL": sa.index.staged,
188 » » "dataURL": t.ar.DataUrl, 330 » » "dataURL": sa.data.staged,
189 » }.Infof(c, "Archiving log stream...") 331 » }.Debugf(c, "Staging log stream...")
190 332
191 » // We want to try and delete any GS objects that were created during a f ailed 333 » // Group any transient errors that occur during cleanup. If we aren't
192 » // archival attempt. 334 » // returning a non-transient error, return a transient "terr".
193 » deleteOnFail := func(o *gsObject) { 335 » var terr errors.MultiError
194 » » if o == nil || err == nil { 336 » defer func() {
195 » » » return 337 » » if err == nil && len(terr) > 0 {
196 » » } 338 » » » err = errors.WrapTransient(terr)
197 » » if ierr := o.delete(); ierr != nil { 339 » » }
340 » }()
341
342 » // Close our writers on exit. If any of them fail to close, mark the arc hival
343 » // as a transient failure.
344 » closeWriter := func(closer io.Closer, path gs.Path) {
345 » » // Close the Writer. If this results in an error, append it to o ur transient
346 » » // error MultiError.
347 » » if ierr := closer.Close(); ierr != nil {
348 » » » terr = append(terr, ierr)
349 » » }
350
351 » » // If we have an archival error, also delete the path associated with this
352 » » // stream. This is a non-fatal failure, since we've already hit a fatal
353 » » // one.
354 » » if err != nil || len(terr) > 0 {
355 » » » if ierr := sa.GSClient.Delete(path); ierr != nil {
356 » » » » log.Fields{
357 » » » » » log.ErrorKey: ierr,
358 » » » » » "path": path,
359 » » » » }.Warningf(c, "Failed to delete stream on error. ")
360 » » » }
361 » » }
362 » }
363
364 » // createWriter is a shorthand function for creating a writer to a path and
365 » // reporting an error if it failed.
366 » createWriter := func(p gs.Path) (gs.Writer, error) {
367 » » w, ierr := sa.GSClient.NewWriter(p)
368 » » if ierr != nil {
198 log.Fields{ 369 log.Fields{
199 log.ErrorKey: ierr, 370 log.ErrorKey: ierr,
200 » » » » "url": o.url, 371 » » » » "path": p,
201 » » » }.Warningf(c, "Failed to clean-up GS object on failure." ) 372 » » » }.Errorf(c, "Failed to create writer.")
202 » » } 373 » » » return nil, ierr
203 » } 374 » » }
204 » defer deleteOnFail(streamO) 375 » » return w, nil
205 » defer deleteOnFail(indexO) 376 » }
206 » defer deleteOnFail(dataO) 377
207 378 » var streamWriter, indexWriter, dataWriter gs.Writer
 208 » // Close our GS object managers on exit. If any of them fail to close, m ark 379 » if streamWriter, err = createWriter(sa.stream.staged); err != nil {
209 » // the archival as a failure. 380 » » return
210 » closeOM := func(o *gsObject) { 381 » }
211 » » if o == nil { 382 » defer closeWriter(streamWriter, sa.stream.staged)
212 » » » return 383
213 » » } 384 » if indexWriter, err = createWriter(sa.index.staged); err != nil {
214 » » if ierr := o.Close(); ierr != nil { 385 » » return err
215 » » » err = ierr 386 » }
216 » » } 387 » defer closeWriter(indexWriter, sa.index.staged)
217 » } 388
218 » defer closeOM(streamO) 389 » if dataWriter, err = createWriter(sa.data.staged); err != nil {
219 » defer closeOM(indexO) 390 » » return err
220 » defer closeOM(dataO) 391 » }
392 » defer closeWriter(dataWriter, sa.data.staged)
221 393
222 // Read our log entries from intermediate storage. 394 // Read our log entries from intermediate storage.
223 ss := storageSource{ 395 ss := storageSource{
224 Context: c, 396 Context: c,
225 » » st: t.Storage, 397 » » st: sa.Storage,
226 » » path: types.StreamPath(t.Path), 398 » » path: sa.path,
227 » » contiguous: t.Complete, 399 » » contiguous: complete,
228 » » terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), 400 » » terminalIndex: types.MessageIndex(sa.terminalIndex),
229 lastIndex: -1, 401 lastIndex: -1,
230 } 402 }
231 403
232 m := archive.Manifest{ 404 m := archive.Manifest{
233 » » Desc: t.desc, 405 » » Desc: &sa.desc,
234 Source: &ss, 406 Source: &ss,
235 » » LogWriter: streamO, 407 » » LogWriter: streamWriter,
236 » » IndexWriter: indexO, 408 » » IndexWriter: indexWriter,
237 » » DataWriter: dataO, 409 » » DataWriter: dataWriter,
238 » » StreamIndexRange: t.StreamIndexRange, 410 » » StreamIndexRange: sa.StreamIndexRange,
239 » » PrefixIndexRange: t.PrefixIndexRange, 411 » » PrefixIndexRange: sa.PrefixIndexRange,
240 » » ByteRange: t.ByteRange, 412 » » ByteRange: sa.ByteRange,
241 413
242 Logger: log.Get(c), 414 Logger: log.Get(c),
243 } 415 }
244 » err = archive.Archive(m) 416 » if err = archive.Archive(m); err != nil {
245 » if err != nil {
246 log.WithError(err).Errorf(c, "Failed to archive log stream.") 417 log.WithError(err).Errorf(c, "Failed to archive log stream.")
247 return 418 return
248 } 419 }
249 420
250 » t.ar.TerminalIndex = int64(ss.lastIndex) 421 » if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) {
251 » if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { 422 » » // Fail if we were requested to archive only the complete log. W e consider
252 » » // Fail, if we were requested to archive only the complete log. 423 » » // this a transient error with the expectation that the missing entries will
253 » » if t.Complete { 424 » » // show up in future retries.
254 » » » log.Fields{ 425 » » switch {
255 » » » » "terminalIndex": tidx, 426 » » case complete && ss.hasMissingEntries:
256 » » » » "lastIndex": t.ar.TerminalIndex, 427 » » » log.Errorf(c, "Log stream has missing entries, but compl eteness is required.")
257 » » » }.Errorf(c, "Log stream archival stopped prior to termin al index.") 428 » » » err = errors.WrapTransient(errors.New("stream has missin g entries"))
258 » » » return errors.New("stream finished short of terminal ind ex") 429 » » » return
259 » » } 430
260 431 » » case ss.logEntryCount == 0:
261 » » if t.ar.TerminalIndex < 0 {
262 // If our last log index was <0, then no logs were archi ved. 432 // If our last log index was <0, then no logs were archi ved.
263 log.Warningf(c, "No log entries were archived.") 433 log.Warningf(c, "No log entries were archived.")
264 » » } else { 434
435 » » default:
265 // Update our terminal index. 436 // Update our terminal index.
266 log.Fields{ 437 log.Fields{
267 » » » » "from": tidx, 438 » » » » "terminalIndex": ss.lastIndex,
268 » » » » "to": t.ar.TerminalIndex, 439 » » » » "logEntryCount": ss.logEntryCount,
269 » » » }.Infof(c, "Updated log stream terminal index.") 440 » » » » "hasMissingEntries": ss.hasMissingEntries,
441 » » » }.Debugf(c, "Finished archiving log stream.")
270 } 442 }
271 } 443 }
272 444
273 // Update our state with archival results. 445 // Update our state with archival results.
274 » t.ar.Path = t.Path 446 » sa.terminalIndex = int64(ss.lastIndex)
275 » t.ar.StreamSize = streamO.Count() 447 » sa.logEntryCount = ss.logEntryCount
276 » t.ar.IndexSize = indexO.Count() 448 » sa.stream.count = streamWriter.Count()
277 » t.ar.DataSize = dataO.Count() 449 » sa.index.count = indexWriter.Count()
278 » t.ar.Complete = !ss.hasMissingEntries 450 » sa.data.count = dataWriter.Count()
279 return 451 return
280 } 452 }
281 453
282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) { 454 type stagingPaths struct {
283 » p := t.GSBase.Concat(path, name) 455 » staged gs.Path
284 » o := gsObject{ 456 » final gs.Path
285 » » gs: t.GSClient, 457 » count int64
286 » » bucket: p.Bucket(), 458 }
287 » » path: p.Filename(), 459
288 » } 460 func (d *stagingPaths) clearStaged() {
289 461 » d.staged = ""
290 » // Build our GS URL. Note that since buildGSPath joins with "/", the ini tial 462 }
291 » // token, "gs:/", will become "gs://". 463
292 » o.url = string(p) 464 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logd og.ArchiveStreamRequest) error {
293 465 » err := parallel.FanOutIn(func(taskC chan<- func() error) {
294 » var err error 466 » » for _, d := range sa.getStagingPaths() {
295 » o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path) 467 » » » d := d
468
469 » » » // Don't copy zero-sized streams.
470 » » » if d.count == 0 {
471 » » » » continue
472 » » » }
473
474 » » » taskC <- func() error {
475 » » » » if err := client.Rename(d.staged, d.final); err != nil {
476 » » » » » log.Fields{
477 » » » » » » log.ErrorKey: err,
478 » » » » » » "stagedPath": d.staged,
479 » » » » » » "finalPath": d.final,
480 » » » » » }.Errorf(c, "Failed to rename GS object. ")
481 » » » » » return err
482 » » » » }
483
484 » » » » // Clear the staged value to indicate that it no longer exists.
485 » » » » d.clearStaged()
486 » » » » return nil
487 » » » }
488 » » }
489 » })
296 if err != nil { 490 if err != nil {
297 » » log.Fields{ 491 » » return err
298 » » » log.ErrorKey: err, 492 » }
299 » » » "url": o.url, 493
300 » » }.Errorf(c, "Failed to create Writer.") 494 » ar.TerminalIndex = sa.terminalIndex
301 » » return nil, err 495 » ar.LogEntryCount = sa.logEntryCount
302 » } 496 » ar.StreamUrl = string(sa.stream.final)
303 497 » ar.StreamSize = sa.stream.count
304 » // Delete any existing object at this path. 498 » ar.IndexUrl = string(sa.index.final)
305 » if err := o.delete(); err != nil { 499 » ar.IndexSize = sa.index.count
306 » » closeErr := o.Close() 500 » ar.DataUrl = string(sa.data.final)
307 501 » ar.DataSize = sa.data.count
308 » » log.Fields{ 502 » return nil
309 » » » log.ErrorKey: err, 503 }
310 » » » "closeErr": closeErr, 504
311 » » » "url": o.url, 505 func (sa *stagedArchival) cleanup(c context.Context) {
312 » » }.Errorf(c, "Could not delete object during creation.") 506 » for _, d := range sa.getStagingPaths() {
313 » » return nil, err 507 » » if d.staged == "" {
314 » } 508 » » » continue
315 » return &o, nil 509 » » }
316 } 510
317 511 » » if err := sa.GSClient.Delete(d.staged); err != nil {
318 // gsObjectManger wraps a gsObject instance with metadata.
319 type gsObject struct {
320 » gs.Writer
321
322 » // gs is the Client instance.
323 » gs gs.Client
324 » // bucket is the name of the object's bucket.
325 » bucket string
326 » // path is the bucket-relative path of the object.
327 » path string
328 » // url is the Google Storage URL (gs://) of this object.
329 » url string
330 }
331
332 func (o *gsObject) delete() error {
333 » return o.gs.Delete(o.bucket, o.path)
334 }
335
336 // storageSource is an archive.LogEntrySource that pulls log entries from
337 // intermediate storage via its storage.Storage instance.
338 type storageSource struct {
339 » context.Context
340
341 » st storage.Storage // the storage instance to read from
342 » path types.StreamPath // the path of the log stream
343 » contiguous bool // if true, enforce contiguous entries
344 » terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
345
346 » buf []*logpb.LogEntry
347 » lastIndex types.MessageIndex
348 » hasMissingEntries bool // true if some log entries were missing.
349 }
350
351 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
352 » bytes := 0
353
354 » req := storage.GetRequest{
355 » » Path: s.path,
356 » » Index: start,
357 » }
358 » return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
359 » » le := logpb.LogEntry{}
360 » » if err := proto.Unmarshal(d, &le); err != nil {
361 log.Fields{ 512 log.Fields{
362 » » » » log.ErrorKey: err, 513 » » » » log.ErrorKey: err,
363 » » » » "streamIndex": idx, 514 » » » » "path": d.staged,
364 » » » }.Errorf(s, "Failed to unmarshal LogEntry.") 515 » » » }.Warningf(c, "Failed to clean up staged path.")
365 » » » return false 516 » » }
366 » » } 517
367 » » s.buf = append(s.buf, &le) 518 » » d.clearStaged()
368 519 » }
369 » » // Stop loading if we've reached or exceeded our buffer size. 520 }
370 » » bytes += len(d) 521
371 » » return bytes < storageBufferSize 522 func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
372 » }) 523 » return []*stagingPaths{
373 } 524 » » &sa.stream,
374 525 » » &sa.index,
375 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { 526 » » &sa.data,
376 » if len(s.buf) == 0 { 527 » }
377 » » s.buf = s.buf[:0] 528 }
378 » » if err := s.bufferEntries(s.lastIndex + 1); err != nil {
379 » » » if err == storage.ErrDoesNotExist {
380 » » » » log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
381 » » » » return nil, archive.ErrEndOfStream
382 » » » }
383
384 » » » log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.")
385 » » » return nil, err
386 » » }
387 » }
388
389 » if len(s.buf) == 0 {
390 » » log.Fields{
391 » » » "lastIndex": s.lastIndex,
392 » » }.Debugf(s, "Encountered end of stream.")
393 » » return nil, archive.ErrEndOfStream
394 » }
395
396 » var le *logpb.LogEntry
397 » le, s.buf = s.buf[0], s.buf[1:]
398
399 » // If we're enforcing a contiguous log stream, error if this LogEntry is not
400 » // contiguous.
401 » sidx := types.MessageIndex(le.StreamIndex)
402 » nidx := (s.lastIndex + 1)
403 » if sidx != nidx {
404 » » s.hasMissingEntries = true
405
406 » » if s.contiguous {
407 » » » log.Fields{
408 » » » » "index": sidx,
409 » » » » "nextIndex": nidx,
410 » » » }.Errorf(s, "Non-contiguous log stream while enforcing." )
411 » » » return nil, errors.New("non-contiguous log stream")
412 » » }
413 » }
414
415 » // If we're enforcing a maximum terminal index, return end of stream if this
416 » // LogEntry exceeds that index.
417 » if s.terminalIndex >= 0 && sidx > s.terminalIndex {
418 » » log.Fields{
419 » » » "index": sidx,
420 » » » "terminalIndex": s.terminalIndex,
421 » » }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
422 » » return nil, archive.ErrEndOfStream
423 » }
424
425 » s.lastIndex = sidx
426 » return le, nil
427 }
OLDNEW
« no previous file with comments | « server/cmd/logdog_collector/main.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698