Chromium Code Reviews| Index: server/internal/logdog/archivist/archivist.go |
| diff --git a/server/internal/logdog/archivist/archivist.go b/server/internal/logdog/archivist/archivist.go |
| index 1d79af1d0225532985066c380010d92f0ad693f4..b7518acc7c19116dcdced712e8417ef1ff44801b 100644 |
| --- a/server/internal/logdog/archivist/archivist.go |
| +++ b/server/internal/logdog/archivist/archivist.go |
| @@ -5,7 +5,10 @@ |
| package archivist |
| import ( |
| + "bytes" |
| + "encoding/hex" |
| "fmt" |
| + "io" |
| "github.com/golang/protobuf/proto" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| @@ -13,12 +16,29 @@ import ( |
| "github.com/luci/luci-go/common/gcloud/gs" |
| "github.com/luci/luci-go/common/logdog/types" |
| log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/parallel" |
| "github.com/luci/luci-go/common/proto/logdog/logpb" |
| "github.com/luci/luci-go/server/logdog/archive" |
| "github.com/luci/luci-go/server/logdog/storage" |
| "golang.org/x/net/context" |
| ) |
| +// Task is a single archive task. |
| +type Task interface { |
| + // UniqueID returns a task-unique value. Other tasks, and other retries of |
| + // this task, should (try to) not reuse this ID. |
| + UniqueID() string |
| + |
| + // Data is the archive task's data content. |
| + Data() []byte |
| + |
| + // AssertLease asserts that the lease for this Task is still held. |
| + // |
| + // On failure, it will return an error. If successful, the Archivist may |
| + // assume that it holds the lease longer. |
| + AssertLease(context.Context) error |
| +} |
| + |
| // Archivist is a stateless configuration capable of archiving individual log |
| // streams. |
| type Archivist struct { |
| @@ -36,6 +56,10 @@ type Archivist struct { |
| // GSBase is the base Google Storage path. This includes the bucket name |
| // and any associated path. |
| GSBase gs.Path |
| + // GSStagingBase is the base Google Storage path for archive staging. This |
| + // includes the bucket name and any associated path. |
| + GSStagingBase gs.Path |
| + |
| // StreamIndexRange is the maximum number of stream indexes in between index |
| // entries. See archive.Manifest for more information. |
| StreamIndexRange int |
| @@ -52,376 +76,457 @@ type Archivist struct { |
| const storageBufferSize = types.MaxLogEntryDataSize * 64 |
| // ArchiveTask processes and executes a single log stream archive task. |
| -func (a *Archivist) ArchiveTask(c context.Context, desc []byte) error { |
| - var task logdog.ArchiveTask |
| - if err := proto.Unmarshal(desc, &task); err != nil { |
| - log.WithError(err).Errorf(c, "Failed to decode archive task.") |
| - return err |
| - } |
| - return a.Archive(c, &task) |
| -} |
| - |
| -// Archive archives a single log stream. If unsuccessful, an error is returned. |
| // |
| -// This error may be wrapped in errors.Transient if it is believed to have been |
| -// caused by a transient failure. |
| +// It returns true on success (delete the task) and false on failure (don't |
| +// delete the task). The return value of true should only be used if the task |
| +// is truly complete and acknowledged by the Coordinator. |
| // |
| // If the supplied Context is Done, operation may terminate before completion, |
| // returning the Context's error. |
| -func (a *Archivist) Archive(c context.Context, t *logdog.ArchiveTask) error { |
| +func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
|
dnj
2016/04/11 17:20:04
I re-wrote most of this to use a staging space and
|
| + delete, _ := a.archiveTaskImpl(c, task) |
| + return delete |
| +} |
| + |
| +// archiveTaskImpl returns the same boolean value as ArchiveTask, but includes |
| +// an error. The error is useful for testing to assert that certain conditions |
| +// were hit. |
| +func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { |
| + // If we can't decode the archival task, we can't decide whether or not to |
| + // delete it, so we will leave it in the queue. |
| + var at logdog.ArchiveTask |
| + if err := proto.Unmarshal(task.Data(), &at); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to decode archive task.") |
| + return false, err |
| + } |
| + |
| + log.Fields{ |
| + "path": at.Path, |
| + }.Debugf(c, "Received archival task.") |
| + |
| // Load the log stream's current state. If it is already archived, we will |
| // return an immediate success. |
| ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| - Path: t.Path, |
| + Path: at.Path, |
| Desc: true, |
| }) |
| switch { |
| case err != nil: |
| log.WithError(err).Errorf(c, "Failed to load log stream.") |
| - return err |
| + return false, err |
| + |
| case ls.State == nil: |
| - return errors.New("missing state") |
| + log.Errorf(c, "Log stream did not include state.") |
| + return false, errors.New("log stream did not include state") |
| + |
| + case ls.State.Purged: |
| + log.Warningf(c, "Log stream is purged. Discarding archival request.") |
| + return true, errors.New("log stream is purged") |
| + |
| + case ls.State.Archived: |
| + log.Infof(c, "Log stream is already archived. Discarding archival request.") |
| + return true, errors.New("log stream is archived") |
| + |
| + case !bytes.Equal(ls.ArchivalKey, at.Key): |
| + if len(ls.ArchivalKey) == 0 { |
| + // The log stream is not registering as "archive pending" state. |
| + // |
| + // This can happen if the eventually-consistent datastore hasn't updated |
| + // its log stream state by the time this Pub/Sub task is received. In |
| + // this case, we will continue retrying the task until datastore registers |
| + // that some key is associated with it. |
| + log.Fields{ |
| + "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey), |
| + "requestArchivalKey": hex.EncodeToString(at.Key), |
| + }.Infof(c, "Archival request received before log stream has its key.") |
| + return false, errors.New("premature archival request") |
| + } |
| + |
| + // This can happen if a Pub/Sub message is dispatched during a transaction, |
| + // but that specific transaction failed. In this case, the Pub/Sub message |
| + // will have a key that doesn't match the key that was transactionally |
| + // encoded, and can be discarded. |
| + log.Fields{ |
| + "logStreamArchivalKey": hex.EncodeToString(ls.ArchivalKey), |
| + "requestArchivalKey": hex.EncodeToString(at.Key), |
| + }.Infof(c, "Superfluous archival request (keys do not match). Discarding.") |
| + return true, errors.New("superfluous archival request") |
| + |
| case ls.State.ProtoVersion != logpb.Version: |
| log.Fields{ |
| "protoVersion": ls.State.ProtoVersion, |
| "expectedVersion": logpb.Version, |
| }.Errorf(c, "Unsupported log stream protobuf version.") |
| - return errors.New("unsupported protobuf version") |
| - case ls.Desc == nil: |
| - return errors.New("missing descriptor") |
| + return false, errors.New("unsupported log stream protobuf version") |
| - case ls.State.Purged: |
| - log.Warningf(c, "Log stream is purged.") |
| - return nil |
| - case ls.State.Archived: |
| - log.Infof(c, "Log stream is already archived.") |
| - return nil |
| + case ls.Desc == nil: |
| + log.Errorf(c, "Log stream did not include a descriptor.") |
| + return false, errors.New("log stream did not include a descriptor") |
| } |
| - // Deserialize and validate the descriptor protobuf. |
| - var desc logpb.LogStreamDescriptor |
| - if err := proto.Unmarshal(ls.Desc, &desc); err != nil { |
| + // If the archival request is younger than the settle delay, kick it back to |
| + // retry later. |
| + age := ls.Age.Duration() |
| + if age < at.SettleDelay.Duration() { |
| log.Fields{ |
| - log.ErrorKey: err, |
| - "protoVersion": ls.State.ProtoVersion, |
| - }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| - return err |
| + "age": age, |
| + "settleDelay": at.SettleDelay.Duration(), |
| + }.Infof(c, "Log stream is younger than the settle delay. Returning task to queue.") |
| + return false, errors.New("log stream is within settle delay") |
| } |
| - task := &archiveTask{ |
| - Archivist: a, |
| - ArchiveTask: t, |
| - ls: ls, |
| - desc: &desc, |
| + // Are we required to archive a complete log stream? |
| + complete := (age <= at.CompletePeriod.Duration()) |
| + if complete && ls.State.TerminalIndex < 0 { |
| + log.Warningf(c, "Cannot archive complete stream with no terminal index.") |
| + return false, errors.New("completeness required, but stream has no terminal index") |
| } |
| - if err := task.archive(c); err != nil { |
| - log.WithError(err).Errorf(c, "Failed to perform archival operation.") |
| - return err |
| + |
| + ar := logdog.ArchiveStreamRequest{ |
| + Path: at.Path, |
| + } |
| + |
| + // Archive to staging. |
| + // |
| + // If a non-transient failure occurs here, we will report it to the |
| + // Coordinator under the assumption that it will continue occurring. |
| + // |
| + // We will handle errors from creating the plan and from executing the plan |
| + // in the same switch statement below. |
| + staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID()) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
| + } else { |
| + err = staged.stage(c, complete) |
| + } |
| + |
| + switch { |
| + case errors.IsTransient(err): |
| + // If this is a transient error, exit immediately and do not delete the |
| + // archival task. |
| + log.WithError(err).Warningf(c, "TRANSIENT error during archival operation.") |
| + return false, err |
| + |
| + case err != nil: |
| + // This is a non-transient error, so we are confident that any future |
| + // Archival will also encounter this error. We will mark this archival |
| + // as an error and report it to the Coordinator. |
| + log.WithError(err).Errorf(c, "Archival failed with non-transient error.") |
| + ar.Error = err.Error() |
| + if ar.Error == "" { |
| + // This needs to be non-empty, so if our actual error has an empty string, |
| + // fill in a generic message. |
| + ar.Error = "archival error" |
| + } |
| + |
| + default: |
| + // In case something fails, clean up our staged archival (best effort). |
| + defer staged.cleanup(c) |
| + |
| + // Finalize the archival. First, extend our lease to confirm that we still |
| + // hold it. |
| + if err := task.AssertLease(c); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to extend task lease before finalizing.") |
| + return false, err |
| + } |
| + |
| + // Finalize the archival. |
| + if err := staged.finalize(c, a.GSClient, &ar); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to finalize archival.") |
| + return false, err |
| + } |
| } |
| + |
| log.Fields{ |
| - "streamURL": task.ar.StreamUrl, |
| - "indexURL": task.ar.IndexUrl, |
| - "dataURL": task.ar.DataUrl, |
| - "terminalIndex": task.ar.TerminalIndex, |
| - "complete": task.ar.Complete, |
| - }.Debugf(c, "Finished archive construction.") |
| - |
| - if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { |
| - log.WithError(err).Errorf(c, "Failed to mark log stream as archived.") |
| - return err |
| + "streamURL": ar.StreamUrl, |
| + "indexURL": ar.IndexUrl, |
| + "dataURL": ar.DataUrl, |
| + "terminalIndex": ar.TerminalIndex, |
| + "logEntryCount": ar.LogEntryCount, |
| + "hadError": ar.Error, |
| + "complete": ar.Complete(), |
| + }.Debugf(c, "Finished archival round. Reporting archive state.") |
| + |
| + // Extend the lease again to confirm that we still hold it. |
| + if err := task.AssertLease(c); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to extend task lease before reporting.") |
| + return false, err |
| } |
| - return nil |
| + |
| + if _, err := a.Service.ArchiveStream(c, &ar); err != nil { |
| + log.WithError(err).Errorf(c, "Failed to report archive state.") |
| + return false, err |
| + } |
| + |
| + // Archival is complete and acknowledged by Coordinator. Consume the archival |
| + // task. |
| + return true, nil |
| } |
| -// archiveTask is the set of parameters for a single archival. |
| -type archiveTask struct { |
| - *Archivist |
| - *logdog.ArchiveTask |
| +func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( |
| + *stagedArchival, error) { |
| + sa := stagedArchival{ |
| + Archivist: a, |
| + path: path, |
| - // ls is the log stream state. |
| - ls *logdog.LoadStreamResponse |
| - // desc is the unmarshaled log stream descriptor. |
| - desc *logpb.LogStreamDescriptor |
| + terminalIndex: ls.State.TerminalIndex, |
| + } |
| - // ar will be populated during archive construction. |
| - ar logdog.ArchiveStreamRequest |
| -} |
| + // Deserialize and validate the descriptor protobuf. If this fails, it is a |
| + // non-transient error. |
| + if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "protoVersion": ls.State.ProtoVersion, |
| + }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| + return nil, err |
| + } |
| -// archiveState performs the archival operation on a stream described by a |
| -// Coordinator State. Upon success, the State will be updated with the result |
| -// of the archival operation. |
| -func (t *archiveTask) archive(c context.Context) (err error) { |
| - // Generate our archival object managers. |
| - bext := t.desc.BinaryFileExt |
| + bext := sa.desc.BinaryFileExt |
| if bext == "" { |
| bext = "bin" |
| } |
| - path := t.Path |
| - var streamO, indexO, dataO *gsObject |
| - streamO, err = t.newGSObject(c, path, "logstream.entries") |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to create log object.") |
| - return |
| - } |
| + // Construct our staged archival paths. |
| + sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) |
| + sa.index = a.makeStagingPaths(path, "logstream.index", uid) |
| + sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) |
| + return &sa, nil |
| +} |
| - indexO, err = t.newGSObject(c, path, "logstream.index") |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to create index object.") |
| - return |
| +// makeStagingPaths returns a stagingPaths instance for the given path and |
| +// file name. It incorporates a unique ID into the staging name to differentiate |
| +// it from other staging paths for the same path/name. |
| +func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths { |
| + return stagingPaths{ |
| + staged: a.GSStagingBase.Concat(string(path), uid, name), |
| + final: a.GSBase.Concat(string(path), name), |
| } |
| +} |
| - dataO, err = t.newGSObject(c, path, fmt.Sprintf("data.%s", bext)) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to create data object.") |
| - return |
| - } |
| +type stagedArchival struct { |
| + *Archivist |
| - // Load the URLs into our state. |
| - t.ar.StreamUrl = streamO.url |
| - t.ar.IndexUrl = indexO.url |
| - t.ar.DataUrl = dataO.url |
| + path types.StreamPath |
| + desc logpb.LogStreamDescriptor |
| + |
| + stream stagingPaths |
| + streamSize int64 |
| + |
| + index stagingPaths |
| + indexSize int64 |
| + |
| + data stagingPaths |
| + dataSize int64 |
| + |
| + finalized bool |
| + terminalIndex int64 |
| + logEntryCount int64 |
| +} |
| +// stage executes the archival process, archiving to the staged storage paths. |
| +// |
| +// If stage fails, it may return a transient error. |
| +func (sa *stagedArchival) stage(c context.Context, complete bool) (err error) { |
| log.Fields{ |
| - "streamURL": t.ar.StreamUrl, |
| - "indexURL": t.ar.IndexUrl, |
| - "dataURL": t.ar.DataUrl, |
| - }.Infof(c, "Archiving log stream...") |
| - |
| - // We want to try and delete any GS objects that were created during a failed |
| - // archival attempt. |
| - deleteOnFail := func(o *gsObject) { |
| - if o == nil || err == nil { |
| - return |
| + "streamURL": sa.stream.staged, |
| + "indexURL": sa.index.staged, |
| + "dataURL": sa.data.staged, |
| + }.Debugf(c, "Staging log stream...") |
| + |
| + // Group any transient errors that occur during cleanup. If we aren't |
| + // returning a non-transient error, return a transient "terr". |
| + var terr errors.MultiError |
| + defer func() { |
| + if err == nil && len(terr) > 0 { |
| + err = errors.WrapTransient(terr) |
| + } |
| + }() |
| + |
| + // Close our writers on exit. If any of them fail to close, mark the archival |
| + // as a transient failure. |
| + closeWriter := func(closer io.Closer, path gs.Path) { |
| + // Close the Writer. If this results in an error, append it to our transient |
| + // error MultiError. |
| + if ierr := closer.Close(); ierr != nil { |
| + terr = append(terr, ierr) |
| } |
| - if ierr := o.delete(); ierr != nil { |
| + |
| + // If we have an archival error, also delete the path associated with this |
| + // stream. This is a non-fatal failure, since we've already hit a fatal |
| + // one. |
| + if err != nil || len(terr) > 0 { |
| + if ierr := sa.GSClient.Delete(path); ierr != nil { |
| + log.Fields{ |
| + log.ErrorKey: ierr, |
| + "path": path, |
| + }.Warningf(c, "Failed to delete stream on error.") |
| + } |
| + } |
| + } |
| + |
| + // createWriter is a shorthand function for creating a writer to a path and |
| + // reporting an error if it failed. |
| + createWriter := func(p gs.Path) (gs.Writer, error) { |
| + w, ierr := sa.GSClient.NewWriter(p) |
| + if ierr != nil { |
| log.Fields{ |
| log.ErrorKey: ierr, |
| - "url": o.url, |
| - }.Warningf(c, "Failed to clean-up GS object on failure.") |
| + "path": p, |
| + }.Errorf(c, "Failed to create writer.") |
| + return nil, ierr |
| } |
| + return w, nil |
| } |
| - defer deleteOnFail(streamO) |
| - defer deleteOnFail(indexO) |
| - defer deleteOnFail(dataO) |
| - |
| - // Close our GS object managers on exit. If any of them fail to close, marh |
| - // the archival as a failure. |
| - closeOM := func(o *gsObject) { |
| - if o == nil { |
| - return |
| - } |
| - if ierr := o.Close(); ierr != nil { |
| - err = ierr |
| - } |
| + |
| + var streamWriter, indexWriter, dataWriter gs.Writer |
| + if streamWriter, err = createWriter(sa.stream.staged); err != nil { |
| + return |
| + } |
| + defer closeWriter(streamWriter, sa.stream.staged) |
| + |
| + if indexWriter, err = createWriter(sa.index.staged); err != nil { |
| + return err |
| + } |
| + defer closeWriter(indexWriter, sa.index.staged) |
| + |
| + if dataWriter, err = createWriter(sa.data.staged); err != nil { |
| + return err |
| } |
| - defer closeOM(streamO) |
| - defer closeOM(indexO) |
| - defer closeOM(dataO) |
| + defer closeWriter(dataWriter, sa.data.staged) |
| // Read our log entries from intermediate storage. |
| ss := storageSource{ |
| Context: c, |
| - st: t.Storage, |
| - path: types.StreamPath(t.Path), |
| - contiguous: t.Complete, |
| - terminalIndex: types.MessageIndex(t.ls.State.TerminalIndex), |
| + st: sa.Storage, |
| + path: sa.path, |
| + contiguous: complete, |
| + terminalIndex: types.MessageIndex(sa.terminalIndex), |
| lastIndex: -1, |
| } |
| m := archive.Manifest{ |
| - Desc: t.desc, |
| + Desc: &sa.desc, |
| Source: &ss, |
| - LogWriter: streamO, |
| - IndexWriter: indexO, |
| - DataWriter: dataO, |
| - StreamIndexRange: t.StreamIndexRange, |
| - PrefixIndexRange: t.PrefixIndexRange, |
| - ByteRange: t.ByteRange, |
| + LogWriter: streamWriter, |
| + IndexWriter: indexWriter, |
| + DataWriter: dataWriter, |
| + StreamIndexRange: sa.StreamIndexRange, |
| + PrefixIndexRange: sa.PrefixIndexRange, |
| + ByteRange: sa.ByteRange, |
| Logger: log.Get(c), |
| } |
| - err = archive.Archive(m) |
| - if err != nil { |
| + if err = archive.Archive(m); err != nil { |
| log.WithError(err).Errorf(c, "Failed to archive log stream.") |
| return |
| } |
| - t.ar.TerminalIndex = int64(ss.lastIndex) |
| - if tidx := t.ls.State.TerminalIndex; tidx != t.ar.TerminalIndex { |
| - // Fail, if we were requested to archive only the complete log. |
| - if t.Complete { |
| - log.Fields{ |
| - "terminalIndex": tidx, |
| - "lastIndex": t.ar.TerminalIndex, |
| - }.Errorf(c, "Log stream archival stopped prior to terminal index.") |
| - return errors.New("stream finished short of terminal index") |
| - } |
| + if tidx := sa.terminalIndex; tidx != int64(ss.lastIndex) { |
| + // Fail if we were requested to archive only the complete log. We consider |
| + // this a transient error with the expectation that the missing entries will |
| + // show up in future retries. |
| + switch { |
| + case complete && ss.hasMissingEntries: |
| + log.Errorf(c, "Log stream has missing entries, but completeness is required.") |
| + err = errors.WrapTransient(errors.New("stream has missing entries")) |
| + return |
| - if t.ar.TerminalIndex < 0 { |
| + case ss.logEntryCount == 0: |
| // If our last log index was <0, then no logs were archived. |
| log.Warningf(c, "No log entries were archived.") |
| - } else { |
| + |
| + default: |
| // Update our terminal index. |
| log.Fields{ |
| - "from": tidx, |
| - "to": t.ar.TerminalIndex, |
| - }.Infof(c, "Updated log stream terminal index.") |
| + "terminalIndex": ss.lastIndex, |
| + "logEntryCount": ss.logEntryCount, |
| + "hasMissingEntries": ss.hasMissingEntries, |
| + }.Debugf(c, "Finished archiving log stream.") |
| } |
| } |
| // Update our state with archival results. |
| - t.ar.Path = t.Path |
| - t.ar.StreamSize = streamO.Count() |
| - t.ar.IndexSize = indexO.Count() |
| - t.ar.DataSize = dataO.Count() |
| - t.ar.Complete = !ss.hasMissingEntries |
| + sa.terminalIndex = int64(ss.lastIndex) |
| + sa.logEntryCount = ss.logEntryCount |
| + sa.stream.count = streamWriter.Count() |
| + sa.index.count = indexWriter.Count() |
| + sa.data.count = dataWriter.Count() |
| return |
| } |
| -func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) { |
| - p := t.GSBase.Concat(path, name) |
| - o := gsObject{ |
| - gs: t.GSClient, |
| - bucket: p.Bucket(), |
| - path: p.Filename(), |
| - } |
| - |
| - // Build our GS URL. Note that since buildGSPath joins with "/", the initial |
| - // token, "gs:/", will become "gs://". |
| - o.url = string(p) |
| - |
| - var err error |
| - o.Writer, err = t.GSClient.NewWriter(o.bucket, o.path) |
| - if err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "url": o.url, |
| - }.Errorf(c, "Failed to create Writer.") |
| - return nil, err |
| - } |
| - |
| - // Delete any existing object at this path. |
| - if err := o.delete(); err != nil { |
| - closeErr := o.Close() |
| - |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "closeErr": closeErr, |
| - "url": o.url, |
| - }.Errorf(c, "Could not delete object during creation.") |
| - return nil, err |
| - } |
| - return &o, nil |
| +type stagingPaths struct { |
| + staged gs.Path |
| + final gs.Path |
| + count int64 |
| } |
| -// gsObjectManger wraps a gsObject instance with metadata. |
| -type gsObject struct { |
| - gs.Writer |
| - |
| - // gs is the Client instance. |
| - gs gs.Client |
| - // bucket is the name of the object's bucket. |
| - bucket string |
| - // path is the bucket-relative path of the object. |
| - path string |
| - // url is the Google Storage URL (gs://) of this object. |
| - url string |
| +func (d *stagingPaths) clearStaged() { |
| + d.staged = "" |
| } |
| -func (o *gsObject) delete() error { |
| - return o.gs.Delete(o.bucket, o.path) |
| -} |
| - |
| -// storageSource is an archive.LogEntrySource that pulls log entries from |
| -// intermediate storage via its storage.Storage instance. |
| -type storageSource struct { |
| - context.Context |
| - |
| - st storage.Storage // the storage instance to read from |
| - path types.StreamPath // the path of the log stream |
| - contiguous bool // if true, enforce contiguous entries |
| - terminalIndex types.MessageIndex // if >= 0, discard logs beyond this |
| - |
| - buf []*logpb.LogEntry |
| - lastIndex types.MessageIndex |
| - hasMissingEntries bool // true if some log entries were missing. |
| -} |
| - |
| -func (s *storageSource) bufferEntries(start types.MessageIndex) error { |
| - bytes := 0 |
| - |
| - req := storage.GetRequest{ |
| - Path: s.path, |
| - Index: start, |
| - } |
| - return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { |
| - le := logpb.LogEntry{} |
| - if err := proto.Unmarshal(d, &le); err != nil { |
| - log.Fields{ |
| - log.ErrorKey: err, |
| - "streamIndex": idx, |
| - }.Errorf(s, "Failed to unmarshal LogEntry.") |
| - return false |
| - } |
| - s.buf = append(s.buf, &le) |
| - |
| - // Stop loading if we've reached or exceeded our buffer size. |
| - bytes += len(d) |
| - return bytes < storageBufferSize |
| - }) |
| -} |
| +func (sa *stagedArchival) finalize(c context.Context, client gs.Client, ar *logdog.ArchiveStreamRequest) error { |
| + err := parallel.FanOutIn(func(taskC chan<- func() error) { |
| + for _, d := range sa.getStagingPaths() { |
| + d := d |
| -func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { |
| - if len(s.buf) == 0 { |
| - s.buf = s.buf[:0] |
| - if err := s.bufferEntries(s.lastIndex + 1); err != nil { |
| - if err == storage.ErrDoesNotExist { |
| - log.Warningf(s, "Archive target stream does not exist in intermediate storage.") |
| - return nil, archive.ErrEndOfStream |
| + // Don't copy zero-sized streams. |
| + if d.count == 0 { |
| + continue |
| } |
| - log.WithError(err).Errorf(s, "Failed to retrieve log stream from storage.") |
| - return nil, err |
| + taskC <- func() error { |
| + if err := client.Rename(d.staged, d.final); err != nil { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "stagedPath": d.staged, |
| + "finalPath": d.final, |
| + }.Errorf(c, "Failed to rename GS object.") |
| + return err |
| + } |
| + |
| + // Clear the staged value to indicate that it no longer exists. |
| + d.clearStaged() |
| + return nil |
| + } |
| } |
| + }) |
| + if err != nil { |
| + return err |
| } |
| - if len(s.buf) == 0 { |
| - log.Fields{ |
| - "lastIndex": s.lastIndex, |
| - }.Debugf(s, "Encountered end of stream.") |
| - return nil, archive.ErrEndOfStream |
| - } |
| - |
| - var le *logpb.LogEntry |
| - le, s.buf = s.buf[0], s.buf[1:] |
| + ar.TerminalIndex = sa.terminalIndex |
| + ar.LogEntryCount = sa.logEntryCount |
| + ar.StreamUrl = string(sa.stream.final) |
| + ar.StreamSize = sa.stream.count |
| + ar.IndexUrl = string(sa.index.final) |
| + ar.IndexSize = sa.index.count |
| + ar.DataUrl = string(sa.data.final) |
| + ar.DataSize = sa.data.count |
| + return nil |
| +} |
| - // If we're enforcing a contiguous log stream, error if this LogEntry is not |
| - // contiguous. |
| - sidx := types.MessageIndex(le.StreamIndex) |
| - nidx := (s.lastIndex + 1) |
| - if sidx != nidx { |
| - s.hasMissingEntries = true |
| +func (sa *stagedArchival) cleanup(c context.Context) { |
| + for _, d := range sa.getStagingPaths() { |
| + if d.staged == "" { |
| + continue |
| + } |
| - if s.contiguous { |
| + if err := sa.GSClient.Delete(d.staged); err != nil { |
| log.Fields{ |
| - "index": sidx, |
| - "nextIndex": nidx, |
| - }.Errorf(s, "Non-contiguous log stream while enforcing.") |
| - return nil, errors.New("non-contiguous log stream") |
| + log.ErrorKey: err, |
| + "path": d.staged, |
| + }.Warningf(c, "Failed to clean up staged path.") |
| } |
| - } |
| - // If we're enforcing a maximum terminal index, return end of stream if this |
| - // LogEntry exceeds that index. |
| - if s.terminalIndex >= 0 && sidx > s.terminalIndex { |
| - log.Fields{ |
| - "index": sidx, |
| - "terminalIndex": s.terminalIndex, |
| - }.Warningf(s, "Discarding log entries beyond expected terminal index.") |
| - return nil, archive.ErrEndOfStream |
| + d.clearStaged() |
| } |
| +} |
| - s.lastIndex = sidx |
| - return le, nil |
| +func (sa *stagedArchival) getStagingPaths() []*stagingPaths { |
| + return []*stagingPaths{ |
| + &sa.stream, |
| + &sa.index, |
| + &sa.data, |
| + } |
| } |