Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2210)

Unified Diff: appengine/logdog/coordinator/endpoints/services/terminateStream.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go
diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
index b2afa56a7a297fb18048f2501f721524975111c0..3257619c50b57d4c39857015cf51a1d5908541df 100644
--- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go
+++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
@@ -6,9 +6,9 @@ package services
import (
"crypto/subtle"
- "errors"
ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
"github.com/luci/luci-go/appengine/logdog/coordinator"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock"
@@ -17,58 +17,103 @@ import (
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/proto/google"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
-var errAlreadyUpdated = errors.New("already updated")
-
// TerminateStream is an idempotent stream state terminate operation.
-func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) {
- if err := Auth(c); err != nil {
+func (s *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) {
+ svc := s.GetServices()
+ if err := Auth(c, svc); err != nil {
return nil, err
}
+ log.Fields{
+ "path": req.Path,
+ "terminalIndex": req.TerminalIndex,
+ }.Infof(c, "Request to terminate log stream.")
+
+ if req.TerminalIndex < 0 {
+ return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.")
+ }
+
path := types.StreamPath(req.Path)
if err := path.Validate(); err != nil {
return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (%s): %s", req.Path, err)
}
- c = log.SetField(c, "path", req.Path)
- if req.TerminalIndex < 0 {
- return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.")
+ _, cfg, err := svc.Config(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to load configuration.")
+ return nil, grpcutil.Internal
+ }
+
+ ap, err := svc.ArchivalPublisher(c)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get archival publisher instance.")
+ return nil, grpcutil.Internal
}
// Initialize our log stream. This cannot fail since we have already validated
// req.Path.
ls := coordinator.LogStreamFromPath(path)
- switch err := updateTerminalIndex(c, ls, req); err {
- case errAlreadyUpdated:
- return &google.Empty{}, nil
-
- // To be confirmed/resolved transactionally.
- case nil:
- break
- default:
- // Because we're not in a transaction, forgive a "not found" status.
- if grpc.Code(err) != codes.NotFound {
- log.WithError(err).Errorf(c, "Failed to check LogStream status.")
- return nil, err
- }
+
+ // Initialize our archival parameters.
+ params := coordinator.ArchivalParams{
+ RequestID: info.Get(c).RequestID(),
+ SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
+ CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(),
}
- // Transactionally update.
- now := clock.Now(c).UTC()
- err := ds.Get(c).RunInTransaction(func(c context.Context) error {
- di := ds.Get(c)
+ // Transactionally validate and update the terminal index.
+ err = ds.Get(c).RunInTransaction(func(c context.Context) error {
+ if err := ds.Get(c).Get(ls); err != nil {
+ if err == ds.ErrNoSuchEntity {
+ log.Debugf(c, "LogEntry not found.")
+ return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
+ }
+
+ log.WithError(err).Errorf(c, "Failed to load LogEntry.")
+ return grpcutil.Internal
+ }
+
+ switch {
+ case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1:
+ log.Errorf(c, "Secrets do not match.")
+ return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.")
+
+ case ls.State > coordinator.LSStreaming:
+ // Succeed if this is non-conflicting (idempotent).
+ if ls.TerminalIndex == req.TerminalIndex {
+ log.Fields{
+ "state": ls.State.String(),
+ "terminalIndex": ls.TerminalIndex,
+ }.Infof(c, "Log stream is already terminated.")
+ return nil
+ }
+
+ log.Fields{
+ "state": ls.State.String(),
+ "terminalIndex": ls.TerminalIndex,
+ }.Warningf(c, "Log stream is not in streaming state.")
+ return grpcutil.Errf(codes.FailedPrecondition, "Log stream is not in streaming state.")
- // Load the log stream state.
- switch err := updateTerminalIndex(c, ls, req); err {
- case nil:
- ls.Updated = now
- ls.State = coordinator.LSTerminated
+ default:
+ // Everything looks good, let's proceed...
+ ls.TerminalIndex = req.TerminalIndex
+ ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC())
+
+ // Create an archival task.
+ if err := params.PublishTask(c, ap, ls); err != nil {
+ if err == coordinator.ErrArchiveTasked {
dnj 2016/04/11 17:20:04 Note that this will never happen, since this error
iannucci 2016/04/19 00:55:28 Acknowledged.
+ log.Warningf(c, "Archival has already been tasked for this stream.")
+ return nil
+ }
+
+ log.WithError(err).Errorf(c, "Failed to create archive task.")
+ return grpcutil.Internal
+ }
- if err := ls.Put(di); err != nil {
+ if err := ds.Get(c).Put(ls); err != nil {
log.Fields{
log.ErrorKey: err,
}.Errorf(c, "Failed to Put() LogStream.")
@@ -77,14 +122,8 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
log.Fields{
"terminalIndex": ls.TerminalIndex,
- }.Infof(c, "Terminal index was set.")
+ }.Infof(c, "Terminal index was set and archival was dispatched.")
return nil
-
- case errAlreadyUpdated:
- return nil
-
- default:
- return err
}
}, nil)
if err != nil {
@@ -96,39 +135,3 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
return &google.Empty{}, nil
}
-
-func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logdog.TerminateStreamRequest) error {
- if err := ds.Get(c).Get(ls); err != nil {
- if err == ds.ErrNoSuchEntity {
- log.Debugf(c, "LogEntry not found.")
- return grpcutil.Errf(codes.NotFound, "Log stream [%s] is not registered", req.Path)
- }
-
- log.WithError(err).Errorf(c, "Failed to load LogEntry.")
- return grpcutil.Internal
- }
-
- if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 {
- log.Errorf(c, "Secrets do not match.")
- return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.")
- }
-
- switch {
- case ls.TerminalIndex == req.TerminalIndex:
- // Idempotent: already updated to this value.
- log.Debugf(c, "Log stream is already updated (idempotent).")
- return errAlreadyUpdated
-
- case ls.Terminated():
- // Terminated, but with a different value.
- log.Fields{
- "current": ls.TerminalIndex,
- "requested": req.TerminalIndex,
- }.Warningf(c, "Refusing to change terminal index.")
- return grpcutil.Errf(codes.AlreadyExists, "Terminal index is already set.")
-
- default:
- ls.TerminalIndex = req.TerminalIndex
- return nil
- }
-}

Powered by Google App Engine
This is Rietveld 408576698