Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| index 2b9530ee2e288418b0aac6c622bfe7801376b33f..e35bae9f5313bbf79d97375afaeac6a4a0a679b8 100644 |
| --- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| +++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go |
| @@ -19,13 +19,18 @@ import ( |
| // ArchiveStream implements the logdog.ServicesServer interface. |
| func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamRequest) (*google.Empty, error) { |
| - if err := Auth(c); err != nil { |
| + svc := b.GetServices() |
| + if err := Auth(c, svc); err != nil { |
| return nil, err |
| } |
| log.Fields{ |
| - "path": req.Path, |
| - }.Infof(c, "Marking log stream as archived.") |
| + "path": req.Path, |
| + "complete": req.Complete(), |
| + "terminalIndex": req.TerminalIndex, |
| + "logEntryCount": req.LogEntryCount, |
| + "error": req.Error, |
| + }.Infof(c, "Received archival request.") |
|
dnj
2016/04/11 17:20:04
Makes it much easier to find logs in GAE. I recomm
|
| // Verify that the request is minimially valid. |
| path := types.StreamPath(req.Path) |
| @@ -42,40 +47,64 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| ls := coordinator.LogStreamFromPath(path) |
| - // (Non-transactional) Is the log stream already archived? |
| - switch err := ds.Get(c).Get(ls); err { |
| - case nil: |
| - if ls.Archived() { |
| - log.Infof(c, "Log stream already marked as archived (non-transactional).") |
| - return &google.Empty{}, nil |
| - } |
| - |
| - case ds.ErrNoSuchEntity: |
| - break |
| - |
| - default: |
| - log.WithError(err).Errorf(c, "Failed to check for log stream archvial state.") |
| - return nil, grpcutil.Internal |
| - } |
| + log.Fields{ |
| + "id": ls.HashID, |
| + }.Infof(c, "Log stream ID.") |
| // Post the archival results to the Coordinator. |
| now := clock.Now(c).UTC() |
| + var ierr error |
| err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| + ierr = nil |
| + |
| + // Note that within this transaction, we have two return values: |
| + // - Non-nil to abort the transaction. |
| + // - Specific error via "ierr". |
| di := ds.Get(c) |
| if err := di.Get(ls); err != nil { |
| return err |
| } |
| - if ls.Archived() { |
| - log.Infof(c, "Log stream already marked as archived.") |
| + |
| + // If our log stream is not in LSArchiveTasked, we will reject this archive |
| + // request with FailedPrecondition. |
| + switch { |
| + case ls.Archived(): |
| + // Return nil if the log stream is already archived (idempotent). |
| + log.Warningf(c, "Log stream is already archived.") |
| return nil |
| + |
| + case ls.State != coordinator.LSArchiveTasked: |
| + log.Fields{ |
| + "state": ls.State, |
| + }.Errorf(c, "Log stream is not in archival tasked state.") |
| + ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stream has not tasked an archival.") |
| + return ierr |
| + } |
| + |
| + // If this request contained an error, we will record an empty archival and |
| + // log a warning. |
| + if req.Error != "" { |
| + log.Fields{ |
| + "archiveError": req.Error, |
| + }.Warningf(c, "Log stream archival indicated error. Archiving empty stream.") |
| + |
| + req.TerminalIndex = -1 |
| + req.LogEntryCount = 0 |
| } |
| // Update archival information. Make sure this actually marks the stream as |
| // archived. |
| - ls.Updated = now |
| ls.State = coordinator.LSArchived |
| - ls.ArchiveWhole = req.Complete |
| + ls.ArchivedTime = now |
| + ls.ArchivalKey = nil // No point in wasting datastore space on this. |
| + |
| + if ls.TerminalIndex < 0 { |
| + // Also set the terminated time. |
| + ls.TerminatedTime = now |
| + } |
| ls.TerminalIndex = req.TerminalIndex |
| + |
| + ls.ArchiveLogEntryCount = req.LogEntryCount |
| ls.ArchiveStreamURL = req.StreamUrl |
| ls.ArchiveStreamSize = req.StreamSize |
| ls.ArchiveIndexURL = req.IndexUrl |
| @@ -84,7 +113,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| ls.ArchiveDataSize = req.DataSize |
| // Update the log stream. |
| - if err := ls.Put(di); err != nil { |
| + if err := di.Put(ls); err != nil { |
| log.WithError(err).Errorf(c, "Failed to update log stream.") |
| return err |
| } |
| @@ -92,8 +121,12 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque |
| log.Infof(c, "Successfully marked stream as archived.") |
| return nil |
| }, nil) |
| + if ierr != nil { |
| + log.WithError(ierr).Errorf(c, "Failed to mark stream as archived.") |
| + return nil, ierr |
| + } |
| if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to mark stream as archived.") |
| + log.WithError(err).Errorf(c, "Internal error.") |
| return nil, grpcutil.Internal |
| } |