Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1378)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes"
9 "encoding/hex"
8 "fmt" 10 "fmt"
11 "io"
9 12
10 "github.com/golang/protobuf/proto" 13 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
12 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/gcloud/gs" 16 "github.com/luci/luci-go/common/gcloud/gs"
14 "github.com/luci/luci-go/common/logdog/types" 17 "github.com/luci/luci-go/common/logdog/types"
15 log "github.com/luci/luci-go/common/logging" 18 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/parallel"
16 "github.com/luci/luci-go/common/proto/logdog/logpb" 20 "github.com/luci/luci-go/common/proto/logdog/logpb"
17 "github.com/luci/luci-go/server/logdog/archive" 21 "github.com/luci/luci-go/server/logdog/archive"
18 "github.com/luci/luci-go/server/logdog/storage" 22 "github.com/luci/luci-go/server/logdog/storage"
19 "golang.org/x/net/context" 23 "golang.org/x/net/context"
20 ) 24 )
21 25
26 // Task is a single archive task.
27 type Task interface {
28 // UniqueID returns a task-unique value. Other tasks, and other retries of
29 // this task, should (try to) not reuse this ID.
30 UniqueID() string
31
32 // Data is the archive task's data content.
33 Data() []byte
34
35 // AssertLease asserts that the lease for this Task is still held.
36 //
37 // On failure, it will return an error. If successful, the Archivist may
38 // assume that it holds the lease longer.
39 AssertLease(context.Context) error
40 }
41
22 // Archivist is a stateless configuration capable of archiving individual log 42 // Archivist is a stateless configuration capable of archiving individual log
23 // streams. 43 // streams.
24 type Archivist struct { 44 type Archivist struct {
25 // Service is the client to use to communicate with Coordinator's Servic es 45 // Service is the client to use to communicate with Coordinator's Servic es
26 // endpoint. 46 // endpoint.
27 Service logdog.ServicesClient 47 Service logdog.ServicesClient
28 48
29 // Storage is the intermediate storage instance to use to pull log entri es for 49 // Storage is the intermediate storage instance to use to pull log entri es for
30 // archival. 50 // archival.
31 Storage storage.Storage 51 Storage storage.Storage
32 52
33 // GSClient is the Google Storage client to for archive generation. 53 // GSClient is the Google Storage client to for archive generation.
34 GSClient gs.Client 54 GSClient gs.Client
35 55
36 // GSBase is the base Google Storage path. This includes the bucket name 56 // GSBase is the base Google Storage path. This includes the bucket name
37 // and any associated path. 57 // and any associated path.
38 GSBase gs.Path 58 GSBase gs.Path
59 // GSStagingBase is the base Google Storage path for archive staging. Th is
60 // includes the bucket name and any associated path.
61 GSStagingBase gs.Path
62
39 // PrefixIndexRange is the maximum number of stream indexes in between i ndex 63 // PrefixIndexRange is the maximum number of stream indexes in between i ndex
40 // entries. See archive.Manifest for more information. 64 // entries. See archive.Manifest for more information.
41 StreamIndexRange int 65 StreamIndexRange int
42 // PrefixIndexRange is the maximum number of prefix indexes in between i ndex 66 // PrefixIndexRange is the maximum number of prefix indexes in between i ndex
43 // entries. See archive.Manifest for more information. 67 // entries. See archive.Manifest for more information.
44 PrefixIndexRange int 68 PrefixIndexRange int
45 // ByteRange is the maximum number of stream data bytes in between index 69 // ByteRange is the maximum number of stream data bytes in between index
46 // entries. See archive.Manifest for more information. 70 // entries. See archive.Manifest for more information.
47 ByteRange int 71 ByteRange int
48 } 72 }
49 73
50 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used 74 // storageBufferSize is the size, in bytes, of the LogEntry buffer that is used
51 // to during archival. This should be greater than the maximum LogEntry size. 75 // to during archival. This should be greater than the maximum LogEntry size.
52 const storageBufferSize = types.MaxLogEntryDataSize * 64 76 const storageBufferSize = types.MaxLogEntryDataSize * 64
53 77
54 // ArchiveTask processes and executes a single log stream archive task. 78 // ArchiveTask processes and executes a single log stream archive task.
55 func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error {
56 var task logdog.ArchiveTask
57 if err := proto.Unmarshal(desc, &task); err != nil {
58 log.WithError(err).Errorf(c, "Failed to decode archive task.")
59 return err
60 }
61 return a.Archive(c, &task)
62 }
63
64 // Archive archives a single log stream. If unsuccessful, an error is returned.
65 // 79 //
66 // This error may be wrapped in errors.Transient if it is believed to have been 80 // It returns true on success (delete the task) and false on failure (don't
67 // caused by a transient failure. 81 // delete the task). The return value of true should only be used if the task
82 // is truly complete and acknowledged by the Coordinator.
68 // 83 //
69 // If the supplied Context is Done, operation may terminate before completion, 84 // If the supplied Context is Done, operation may terminate before completion,
70 // returning the Context's error. 85 // returning the Context's error.
71 func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { 86 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
dnj 2016/04/11 17:20:04 I re-wrote most of this to use a staging space and
87 » delete, _ := a.archiveTaskImpl(c, task)
88 » return delete
89 }
90
91 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes
92 // an error. The error is useful for testing to assert that certain conditions
93 // were hit.
94 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
95 » // If we can't decode the archival task, we can't decide whether or not to
96 » // delete it, so we will leave it in the queue.
97 » var at logdog.ArchiveTask
98 » if err := proto.Unmarshal(task.Data(), &at); err != nil {
99 » » log.WithError(err).Errorf(c, "Failed to decode archive task.")
100 » » return false, err
101 » }
102
103 » log.Fields{
104 » » "path": at.Path,
105 » }.Debugf(c, "Received archival task.")
106
72 // Load the log stream's current state. If it is already archived, we wi ll 107 // Load the log stream's current state. If it is already archived, we wi ll
73 // return an immediate success. 108 // return an immediate success.
74 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ 109 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
75 » » Path: t.Path, 110 » » Path: at.Path,
76 Desc: true, 111 Desc: true,
77 }) 112 })
78 switch { 113 switch {
79 case err != nil: 114 case err != nil:
80 log.WithError(err).Errorf(c, "Failed to load log stream.") 115 log.WithError(err).Errorf(c, "Failed to load log stream.")
81 » » return err 116 » » return false, err
117
82 case ls.State == nil: 118 case ls.State == nil:
83 » » return errors.New("missing state") 119 » » log.Errorf(c, "Log stream did not include state.")
120 » » return false, errors.New("log stream did not include state")
121
122 » case ls.State.Purged:
123 » » log.Warningf(c, "Log stream is purged. Discarding archival reque st.")
124 » » return true, errors.New("log stream is purged")
125
126 » case ls.State.Archived:
127 » » log.Infof(c, "Log stream is already archived. Discarding archiva l request.")
128 » » return true, errors.New("log stream is archived")
129
130 » case !bytes.Equal(ls.ArchivalKey, at.Key):
131 » » if len(ls.ArchivalKey) == 0 {
132 » » » // The log stream is not registering as "archive pending " state.
133 » » » //
134 » » » // This can happen if the eventually-consistent datastor e hasn't updated
135 » » » // its log stream state by the time this Pub/Sub task is received. In
136 » » » // this case, we will continue retrying the task until d atastore registers
137 » » » // that some key is associated with it.
138 » » » log.Fields{
139 » » » » "logStreamArchivalKey": hex.EncodeToString(ls.Ar chivalKey),
140 » » » » "requestArchivalKey": hex.EncodeToString(at.Ke y),
141 » » » }.Infof(c, "Archival request received before log stream has its key.")
142 » » » return false, errors.New("premature archival request")
143 » » }
144
145 » » // This can happen if a Pub/Sub message is dispatched during a t ransaction,
146 » » // but that specific transaction failed. In this case, the Pub/S ub message
147 » » // will have a key that doesn't match the key that was transacti onally
148 » » // encoded, and can be discarded.
149 » » log.Fields{
150 » » » "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKe y),
151 » » » "requestArchivalKey": hex.EncodeToString(at.Key),
152 » » }.Infof(c, "Superfluous archival request (keys do not match). Di scarding.")
153 » » return true, errors.New("superfluous archival request")
154
84 case ls.State.ProtoVersion != logpb.Version: 155 case ls.State.ProtoVersion != logpb.Version:
85 log.Fields{ 156 log.Fields{
86 "protoVersion": ls.State.ProtoVersion, 157 "protoVersion": ls.State.ProtoVersion,
87 "expectedVersion": logpb.Version, 158 "expectedVersion": logpb.Version,
88 }.Errorf(c, "Unsupported log stream protobuf version.") 159 }.Errorf(c, "Unsupported log stream protobuf version.")
89 » » return errors.New("unsupported protobuf version") 160 » » return false, errors.New("unsupported log stream protobuf versio n")
161
90 case ls.Desc == nil: 162 case ls.Desc == nil:
91 » » return errors.New("missing descriptor") 163 » » log.Errorf(c, "Log stream did not include a descriptor.")
92 164 » » return false, errors.New("log stream did not include a descripto r")
93 » case ls.State.Purged: 165 » }
94 » » log.Warningf(c, "Log stream is purged.") 166
95 » » return nil 167 » // If the archival request is younger than the settle delay, kick it bac k to
96 » case ls.State.Archived: 168 » // retry later.
97 » » log.Infof(c, "Log stream is already archived.") 169 » age := ls.Age.Duration()
98 » » return nil 170 » if age < at.SettleDelay.Duration() {
99 » } 171 » » log.Fields{
100 172 » » » "age": age,
101 » // Deserialize and validate the descriptor protobuf. 173 » » » "settleDelay": at.SettleDelay.Duration(),
102 » var desc logpb.LogStreamDescriptor 174 » » }.Infof(c, "Log stream is younger than the settle delay. Returni ng task to queue.")
103 » if err := proto.Unmarshal(ls.Desc, &desc); err != nil { 175 » » return false, errors.New("log stream is within settle delay")
176 » }
177
178 » // Are we required to archive a complete log stream?
179 » complete := (age <= at.CompletePeriod.Duration())
180 » if complete && ls.State.TerminalIndex < 0 {
181 » » log.Warningf(c, "Cannot archive complete stream with no terminal index.")
182 » » return false, errors.New("completeness required, but stream has no terminal index")
183 » }
184
185 » ar := logdog.ArchiveStreamRequest{
186 » » Path: at.Path,
187 » }
188
189 » // Archive to staging.
190 » //
191 » // If a non-transient failure occurs here, we will report it to the Arch ivist
192 » // under the assumption that it will continue occurring.
193 » //
194 » // We will handle error creating the plan and executing the plan in the same
195 » // switch statement below.
196 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID())
197 » if err != nil {
198 » » log.WithError(err).Errorf(c, "Failed to create staged archival p lan.")
199 » } else {
200 » » err = staged.stage(c, complete)
201 » }
202
203 » switch {
204 » case errors.IsTransient(err):
205 » » // If this is a transient error, exit immediately and do not del ete the
206 » » // archival task.
207 » » log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.")
208 » » return false, err
209
210 » case err != nil:
211 » » // This is a non-transient error, so we are confident that any f uture
212 » » // Archival will also encounter this error. We will mark this ar chival
213 » » // as an error and report it to the Coordinator.
214 » » log.WithError(err).Errorf(c, "Archival failed with non-transient error.")
215 » » ar.Error = err.Error()
216 » » if ar.Error == "" {
217 » » » // This needs to be non-nil, so if our acutal error has an empty string,
218 » » » // fill in a generic message.
219 » » » ar.Error = "archival error"
220 » » }
221
222 » default:
223 » » // In case something fails, clean up our staged archival (best e ffort).
224 » » defer staged.cleanup(c)
225
226 » » // Finalize the archival. First, extend our lease to confirm tha t we still
227 » » // hold it.
228 » » if err := task.AssertLease(c); err != nil {
229 » » » log.WithError(err).Errorf(c, "Failed to extend task leas e before finalizing.")
230 » » » return false, err
231 » » }
232
233 » » // Finalize the archival.
234 » » if err := staged.finalize(c, a.GSClient, &ar); err != nil {
235 » » » log.WithError(err).Errorf(c, "Failed to finalize archiva l.")
236 » » » return false, err
237 » » }
238 » }
239
240 » log.Fields{
241 » » "streamURL": ar.StreamUrl,
242 » » "indexURL": ar.IndexUrl,
243 » » "dataURL": ar.DataUrl,
244 » » "terminalIndex": ar.TerminalIndex,
245 » » "logEntryCount": ar.LogEntryCount,
246 » » "hadError": ar.Error,
247 » » "complete": ar.Complete(),
248 » }.Debugf(c, "Finished archival round. Reporting archive state.")
249
250 » // Extend the lease again to confirm that we still hold it.
251 » if err := task.AssertLease(c); err != nil {
252 » » log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.")
253 » » return false, err
254 » }
255
256 » if _, err := a.Service.ArchiveStream(c, &ar); err != nil {
257 » » log.WithError(err).Errorf(c, "Failed to report archive state.")
258 » » return false, err
259 » }
260
261 » // Archival is complete and acknowledged by Coordinator. Consume the arc hival
262 » // task.
263 » return true, nil
264 }
265
266 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
267 » *stagedArchival, error) {
268 » sa := stagedArchival{
269 » » Archivist: a,
270 » » path: path,
271
272 » » terminalIndex: ls.State.TerminalIndex,
273 » }
274
275 » // Deserialize and validate the descriptor protobuf. If this fails, it i s a
276 » // non-transient error.
277 » if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
104 log.Fields{ 278 log.Fields{
105 log.ErrorKey: err, 279 log.ErrorKey: err,
106 "protoVersion": ls.State.ProtoVersion, 280 "protoVersion": ls.State.ProtoVersion,
107 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") 281 }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
108 » » return err 282 » » return nil, err
109 » } 283 » }
110 284
111 » task := &archiveTask{ 285 » bext := sa.desc.BinaryFileExt
112 » » Archivist: a,
113 » » ArchiveTask: t,
114 » » ls: ls,
115 » » desc: &desc,
116 » }
117 » if err := task.archive(c); err != nil {
118 » » log.WithError(err).Errorf(c, "Failed to perform archival operati on.")
119 » » return err
120 » }
121 » log.Fields{
122 » » "streamURL": task.ar.StreamUrl,
123 » » "indexURL": task.ar.IndexUrl,
124 » » "dataURL": task.ar.DataUrl,
125 » » "terminalIndex": task.ar.TerminalIndex,
126 » » "complete": task.ar.Complete,
127 » }.Debugf(c, "Finished archive construction.")
128
129 » if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
130 » » log.WithError(err).Errorf(c, "Failed to mark log stream as archi ved.")
131 » » return err
132 » }
133 » return nil
134 }
135
136 // archiveTask is the set of parameters for a single archival.
137 type archiveTask struct {
138 » *Archivist
139 » *logdog.ArchiveTask
140
141 » // ls is the log stream state.
142 » ls *logdog.LoadStreamResponse
143 » // desc is the unmarshaled log stream descriptor.
144 » desc *logpb.LogStreamDescriptor
145
146 » // ar will be populated during archive construction.
147 » ar logdog.ArchiveStreamRequest
148 }
149
150 // archiveState performs the archival operation on a stream described by a
151 // Coordinator State. Upon success, the State will be updated with the result
152 // of the archival operation.
153 func (t *archiveTask) archive(c context.Context) (err error) {
154 » // Generate our archival object managers.
155 » bext := t.desc.BinaryFileExt
156 if bext == "" { 286 if bext == "" {
157 bext = "bin" 287 bext = "bin"
158 } 288 }
159 289
160 » path := t.Path 290 » // Construct our staged archival paths.
161 » var streamO, indexO, dataO *gsObject 291 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid)
162 » streamO, err = t.newGSObject(c, path, "logstream.entries") 292 » sa.index = a.makeStagingPaths(path, "logstream.index", uid)
163 » if err != nil { 293 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid)
164 » » log.WithError(err).Errorf(c, "Failed to create log object.") 294 » return &sa, nil
165 » » return 295 }
166 » } 296
167 297 // makeStagingPaths returns a stagingPaths instance for the given path and
168 » indexO, err = t.newGSObject(c, path, "logstream.index") 298 // file name. It incorporates a unique ID into the staging name to differentiate
169 » if err != nil { 299 // it from other staging paths for the same path/name.
170 » » log.WithError(err).Errorf(c, "Failed to create index object.") 300 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) st agingPaths {
171 » » return 301 » return stagingPaths{
172 » } 302 » » staged: a.GSStagingBase.Concat(string(path), uid, name),
173 303 » » final: a.GSBase.Concat(string(path), name),
174 » dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext)) 304 » }
175 » if err != nil { 305 }
176 » » log.WithError(err).Errorf(c, "Failed to create data object.") 306
177 » » return 307 type stagedArchival struct {
178 » } 308 » *Archivist
179 309
180 » // Load the URLs into our state. 310 » path types.StreamPath
181 » t.ar.StreamUrl = streamO.url 311 » desc logpb.LogStreamDescriptor
182 » t.ar.IndexUrl = indexO.url 312
183 » t.ar.DataUrl = dataO.url 313 » stream stagingPaths
184 314 » streamSize int64
315
316 » index stagingPaths
317 » indexSize int64
318
319 » data stagingPaths
320 » dataSize int64
321
322 » finalized bool
323 » terminalIndex int64
324 » logEntryCount int64
325 }
326
327 // stage executes the archival process, archiving to the staged storage paths.
328 //
329 // If stage fails, it may return a transient error.
330 func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
185 log.Fields{ 331 log.Fields{
186 » » "streamURL": t.ar.StreamUrl, 332 » » "streamURL": sa.stream.staged,
187 » » "indexURL": t.ar.IndexUrl, 333 » » "indexURL": sa.index.staged,
188 » » "dataURL": t.ar.DataUrl, 334 » » "dataURL": sa.data.staged,
189 » }.Infof(c, "Archiving log stream...") 335 » }.Debugf(c, "Staging log stream...")
190 336
191 » // We want to try and delete any GS objects that were created during a f ailed 337 » // Group any transient errors that occur during cleanup. If we aren't
192 » // archival attempt. 338 » // returning a non-transient error, return a transient "terr".
193 » deleteOnFail := func(o *gsObject) { 339 » var terr errors.MultiError
194 » » if o == nil || err == nil { 340 » defer func() {
195 » » » return 341 » » if err == nil && len(terr) > 0 {
196 » » } 342 » » » err = errors.WrapTransient(terr)
197 » » if ierr := o.delete(); ierr != nil { 343 » » }
344 » }()
345
346 » // Close our writers on exit. If any of them fail to close, mark the arc hival
347 » // as a transient failure.
348 » closeWriter := func(closer io.Closer, path gs.Path) {
349 » » // Close the Writer. If this results in an error, append it to o ur transient
350 » » // error MultiError.
351 » » if ierr := closer.Close(); ierr != nil {
352 » » » terr = append(terr, ierr)
353 » » }
354
355 » » // If we have an archival error, also delete the path associated with this
356 » » // stream. This is a non-fatal failure, since we've already hit a fatal
357 » » // one.
358 » » if err != nil || len(terr) > 0 {
359 » » » if ierr := sa.GSClient.Delete(path); ierr != nil {
360 » » » » log.Fields{
361 » » » » » log.ErrorKey: ierr,
362 » » » » » "path": path,
363 » » » » }.Warningf(c, "Failed to delete stream on error. ")
364 » » » }
365 » » }
366 » }
367
368 » // createWriter is a shorthand function for creating a writer to a path and
369 » // reporting an error if it failed.
370 » createWriter := func(p gs.Path) (gs.Writer, error) {
371 » » w, ierr := sa.GSClient.NewWriter(p)
372 » » if ierr != nil {
198 log.Fields{ 373 log.Fields{
199 log.ErrorKey: ierr, 374 log.ErrorKey: ierr,
200 » » » » "url": o.url, 375 » » » » "path": p,
201 » » » }.Warningf(c, "Failed to clean-up GS object on failure." ) 376 » » » }.Errorf(c, "Failed to create writer.")
202 » » } 377 » » » return nil, ierr
203 » } 378 » » }
204 » defer deleteOnFail(streamO) 379 » » return w, nil
205 » defer deleteOnFail(indexO) 380 » }
206 » defer deleteOnFail(dataO) 381
207 382 » var streamWriter, indexWriter, dataWriter gs.Writer
208 » // Close our GS object managers on exit. If any of them fail to close, m arh 383 » if streamWriter, err = createWriter(sa.stream.staged); err != nil {
209 » // the archival as a failure. 384 » » return
210 » closeOM := func(o *gsObject) { 385 » }
211 » » if o == nil { 386 » defer closeWriter(streamWriter, sa.stream.staged)
212 » » » return 387
213 » » } 388 » if indexWriter, err = createWriter(sa.index.staged); err != nil {
214 » » if ierr := o.Close(); ierr != nil { 389 » » return err
215 » » » err = ierr 390 » }
216 » » } 391 » defer closeWriter(indexWriter, sa.index.staged)
217 » } 392
218 » defer closeOM(streamO) 393 » if dataWriter, err = createWriter(sa.data.staged); err != nil {
219 » defer closeOM(indexO) 394 » » return err
220 » defer closeOM(dataO) 395 » }
396 » defer closeWriter(dataWriter, sa.data.staged)
221 397
222 // Read our log entries from intermediate storage. 398 // Read our log entries from intermediate storage.
223 ss := storageSource{ 399 ss := storageSource{
224 Context: c, 400 Context: c,
225 » » st: t.Storage, 401 » » st: sa.Storage,
226 » » path: types.StreamPath(t.Path), 402 » » path: sa.path,
227 » » contiguous: t.Complete, 403 » » contiguous: complete,
228 » » terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), 404 » » terminalIndex: types.MessageIndex(sa.terminalIndex),
229 lastIndex: -1, 405 lastIndex: -1,
230 } 406 }
231 407
232 m := archive.Manifest{ 408 m := archive.Manifest{
233 » » Desc: t.desc, 409 » » Desc: &sa.desc,
234 Source: &ss, 410 Source: &ss,
235 » » LogWriter: streamO, 411 » » LogWriter: streamWriter,
236 » » IndexWriter: indexO, 412 » » IndexWriter: indexWriter,
237 » » DataWriter: dataO, 413 » » DataWriter: dataWriter,
238 » » StreamIndexRange: t.StreamIndexRange, 414 » » StreamIndexRange: sa.StreamIndexRange,
239 » » PrefixIndexRange: t.PrefixIndexRange, 415 » » PrefixIndexRange: sa.PrefixIndexRange,
240 » » ByteRange: t.ByteRange, 416 » » ByteRange: sa.ByteRange,
241 417
242 Logger: log.Get(c), 418 Logger: log.Get(c),
243 } 419 }
244 » err = archive.Archive(m) 420 » if err = archive.Archive(m); err != nil {
245 » if err != nil {
246 log.WithError(err).Errorf(c, "Failed to archive log stream.") 421 log.WithError(err).Errorf(c, "Failed to archive log stream.")
247 return 422 return
248 } 423 }
249 424
250 » t.ar.TerminalIndex = int64(ss.lastIndex) 425 » if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) {
251 » if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { 426 » » // Fail if we were requested to archive only the complete log. W e consider
252 » » // Fail, if we were requested to archive only the complete log. 427 » » // this a transient error with the expectation that the missing entries will
253 » » if t.Complete { 428 » » // show up in future retries.
254 » » » log.Fields{ 429 » » switch {
255 » » » » "terminalIndex": tidx, 430 » » case complete && ss.hasMissingEntries:
256 » » » » "lastIndex": t.ar.TerminalIndex, 431 » » » log.Errorf(c, "Log stream has missing entries, but compl eteness is required.")
257 » » » }.Errorf(c, "Log stream archival stopped prior to termin al index.") 432 » » » err = errors.WrapTransient(errors.New("stream has missin g entries"))
258 » » » return errors.New("stream finished short of terminal ind ex") 433 » » » return
259 » » } 434
260 435 » » case ss.logEntryCount == 0:
261 » » if t.ar.TerminalIndex < 0 {
262 // If our last log index was <0, then no logs were archi ved. 436 // If our last log index was <0, then no logs were archi ved.
263 log.Warningf(c, "No log entries were archived.") 437 log.Warningf(c, "No log entries were archived.")
264 » » } else { 438
439 » » default:
265 // Update our terminal index. 440 // Update our terminal index.
266 log.Fields{ 441 log.Fields{
267 » » » » "from": tidx, 442 » » » » "terminalIndex": ss.lastIndex,
268 » » » » "to": t.ar.TerminalIndex, 443 » » » » "logEntryCount": ss.logEntryCount,
269 » » » }.Infof(c, "Updated log stream terminal index.") 444 » » » » "hasMissingEntries": ss.hasMissingEntries,
445 » » » }.Debugf(c, "Finished archiving log stream.")
270 } 446 }
271 } 447 }
272 448
273 // Update our state with archival results. 449 // Update our state with archival results.
274 » t.ar.Path = t.Path 450 » sa.terminalIndex = int64(ss.lastIndex)
275 » t.ar.StreamSize = streamO.Count() 451 » sa.logEntryCount = ss.logEntryCount
276 » t.ar.IndexSize = indexO.Count() 452 » sa.stream.count = streamWriter.Count()
277 » t.ar.DataSize = dataO.Count() 453 » sa.index.count = indexWriter.Count()
278 » t.ar.Complete = !ss.hasMissingEntries 454 » sa.data.count = dataWriter.Count()
279 return 455 return
280 } 456 }
281 457
282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) { 458 type stagingPaths struct {
283 » p := t.GSBase.Concat(path, name) 459 » staged gs.Path
284 » o := gsObject{ 460 » final gs.Path
285 » » gs: t.GSClient, 461 » count int64
286 » » bucket: p.Bucket(), 462 }
287 » » path: p.Filename(), 463
288 » } 464 func (d *stagingPaths) clearStaged() {
289 465 » d.staged = ""
290 » // Build our GS URL. Note that since buildGSPath joins with "/", the ini tial 466 }
291 » // token, "gs:/", will become "gs://". 467
292 » o.url = string(p) 468 func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logd og.ArchiveStreamRequest) error {
293 469 » err := parallel.FanOutIn(func(taskC chan<- func() error) {
294 » var err error 470 » » for _, d := range sa.getStagingPaths() {
295 » o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path) 471 » » » d := d
472
473 » » » // Don't copy zero-sized streams.
474 » » » if d.count == 0 {
475 » » » » continue
476 » » » }
477
478 » » » taskC <- func() error {
479 » » » » if err := client.Rename(d.staged, d.final); err != nil {
480 » » » » » log.Fields{
481 » » » » » » log.ErrorKey: err,
482 » » » » » » "stagedPath": d.staged,
483 » » » » » » "finalPath": d.final,
484 » » » » » }.Errorf(c, "Failed to rename GS object. ")
485 » » » » » return err
486 » » » » }
487
488 » » » » // Clear the staged value to indicate that it no longer exists.
489 » » » » d.clearStaged()
490 » » » » return nil
491 » » » }
492 » » }
493 » })
296 if err != nil { 494 if err != nil {
297 » » log.Fields{ 495 » » return err
298 » » » log.ErrorKey: err, 496 » }
299 » » » "url": o.url, 497
300 » » }.Errorf(c, "Failed to create Writer.") 498 » ar.TerminalIndex = sa.terminalIndex
301 » » return nil, err 499 » ar.LogEntryCount = sa.logEntryCount
302 » } 500 » ar.StreamUrl = string(sa.stream.final)
303 501 » ar.StreamSize = sa.stream.count
304 » // Delete any existing object at this path. 502 » ar.IndexUrl = string(sa.index.final)
305 » if err := o.delete(); err != nil { 503 » ar.IndexSize = sa.index.count
306 » » closeErr := o.Close() 504 » ar.DataUrl = string(sa.data.final)
307 505 » ar.DataSize = sa.data.count
308 » » log.Fields{ 506 » return nil
309 » » » log.ErrorKey: err, 507 }
310 » » » "closeErr": closeErr, 508
311 » » » "url": o.url, 509 func (sa *stagedArchival) cleanup(c context.Context) {
312 » » }.Errorf(c, "Could not delete object during creation.") 510 » for _, d := range sa.getStagingPaths() {
313 » » return nil, err 511 » » if d.staged == "" {
314 » } 512 » » » continue
315 » return &o, nil 513 » » }
316 } 514
317 515 » » if err := sa.GSClient.Delete(d.staged); err != nil {
318 // gsObjectManger wraps a gsObject instance with metadata.
319 type gsObject struct {
320 » gs.Writer
321
322 » // gs is the Client instance.
323 » gs gs.Client
324 » // bucket is the name of the object's bucket.
325 » bucket string
326 » // path is the bucket-relative path of the object.
327 » path string
328 » // url is the Google Storage URL (gs://) of this object.
329 » url string
330 }
331
332 func (o *gsObject) delete() error {
333 » return o.gs.Delete(o.bucket, o.path)
334 }
335
336 // storageSource is an archive.LogEntrySource that pulls log entries from
337 // intermediate storage via its storage.Storage instance.
338 type storageSource struct {
339 » context.Context
340
341 » st storage.Storage // the storage instance to read from
342 » path types.StreamPath // the path of the log stream
343 » contiguous bool // if true, enforce contiguous entries
344 » terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
345
346 » buf []*logpb.LogEntry
347 » lastIndex types.MessageIndex
348 » hasMissingEntries bool // true if some log entries were missing.
349 }
350
351 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
352 » bytes := 0
353
354 » req := storage.GetRequest{
355 » » Path: s.path,
356 » » Index: start,
357 » }
358 » return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
359 » » le := logpb.LogEntry{}
360 » » if err := proto.Unmarshal(d, &le); err != nil {
361 log.Fields{ 516 log.Fields{
362 » » » » log.ErrorKey: err, 517 » » » » log.ErrorKey: err,
363 » » » » "streamIndex": idx, 518 » » » » "path": d.staged,
364 » » » }.Errorf(s, "Failed to unmarshal LogEntry.") 519 » » » }.Warningf(c, "Failed to clean up staged path.")
365 » » » return false 520 » » }
366 » » } 521
367 » » s.buf = append(s.buf, &le) 522 » » d.clearStaged()
368 523 » }
369 » » // Stop loading if we've reached or exceeded our buffer size. 524 }
370 » » bytes += len(d) 525
371 » » return bytes < storageBufferSize 526 func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
372 » }) 527 » return []*stagingPaths{
373 } 528 » » &sa.stream,
374 529 » » &sa.index,
375 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { 530 » » &sa.data,
376 » if len(s.buf) == 0 { 531 » }
377 » » s.buf = s.buf[:0] 532 }
378 » » if err := s.bufferEntries(s.lastIndex + 1); err != nil {
379 » » » if err == storage.ErrDoesNotExist {
380 » » » » log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
381 » » » » return nil, archive.ErrEndOfStream
382 » » » }
383
384 » » » log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.")
385 » » » return nil, err
386 » » }
387 » }
388
389 » if len(s.buf) == 0 {
390 » » log.Fields{
391 » » » "lastIndex": s.lastIndex,
392 » » }.Debugf(s, "Encountered end of stream.")
393 » » return nil, archive.ErrEndOfStream
394 » }
395
396 » var le *logpb.LogEntry
397 » le, s.buf = s.buf[0], s.buf[1:]
398
399 » // If we're enforcing a contiguous log stream, error if this LogEntry is not
400 » // contiguous.
401 » sidx := types.MessageIndex(le.StreamIndex)
402 » nidx := (s.lastIndex + 1)
403 » if sidx != nidx {
404 » » s.hasMissingEntries = true
405
406 » » if s.contiguous {
407 » » » log.Fields{
408 » » » » "index": sidx,
409 » » » » "nextIndex": nidx,
410 » » » }.Errorf(s, "Non-contiguous log stream while enforcing." )
411 » » » return nil, errors.New("non-contiguous log stream")
412 » » }
413 » }
414
415 » // If we're enforcing a maximum terminal index, return end of stream if this
416 » // LogEntry exceeds that index.
417 » if s.terminalIndex >= 0 && sidx > s.terminalIndex {
418 » » log.Fields{
419 » » » "index": sidx,
420 » » » "terminalIndex": s.terminalIndex,
421 » » }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
422 » » return nil, archive.ErrEndOfStream
423 » }
424
425 » s.lastIndex = sidx
426 » return le, nil
427 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698