Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(298)

Unified Diff: server/internal/logdog/archivist/archivist.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/cmd/logdog_collector/main.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/archivist/archivist.go
diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go
index 1d79af1d0225532985066c380010d92f0ad693f4..ca0a4c2d05f7175f6132bc49c17a628c0676481a 100644
--- a/server/internal/logdog/archivist/archivist.go
+++ b/server/internal/logdog/archivist/archivist.go
@@ -5,7 +5,10 @@
package archivist
import (
+ "bytes"
+ "encoding/hex"
"fmt"
+ "io"
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
@@ -13,12 +16,29 @@ import (
"github.com/luci/luci-go/common/gcloud/gs"
"github.com/luci/luci-go/common/logdog/types"
log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/parallel"
"github.com/luci/luci-go/common/proto/logdog/logpb"
"github.com/luci/luci-go/server/logdog/archive"
"github.com/luci/luci-go/server/logdog/storage"
"golang.org/x/net/context"
)
+// Task is a single archive task.
+type Task interface {
+ // UniqueID returns a task-unique value. Other tasks, and other retries of
+ // this task, should (try to) not reuse this ID.
+ UniqueID() string
+
+ // Task is the archive task to execute.
+ Task() *logdog.ArchiveTask
+
+ // AssertLease asserts that the lease for this Task is still held.
+ //
+ // On failure, it will return an error. If successful, the Archivist may
+ // assume that it holds the lease longer.
+ AssertLease(context.Context) error
+}
+
// Archivist is a stateless configuration capable of archiving individual log
// streams.
type Archivist struct {
@@ -26,16 +46,18 @@ type Archivist struct {
// endpoint.
Service logdog.ServicesClient
- // Storage is the intermediate storage instance to use to pull log entries for
- // archival.
+ // Storage is the archival source Storage instance.
Storage storage.Storage
-
// GSClient is the Google Storage client to for archive generation.
GSClient gs.Client
// GSBase is the base Google Storage path. This includes the bucket name
// and any associated path.
GSBase gs.Path
+ // GSStagingBase is the base Google Storage path for archive staging. This
+ // includes the bucket name and any associated path.
+ GSStagingBase gs.Path
+
// PrefixIndexRange is the maximum number of stream indexes in between index
// entries. See archive.Manifest for more information.
StreamIndexRange int
@@ -52,376 +74,455 @@ type Archivist struct {
const storageBufferSize = types.MaxLogEntryDataSize * 64
// ArchiveTask processes and executes a single log stream archive task.
-func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error {
- var task logdog.ArchiveTask
- if err := proto.Unmarshal(desc, &task); err != nil {
- log.WithError(err).Errorf(c, "Failed to decode archive task.")
- return err
- }
- return a.Archive(c, &task)
-}
-
-// Archive archives a single log stream. If unsuccessful, an error is returned.
//
-// This error may be wrapped in errors.Transient if it is believed to have been
-// caused by a transient failure.
+// It returns true on success (delete the task) and false on failure (don't
+// delete the task). The return value of true should only be used if the task
+// is truly complete and acknowledged by the Coordinator.
//
// If the supplied Context is Done, operation may terminate before completion,
// returning the Context's error.
-func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error {
+func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
+ delete, err := a.archiveTaskImpl(c, task)
+ log.Fields{
+ log.ErrorKey: err,
+ "delete": delete,
+ "path": task.Task().Path,
+ }.Infof(c, "Finished archive task.")
+ return delete
+}
+
+// archiveTaskImpl returns the same boolean value as ArchiveTask, but includes
+// an error. The error is useful for testing to assert that certain conditions
+// were hit.
+func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
+ at := task.Task()
+ log.Fields{
+ "path": at.Path,
+ }.Debugf(c, "Received archival task.")
+
// Load the log stream's current state. If it is already archived, we will
// return an immediate success.
ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
- Path: t.Path,
+ Path: at.Path,
Desc: true,
})
switch {
case err != nil:
log.WithError(err).Errorf(c, "Failed to load log stream.")
- return err
+ return false, err
+
case ls.State == nil:
- return errors.New("missing state")
+ log.Errorf(c, "Log stream did not include state.")
+ return false, errors.New("log stream did not include state")
+
+ case ls.State.Purged:
+ log.Warningf(c, "Log stream is purged. Discarding archival request.")
+ return true, errors.New("log stream is purged")
+
+ case ls.State.Archived:
+ log.Infof(c, "Log stream is already archived. Discarding archival request.")
+ return true, errors.New("log stream is archived")
+
+ case !bytes.Equal(ls.ArchivalKey, at.Key):
+ if len(ls.ArchivalKey) == 0 {
+ // The log stream is not registering as "archive pending" state.
+ //
+ // This can happen if the eventually-consistent datastore hasn't updated
+ // its log stream state by the time this Pub/Sub task is received. In
+ // this case, we will continue retrying the task until datastore registers
+ // that some key is associated with it.
+ log.Fields{
+ "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey),
+ "requestArchivalKey": hex.EncodeToString(at.Key),
+ }.Infof(c, "Archival request received before log stream has its key.")
+ return false, errors.New("premature archival request")
+ }
+
+ // This can happen if a Pub/Sub message is dispatched during a transaction,
+ // but that specific transaction failed. In this case, the Pub/Sub message
+ // will have a key that doesn't match the key that was transactionally
+ // encoded, and can be discarded.
+ log.Fields{
+ "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey),
+ "requestArchivalKey": hex.EncodeToString(at.Key),
+ }.Infof(c, "Superfluous archival request (keys do not match). Discarding.")
+ return true, errors.New("superfluous archival request")
+
case ls.State.ProtoVersion != logpb.Version:
log.Fields{
"protoVersion": ls.State.ProtoVersion,
"expectedVersion": logpb.Version,
}.Errorf(c, "Unsupported log stream protobuf version.")
- return errors.New("unsupported protobuf version")
- case ls.Desc == nil:
- return errors.New("missing descriptor")
+ return false, errors.New("unsupported log stream protobuf version")
- case ls.State.Purged:
- log.Warningf(c, "Log stream is purged.")
- return nil
- case ls.State.Archived:
- log.Infof(c, "Log stream is already archived.")
- return nil
+ case ls.Desc == nil:
+ log.Errorf(c, "Log stream did not include a descriptor.")
+ return false, errors.New("log stream did not include a descriptor")
}
- // Deserialize and validate the descriptor protobuf.
- var desc logpb.LogStreamDescriptor
- if err := proto.Unmarshal(ls.Desc, &desc); err != nil {
+ // If the archival request is younger than the settle delay, kick it back to
+ // retry later.
+ age := ls.Age.Duration()
+ if age < at.SettleDelay.Duration() {
log.Fields{
- log.ErrorKey: err,
- "protoVersion": ls.State.ProtoVersion,
- }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
- return err
+ "age": age,
+ "settleDelay": at.SettleDelay.Duration(),
+ }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.")
+ return false, errors.New("log stream is within settle delay")
}
- task := &archiveTask{
- Archivist: a,
- ArchiveTask: t,
- ls: ls,
- desc: &desc,
+ // Are we required to archive a complete log stream?
+ complete := (age <= at.CompletePeriod.Duration())
+ if complete && ls.State.TerminalIndex < 0 {
+ log.Warningf(c, "Cannot archive complete stream with no terminal index.")
+ return false, errors.New("completeness required, but stream has no terminal index")
}
- if err := task.archive(c); err != nil {
- log.WithError(err).Errorf(c, "Failed to perform archival operation.")
- return err
+
+ ar := logdog.ArchiveStreamRequest{
+ Path: at.Path,
}
+
+ // Archive to staging.
+ //
+ // If a non-transient failure occurs here, we will report it to the Archivist
+ // under the assumption that it will continue occurring.
+ //
+ // We will handle error creating the plan and executing the plan in the same
+ // switch statement below.
+ staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID())
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to create staged archival plan.")
+ } else {
+ err = staged.stage(c, complete)
+ }
+
+ switch {
+ case errors.IsTransient(err):
+ // If this is a transient error, exit immediately and do not delete the
+ // archival task.
+ log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.")
+ return false, err
+
+ case err != nil:
+ // This is a non-transient error, so we are confident that any future
+ // Archival will also encounter this error. We will mark this archival
+ // as an error and report it to the Coordinator.
+ log.WithError(err).Errorf(c, "Archival failed with non-transient error.")
+ ar.Error = err.Error()
+ if ar.Error == "" {
+ // This needs to be non-nil, so if our acutal error has an empty string,
+ // fill in a generic message.
+ ar.Error = "archival error"
+ }
+
+ default:
+ // In case something fails, clean up our staged archival (best effort).
+ defer staged.cleanup(c)
+
+ // Finalize the archival. First, extend our lease to confirm that we still
+ // hold it.
+ if err := task.AssertLease(c); err != nil {
+ log.WithError(err).Errorf(c, "Failed to extend task lease before finalizing.")
+ return false, err
+ }
+
+ // Finalize the archival.
+ if err := staged.finalize(c, a.GSClient, &ar); err != nil {
+ log.WithError(err).Errorf(c, "Failed to finalize archival.")
+ return false, err
+ }
+ }
+
log.Fields{
- "streamURL": task.ar.StreamUrl,
- "indexURL": task.ar.IndexUrl,
- "dataURL": task.ar.DataUrl,
- "terminalIndex": task.ar.TerminalIndex,
- "complete": task.ar.Complete,
- }.Debugf(c, "Finished archive construction.")
-
- if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
- log.WithError(err).Errorf(c, "Failed to mark log stream as archived.")
- return err
+ "streamURL": ar.StreamUrl,
+ "indexURL": ar.IndexUrl,
+ "dataURL": ar.DataUrl,
+ "terminalIndex": ar.TerminalIndex,
+ "logEntryCount": ar.LogEntryCount,
+ "hadError": ar.Error,
+ "complete": ar.Complete(),
+ }.Debugf(c, "Finished archival round. Reporting archive state.")
+
+ // Extend the lease again to confirm that we still hold it.
+ if err := task.AssertLease(c); err != nil {
+ log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.")
+ return false, err
}
- return nil
+
+ if _, err := a.Service.ArchiveStream(c, &ar); err != nil {
+ log.WithError(err).Errorf(c, "Failed to report archive state.")
+ return false, err
+ }
+
+ // Archival is complete and acknowledged by Coordinator. Consume the archival
+ // task.
+ return true, nil
}
-// archiveTask is the set of parameters for a single archival.
-type archiveTask struct {
- *Archivist
- *logdog.ArchiveTask
+func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) (
+ *stagedArchival, error) {
+ sa := stagedArchival{
+ Archivist: a,
+ path: path,
- // ls is the log stream state.
- ls *logdog.LoadStreamResponse
- // desc is the unmarshaled log stream descriptor.
- desc *logpb.LogStreamDescriptor
+ terminalIndex: ls.State.TerminalIndex,
+ }
- // ar will be populated during archive construction.
- ar logdog.ArchiveStreamRequest
-}
+ // Deserialize and validate the descriptor protobuf. If this fails, it is a
+ // non-transient error.
+ if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "protoVersion": ls.State.ProtoVersion,
+ }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
+ return nil, err
+ }
-// archiveState performs the archival operation on a stream described by a
-// Coordinator State. Upon success, the State will be updated with the result
-// of the archival operation.
-func (t *archiveTask) archive(c context.Context) (err error) {
- // Generate our archival object managers.
- bext := t.desc.BinaryFileExt
+ bext := sa.desc.BinaryFileExt
if bext == "" {
bext = "bin"
}
- path := t.Path
- var streamO, indexO, dataO *gsObject
- streamO, err = t.newGSObject(c, path, "logstream.entries")
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create log object.")
- return
- }
+ // Construct our staged archival paths.
+ sa.stream = a.makeStagingPaths(path, "logstream.entries", uid)
+ sa.index = a.makeStagingPaths(path, "logstream.index", uid)
+ sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid)
+ return &sa, nil
+}
- indexO, err = t.newGSObject(c, path, "logstream.index")
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create index object.")
- return
+// makeStagingPaths returns a stagingPaths instance for the given path and
+// file name. It incorporates a unique ID into the staging name to differentiate
+// it from other staging paths for the same path/name.
+func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths {
+ return stagingPaths{
+ staged: a.GSStagingBase.Concat(string(path), uid, name),
+ final: a.GSBase.Concat(string(path), name),
}
+}
- dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext))
- if err != nil {
- log.WithError(err).Errorf(c, "Failed to create data object.")
- return
- }
+type stagedArchival struct {
+ *Archivist
+
+ path types.StreamPath
+ desc logpb.LogStreamDescriptor
- // Load the URLs into our state.
- t.ar.StreamUrl = streamO.url
- t.ar.IndexUrl = indexO.url
- t.ar.DataUrl = dataO.url
+ stream stagingPaths
+ streamSize int64
+
+ index stagingPaths
+ indexSize int64
+
+ data stagingPaths
+ dataSize int64
+
+ finalized bool
+ terminalIndex int64
+ logEntryCount int64
+}
+// stage executes the archival process, archiving to the staged storage paths.
+//
+// If stage fails, it may return a transient error.
+func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) {
log.Fields{
- "streamURL": t.ar.StreamUrl,
- "indexURL": t.ar.IndexUrl,
- "dataURL": t.ar.DataUrl,
- }.Infof(c, "Archiving log stream...")
-
- // We want to try and delete any GS objects that were created during a failed
- // archival attempt.
- deleteOnFail := func(o *gsObject) {
- if o == nil || err == nil {
- return
+ "streamURL": sa.stream.staged,
+ "indexURL": sa.index.staged,
+ "dataURL": sa.data.staged,
+ }.Debugf(c, "Staging log stream...")
+
+ // Group any transient errors that occur during cleanup. If we aren't
+ // returning a non-transient error, return a transient "terr".
+ var terr errors.MultiError
+ defer func() {
+ if err == nil && len(terr) > 0 {
+ err = errors.WrapTransient(terr)
+ }
+ }()
+
+ // Close our writers on exit. If any of them fail to close, mark the archival
+ // as a transient failure.
+ closeWriter := func(closer io.Closer, path gs.Path) {
+ // Close the Writer. If this results in an error, append it to our transient
+ // error MultiError.
+ if ierr := closer.Close(); ierr != nil {
+ terr = append(terr, ierr)
}
- if ierr := o.delete(); ierr != nil {
+
+ // If we have an archival error, also delete the path associated with this
+ // stream. This is a non-fatal failure, since we've already hit a fatal
+ // one.
+ if err != nil || len(terr) > 0 {
+ if ierr := sa.GSClient.Delete(path); ierr != nil {
+ log.Fields{
+ log.ErrorKey: ierr,
+ "path": path,
+ }.Warningf(c, "Failed to delete stream on error.")
+ }
+ }
+ }
+
+ // createWriter is a shorthand function for creating a writer to a path and
+ // reporting an error if it failed.
+ createWriter := func(p gs.Path) (gs.Writer, error) {
+ w, ierr := sa.GSClient.NewWriter(p)
+ if ierr != nil {
log.Fields{
log.ErrorKey: ierr,
- "url": o.url,
- }.Warningf(c, "Failed to clean-up GS object on failure.")
+ "path": p,
+ }.Errorf(c, "Failed to create writer.")
+ return nil, ierr
}
+ return w, nil
}
- defer deleteOnFail(streamO)
- defer deleteOnFail(indexO)
- defer deleteOnFail(dataO)
-
- // Close our GS object managers on exit. If any of them fail to close, marh
- // the archival as a failure.
- closeOM := func(o *gsObject) {
- if o == nil {
- return
- }
- if ierr := o.Close(); ierr != nil {
- err = ierr
- }
+
+ var streamWriter, indexWriter, dataWriter gs.Writer
+ if streamWriter, err = createWriter(sa.stream.staged); err != nil {
+ return
+ }
+ defer closeWriter(streamWriter, sa.stream.staged)
+
+ if indexWriter, err = createWriter(sa.index.staged); err != nil {
+ return err
+ }
+ defer closeWriter(indexWriter, sa.index.staged)
+
+ if dataWriter, err = createWriter(sa.data.staged); err != nil {
+ return err
}
- defer closeOM(streamO)
- defer closeOM(indexO)
- defer closeOM(dataO)
+ defer closeWriter(dataWriter, sa.data.staged)
// Read our log entries from intermediate storage.
ss := storageSource{
Context: c,
- st: t.Storage,
- path: types.StreamPath(t.Path),
- contiguous: t.Complete,
- terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex),
+ st: sa.Storage,
+ path: sa.path,
+ contiguous: complete,
+ terminalIndex: types.MessageIndex(sa.terminalIndex),
lastIndex: -1,
}
m := archive.Manifest{
- Desc: t.desc,
+ Desc: &sa.desc,
Source: &ss,
- LogWriter: streamO,
- IndexWriter: indexO,
- DataWriter: dataO,
- StreamIndexRange: t.StreamIndexRange,
- PrefixIndexRange: t.PrefixIndexRange,
- ByteRange: t.ByteRange,
+ LogWriter: streamWriter,
+ IndexWriter: indexWriter,
+ DataWriter: dataWriter,
+ StreamIndexRange: sa.StreamIndexRange,
+ PrefixIndexRange: sa.PrefixIndexRange,
+ ByteRange: sa.ByteRange,
Logger: log.Get(c),
}
- err = archive.Archive(m)
- if err != nil {
+ if err = archive.Archive(m); err != nil {
log.WithError(err).Errorf(c, "Failed to archive log stream.")
return
}
- t.ar.TerminalIndex = int64(ss.lastIndex)
- if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex {
- // Fail, if we were requested to archive only the complete log.
- if t.Complete {
- log.Fields{
- "terminalIndex": tidx,
- "lastIndex": t.ar.TerminalIndex,
- }.Errorf(c, "Log stream archival stopped prior to terminal index.")
- return errors.New("stream finished short of terminal index")
- }
+ if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) {
+ // Fail if we were requested to archive only the complete log. We consider
+ // this a transient error with the expectation that the missing entries will
+ // show up in future retries.
+ switch {
+ case complete && ss.hasMissingEntries:
+ log.Errorf(c, "Log stream has missing entries, but completeness is required.")
+ err = errors.WrapTransient(errors.New("stream has missing entries"))
+ return
- if t.ar.TerminalIndex < 0 {
+ case ss.logEntryCount == 0:
// If our last log index was <0, then no logs were archived.
log.Warningf(c, "No log entries were archived.")
- } else {
+
+ default:
// Update our terminal index.
log.Fields{
- "from": tidx,
- "to": t.ar.TerminalIndex,
- }.Infof(c, "Updated log stream terminal index.")
+ "terminalIndex": ss.lastIndex,
+ "logEntryCount": ss.logEntryCount,
+ "hasMissingEntries": ss.hasMissingEntries,
+ }.Debugf(c, "Finished archiving log stream.")
}
}
// Update our state with archival results.
- t.ar.Path = t.Path
- t.ar.StreamSize = streamO.Count()
- t.ar.IndexSize = indexO.Count()
- t.ar.DataSize = dataO.Count()
- t.ar.Complete = !ss.hasMissingEntries
+ sa.terminalIndex = int64(ss.lastIndex)
+ sa.logEntryCount = ss.logEntryCount
+ sa.stream.count = streamWriter.Count()
+ sa.index.count = indexWriter.Count()
+ sa.data.count = dataWriter.Count()
return
}
-func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) {
- p := t.GSBase.Concat(path, name)
- o := gsObject{
- gs: t.GSClient,
- bucket: p.Bucket(),
- path: p.Filename(),
- }
-
- // Build our GS URL. Note that since buildGSPath joins with "/", the initial
- // token, "gs:/", will become "gs://".
- o.url = string(p)
-
- var err error
- o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path)
- if err != nil {
- log.Fields{
- log.ErrorKey: err,
- "url": o.url,
- }.Errorf(c, "Failed to create Writer.")
- return nil, err
- }
-
- // Delete any existing object at this path.
- if err := o.delete(); err != nil {
- closeErr := o.Close()
-
- log.Fields{
- log.ErrorKey: err,
- "closeErr": closeErr,
- "url": o.url,
- }.Errorf(c, "Could not delete object during creation.")
- return nil, err
- }
- return &o, nil
-}
-
-// gsObjectManger wraps a gsObject instance with metadata.
-type gsObject struct {
- gs.Writer
-
- // gs is the Client instance.
- gs gs.Client
- // bucket is the name of the object's bucket.
- bucket string
- // path is the bucket-relative path of the object.
- path string
- // url is the Google Storage URL (gs://) of this object.
- url string
+type stagingPaths struct {
+ staged gs.Path
+ final gs.Path
+ count int64
}
-func (o *gsObject) delete() error {
- return o.gs.Delete(o.bucket, o.path)
+func (d *stagingPaths) clearStaged() {
+ d.staged = ""
}
-// storageSource is an archive.LogEntrySource that pulls log entries from
-// intermediate storage via its storage.Storage instance.
-type storageSource struct {
- context.Context
-
- st storage.Storage // the storage instance to read from
- path types.StreamPath // the path of the log stream
- contiguous bool // if true, enforce contiguous entries
- terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
-
- buf []*logpb.LogEntry
- lastIndex types.MessageIndex
- hasMissingEntries bool // true if some log entries were missing.
-}
-
-func (s *storageSource) bufferEntries(start types.MessageIndex) error {
- bytes := 0
-
- req := storage.GetRequest{
- Path: s.path,
- Index: start,
- }
- return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
- le := logpb.LogEntry{}
- if err := proto.Unmarshal(d, &le); err != nil {
- log.Fields{
- log.ErrorKey: err,
- "streamIndex": idx,
- }.Errorf(s, "Failed to unmarshal LogEntry.")
- return false
- }
- s.buf = append(s.buf, &le)
-
- // Stop loading if we've reached or exceeded our buffer size.
- bytes += len(d)
- return bytes < storageBufferSize
- })
-}
+func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logdog.ArchiveStreamRequest) error {
+ err := parallel.FanOutIn(func(taskC chan<- func() error) {
+ for _, d := range sa.getStagingPaths() {
+ d := d
-func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
- if len(s.buf) == 0 {
- s.buf = s.buf[:0]
- if err := s.bufferEntries(s.lastIndex + 1); err != nil {
- if err == storage.ErrDoesNotExist {
- log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
- return nil, archive.ErrEndOfStream
+ // Don't copy zero-sized streams.
+ if d.count == 0 {
+ continue
}
- log.WithError(err).Errorf(s, "Failed to retrieve log stream from storage.")
- return nil, err
+ taskC <- func() error {
+ if err := client.Rename(d.staged, d.final); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "stagedPath": d.staged,
+ "finalPath": d.final,
+ }.Errorf(c, "Failed to rename GS object.")
+ return err
+ }
+
+ // Clear the staged value to indicate that it no longer exists.
+ d.clearStaged()
+ return nil
+ }
}
+ })
+ if err != nil {
+ return err
}
- if len(s.buf) == 0 {
- log.Fields{
- "lastIndex": s.lastIndex,
- }.Debugf(s, "Encountered end of stream.")
- return nil, archive.ErrEndOfStream
- }
-
- var le *logpb.LogEntry
- le, s.buf = s.buf[0], s.buf[1:]
+ ar.TerminalIndex = sa.terminalIndex
+ ar.LogEntryCount = sa.logEntryCount
+ ar.StreamUrl = string(sa.stream.final)
+ ar.StreamSize = sa.stream.count
+ ar.IndexUrl = string(sa.index.final)
+ ar.IndexSize = sa.index.count
+ ar.DataUrl = string(sa.data.final)
+ ar.DataSize = sa.data.count
+ return nil
+}
- // If we're enforcing a contiguous log stream, error if this LogEntry is not
- // contiguous.
- sidx := types.MessageIndex(le.StreamIndex)
- nidx := (s.lastIndex + 1)
- if sidx != nidx {
- s.hasMissingEntries = true
+func (sa *stagedArchival) cleanup(c context.Context) {
+ for _, d := range sa.getStagingPaths() {
+ if d.staged == "" {
+ continue
+ }
- if s.contiguous {
+ if err := sa.GSClient.Delete(d.staged); err != nil {
log.Fields{
- "index": sidx,
- "nextIndex": nidx,
- }.Errorf(s, "Non-contiguous log stream while enforcing.")
- return nil, errors.New("non-contiguous log stream")
+ log.ErrorKey: err,
+ "path": d.staged,
+ }.Warningf(c, "Failed to clean up staged path.")
}
- }
- // If we're enforcing a maximum terminal index, return end of stream if this
- // LogEntry exceeds that index.
- if s.terminalIndex >= 0 && sidx > s.terminalIndex {
- log.Fields{
- "index": sidx,
- "terminalIndex": s.terminalIndex,
- }.Warningf(s, "Discarding log entries beyond expected terminal index.")
- return nil, archive.ErrEndOfStream
+ d.clearStaged()
}
+}
- s.lastIndex = sidx
- return le, nil
+func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
+ return []*stagingPaths{
+ &sa.stream,
+ &sa.index,
+ &sa.data,
+ }
}
« no previous file with comments | « server/cmd/logdog_collector/main.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698