Chromium Code Reviews| Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go |
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go |
| index b2afa56a7a297fb18048f2501f721524975111c0..3257619c50b57d4c39857015cf51a1d5908541df 100644 |
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go |
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go |
| @@ -6,9 +6,9 @@ package services |
| import ( |
| "crypto/subtle" |
| - "errors" |
| ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/info" |
| "github.com/luci/luci-go/appengine/logdog/coordinator" |
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| "github.com/luci/luci-go/common/clock" |
| @@ -17,58 +17,103 @@ import ( |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/proto/google" |
| "golang.org/x/net/context" |
| - "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| ) |
| -var errAlreadyUpdated = errors.New("already updated") |
| - |
| // TerminateStream is an idempotent stream state terminate operation. |
| -func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) { |
| - if err := Auth(c); err != nil { |
| +func (s *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) { |
| + svc := s.GetServices() |
| + if err := Auth(c, svc); err != nil { |
| return nil, err |
| } |
| + log.Fields{ |
| + "path": req.Path, |
| + "terminalIndex": req.TerminalIndex, |
| + }.Infof(c, "Request to terminate log stream.") |
| + |
| + if req.TerminalIndex < 0 { |
| + return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.") |
| + } |
| + |
| path := types.StreamPath(req.Path) |
| if err := path.Validate(); err != nil { |
| return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (%s): %s", req.Path, err) |
| } |
| - c = log.SetField(c, "path", req.Path) |
| - if req.TerminalIndex < 0 { |
| - return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.") |
| + _, cfg, err := svc.Config(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to load configuration.") |
| + return nil, grpcutil.Internal |
| + } |
| + |
| + ap, err := svc.ArchivalPublisher(c) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get archival publisher instance.") |
| + return nil, grpcutil.Internal |
| } |
| // Initialize our log stream. This cannot fail since we have already validated |
| // req.Path. |
| ls := coordinator.LogStreamFromPath(path) |
| - switch err := updateTerminalIndex(c, ls, req); err { |
| - case errAlreadyUpdated: |
| - return &google.Empty{}, nil |
| - |
| - // To be confirmed/resolved transactionally. |
| - case nil: |
| - break |
| - default: |
| - // Because we're not in a transaction, forgive a "not found" status. |
| - if grpc.Code(err) != codes.NotFound { |
| - log.WithError(err).Errorf(c, "Failed to check LogStream status.") |
| - return nil, err |
| - } |
| + |
| + // Initialize our archival parameters. |
| + params := coordinator.ArchivalParams{ |
| + RequestID: info.Get(c).RequestID(), |
| + SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(), |
| + CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(), |
| } |
| - // Transactionally update. |
| - now := clock.Now(c).UTC() |
| - err := ds.Get(c).RunInTransaction(func(c context.Context) error { |
| - di := ds.Get(c) |
| + // Transactionally validate and update the terminal index. |
| + err = ds.Get(c).RunInTransaction(func(c context.Context) error { |
| + if err := ds.Get(c).Get(ls); err != nil { |
| + if err == ds.ErrNoSuchEntity { |
| + log.Debugf(c, "LogEntry not found.") |
| + return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path) |
| + } |
| + |
| + log.WithError(err).Errorf(c, "Failed to load LogEntry.") |
| + return grpcutil.Internal |
| + } |
| + |
| + switch { |
| + case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1: |
| + log.Errorf(c, "Secrets do not match.") |
| + return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.") |
| + |
| + case ls.State > coordinator.LSStreaming: |
| + // Succeed if this is non-conflicting (idempotent). |
| + if ls.TerminalIndex == req.TerminalIndex { |
| + log.Fields{ |
| + "state": ls.State.String(), |
| + "terminalIndex": ls.TerminalIndex, |
| + }.Infof(c, "Log stream is already terminated.") |
| + return nil |
| + } |
| + |
| + log.Fields{ |
| + "state": ls.State.String(), |
| + "terminalIndex": ls.TerminalIndex, |
| + }.Warningf(c, "Log stream is not in streaming state.") |
| + return grpcutil.Errf(codes.FailedPrecondition, "Log stream is not in streaming state.") |
| - // Load the log stream state. |
| - switch err := updateTerminalIndex(c, ls, req); err { |
| - case nil: |
| - ls.Updated = now |
| - ls.State = coordinator.LSTerminated |
| + default: |
| + // Everything looks good, let's proceed... |
| + ls.TerminalIndex = req.TerminalIndex |
| + ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC()) |
| + |
| + // Create an archival task. |
| + if err := params.PublishTask(c, ap, ls); err != nil { |
| + if err == coordinator.ErrArchiveTasked { |
|
dnj
2016/04/11 17:20:04
Note that this will never happen, since this error
iannucci
2016/04/19 00:55:28
Acknowledged.
|
| + log.Warningf(c, "Archival has already been tasked for this stream.") |
| + return nil |
| + } |
| + |
| + log.WithError(err).Errorf(c, "Failed to create archive task.") |
| + return grpcutil.Internal |
| + } |
| - if err := ls.Put(di); err != nil { |
| + if err := ds.Get(c).Put(ls); err != nil { |
| log.Fields{ |
| log.ErrorKey: err, |
| }.Errorf(c, "Failed to Put() LogStream.") |
| @@ -77,14 +122,8 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR |
| log.Fields{ |
| "terminalIndex": ls.TerminalIndex, |
| - }.Infof(c, "Terminal index was set.") |
| + }.Infof(c, "Terminal index was set and archival was dispatched.") |
| return nil |
| - |
| - case errAlreadyUpdated: |
| - return nil |
| - |
| - default: |
| - return err |
| } |
| }, nil) |
| if err != nil { |
| @@ -96,39 +135,3 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR |
| return &google.Empty{}, nil |
| } |
| - |
| -func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logdog.TerminateStreamRequest) error { |
| - if err := ds.Get(c).Get(ls); err != nil { |
| - if err == ds.ErrNoSuchEntity { |
| - log.Debugf(c, "LogEntry not found.") |
| - return grpcutil.Errf(codes.NotFound, "Log stream [%s] is not registered", req.Path) |
| - } |
| - |
| - log.WithError(err).Errorf(c, "Failed to load LogEntry.") |
| - return grpcutil.Internal |
| - } |
| - |
| - if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 { |
| - log.Errorf(c, "Secrets do not match.") |
| - return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.") |
| - } |
| - |
| - switch { |
| - case ls.TerminalIndex == req.TerminalIndex: |
| - // Idempotent: already updated to this value. |
| - log.Debugf(c, "Log stream is already updated (idempotent).") |
| - return errAlreadyUpdated |
| - |
| - case ls.Terminated(): |
| - // Terminated, but with a different value. |
| - log.Fields{ |
| - "current": ls.TerminalIndex, |
| - "requested": req.TerminalIndex, |
| - }.Warningf(c, "Refusing to change terminal index.") |
| - return grpcutil.Errf(codes.AlreadyExists, "Terminal index is already set.") |
| - |
| - default: |
| - ls.TerminalIndex = req.TerminalIndex |
| - return nil |
| - } |
| -} |