Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Unified Diff: appengine/logdog/coordinator/endpoints/services/archiveStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/archiveStream.go
diff --git a/appengine/logdog/coordinator/endpoints/services/archiveStream.go b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
index 2b9530ee2e288418b0aac6c622bfe7801376b33f..e35bae9f5313bbf79d97375afaeac6a4a0a679b8 100644
--- a/appengine/logdog/coordinator/endpoints/services/archiveStream.go
+++ b/appengine/logdog/coordinator/endpoints/services/archiveStream.go
@@ -19,13 +19,18 @@ import (
// ArchiveStream implements the logdog.ServicesServer interface.
func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamRequest) (*google.Empty, error) {
- if err := Auth(c); err != nil {
+ svc := b.GetServices()
+ if err := Auth(c, svc); err != nil {
return nil, err
}
log.Fields{
- "path": req.Path,
- }.Infof(c, "Marking log stream as archived.")
+ "path": req.Path,
+ "complete": req.Complete(),
+ "terminalIndex": req.TerminalIndex,
+ "logEntryCount": req.LogEntryCount,
+ "error": req.Error,
+ }.Infof(c, "Received archival request.")
// Verify that the request is minimially valid.
path := types.StreamPath(req.Path)
@@ -42,40 +47,64 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
ls := coordinator.LogStreamFromPath(path)
- // (Non-transactional) Is the log stream already archived?
- switch err := ds.Get(c).Get(ls); err {
- case nil:
- if ls.Archived() {
- log.Infof(c, "Log stream already marked as archived (non-transactional).")
- return &google.Empty{}, nil
- }
-
- case ds.ErrNoSuchEntity:
- break
-
- default:
- log.WithError(err).Errorf(c, "Failed to check for log stream archvial state.")
- return nil, grpcutil.Internal
- }
+ log.Fields{
+ "id": ls.HashID,
+ }.Infof(c, "Log stream ID.")
// Post the archival results to the Coordinator.
now := clock.Now(c).UTC()
+ var ierr error
err := ds.Get(c).RunInTransaction(func(c context.Context) error {
+ ierr = nil
+
+ // Note that within this transaction, we have two return values:
+ // - Non-nil to abort the transaction.
+ // - Specific error via "ierr".
di := ds.Get(c)
if err := di.Get(ls); err != nil {
return err
}
- if ls.Archived() {
- log.Infof(c, "Log stream already marked as archived.")
+
+ // If our log stream is not in LSArchiveTasked, we will reject this archive
+ // request with FailedPrecondition.
+ switch {
+ case ls.Archived():
+ // Return nil if the log stream is already archived (idempotent).
+ log.Warningf(c, "Log stream is already archived.")
return nil
+
+ case ls.State != coordinator.LSArchiveTasked:
+ log.Fields{
+ "state": ls.State,
+ }.Errorf(c, "Log stream is not in archival tasked state.")
+ ierr = grpcutil.Errf(codes.FailedPrecondition, "Log stream has not tasked an archival.")
+ return ierr
+ }
+
+ // If this request contained an error, we will record an empty archival and
+ // log a warning.
+ if req.Error != "" {
+ log.Fields{
+ "archiveError": req.Error,
+ }.Warningf(c, "Log stream archival indicated error. Archiving empty stream.")
+
+ req.TerminalIndex = -1
+ req.LogEntryCount = 0
}
// Update archival information. Make sure this actually marks the stream as
// archived.
- ls.Updated = now
ls.State = coordinator.LSArchived
- ls.ArchiveWhole = req.Complete
+ ls.ArchivedTime = now
+ ls.ArchivalKey = nil // No point in wasting datastore space on this.
+
+ if ls.TerminalIndex < 0 {
+ // Also set the terminated time.
+ ls.TerminatedTime = now
+ }
ls.TerminalIndex = req.TerminalIndex
+
+ ls.ArchiveLogEntryCount = req.LogEntryCount
ls.ArchiveStreamURL = req.StreamUrl
ls.ArchiveStreamSize = req.StreamSize
ls.ArchiveIndexURL = req.IndexUrl
@@ -84,7 +113,7 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
ls.ArchiveDataSize = req.DataSize
// Update the log stream.
- if err := ls.Put(di); err != nil {
+ if err := di.Put(ls); err != nil {
log.WithError(err).Errorf(c, "Failed to update log stream.")
return err
}
@@ -92,8 +121,12 @@ func (b *Server) ArchiveStream(c context.Context, req *logdog.ArchiveStreamReque
log.Infof(c, "Successfully marked stream as archived.")
return nil
}, nil)
+ if ierr != nil {
+ log.WithError(ierr).Errorf(c, "Failed to mark stream as archived.")
+ return nil, ierr
+ }
if err != nil {
- log.WithError(err).Errorf(c, "Failed to mark stream as archived.")
+ log.WithError(err).Errorf(c, "Internal error.")
return nil, grpcutil.Internal
}

Powered by Google App Engine
This is Rietveld 408576698