| Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| index 2b9530ee2e288418b0aac6c622bfe7801376b33f..e35bae9f5313bbf79d97375afaeac6a4a0a679b8 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
|
| @@ -19,13 +19,18 @@ import (
|
|
|
| // ArchiveStream implements the logdog.ServicesServer interface.
|
| func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamRequest) (*google.Empty, error) {
|
| - if err := Auth(c); err != nil {
|
| + svc := b.GetServices()
|
| + if err := Auth(c, svc); err != nil {
|
| return nil, err
|
| }
|
|
|
| log.Fields{
|
| - "path": req.Path,
|
| - }.Infof(c, "Marking log stream as archived.")
|
| + "path": req.Path,
|
| + "complete": req.Complete(),
|
| + "terminalIndex": req.TerminalIndex,
|
| + "logEntryCount": req.LogEntryCount,
|
| + "error": req.Error,
|
| + }.Infof(c, "Received archival request.")
|
|
|
| // Verify that the request is minimially valid.
|
| path := types.StreamPath(req.Path)
|
| @@ -42,40 +47,64 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
|
|
| ls := coordinator.LogStreamFromPath(path)
|
|
|
| - // (Non-transactional) Is the log stream already archived?
|
| - switch err := ds.Get(c).Get(ls); err {
|
| - case nil:
|
| - if ls.Archived() {
|
| - log.Infof(c, "Log stream already marked as archived (non-transactional).")
|
| - return &google.Empty{}, nil
|
| - }
|
| -
|
| - case ds.ErrNoSuchEntity:
|
| - break
|
| -
|
| - default:
|
| - log.WithError(err).Errorf(c, "Failed to check for log stream archvial state.")
|
| - return nil, grpcutil.Internal
|
| - }
|
| + log.Fields{
|
| + "id": ls.HashID,
|
| + }.Infof(c, "Log stream ID.")
|
|
|
| // Post the archival results to the Coordinator.
|
| now := clock.Now(c).UTC()
|
| + var ierr error
|
| err := ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| + ierr = nil
|
| +
|
| + // Note that within this transaction, we have two return values:
|
| + // - Non-nil to abort the transaction.
|
| + // - Specific error via "ierr".
|
| di := ds.Get(c)
|
| if err := di.Get(ls); err != nil {
|
| return err
|
| }
|
| - if ls.Archived() {
|
| - log.Infof(c, "Log stream already marked as archived.")
|
| +
|
| + // If our log stream is not in LSArchiveTasked, we will reject this archive
|
| + // request with FailedPrecondition.
|
| + switch {
|
| + case ls.Archived():
|
| + // Return nil if the log stream is already archived (idempotent).
|
| + log.Warningf(c, "Log stream is already archived.")
|
| return nil
|
| +
|
| + case ls.State != coordinator.LSArchiveTasked:
|
| + log.Fields{
|
| + "state": ls.State,
|
| + }.Errorf(c, "Log stream is not in archival tasked state.")
|
| + ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stream has not tasked an archival.")
|
| + return ierr
|
| + }
|
| +
|
| + // If this request contained an error, we will record an empty archival and
|
| + // log a warning.
|
| + if req.Error != "" {
|
| + log.Fields{
|
| + "archiveError": req.Error,
|
| + }.Warningf(c, "Log stream archival indicated error. Archiving empty stream.")
|
| +
|
| + req.TerminalIndex = -1
|
| + req.LogEntryCount = 0
|
| }
|
|
|
| // Update archival information. Make sure this actually marks the stream as
|
| // archived.
|
| - ls.Updated = now
|
| ls.State = coordinator.LSArchived
|
| - ls.ArchiveWhole = req.Complete
|
| + ls.ArchivedTime = now
|
| + ls.ArchivalKey = nil // No point in wasting datastore space on this.
|
| +
|
| + if ls.TerminalIndex < 0 {
|
| + // Also set the terminated time.
|
| + ls.TerminatedTime = now
|
| + }
|
| ls.TerminalIndex = req.TerminalIndex
|
| +
|
| + ls.ArchiveLogEntryCount = req.LogEntryCount
|
| ls.ArchiveStreamURL = req.StreamUrl
|
| ls.ArchiveStreamSize = req.StreamSize
|
| ls.ArchiveIndexURL = req.IndexUrl
|
| @@ -84,7 +113,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
| ls.ArchiveDataSize = req.DataSize
|
|
|
| // Update the log stream.
|
| - if err := ls.Put(di); err != nil {
|
| + if err := di.Put(ls); err != nil {
|
| log.WithError(err).Errorf(c, "Failed to update log stream.")
|
| return err
|
| }
|
| @@ -92,8 +121,12 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
|
| log.Infof(c, "Successfully marked stream as archived.")
|
| return nil
|
| }, nil)
|
| + if ierr != nil {
|
| + log.WithError(ierr).Errorf(c, "Failed to mark stream as archived.")
|
| + return nil, ierr
|
| + }
|
| if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to mark stream as archived.")
|
| + log.WithError(err).Errorf(c, "Internal error.")
|
| return nil, grpcutil.Internal
|
| }
|
|
|
|
|