| Index: appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/terminateStream.go b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| index b2afa56a7a297fb18048f2501f721524975111c0..3257619c50b57d4c39857015cf51a1d5908541df 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/terminateStream.go
|
| @@ -6,9 +6,9 @@ package services
|
|
|
| import (
|
| "crypto/subtle"
|
| - "errors"
|
|
|
| ds "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/info"
|
| "github.com/luci/luci-go/appengine/logdog/coordinator"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock"
|
| @@ -17,58 +17,103 @@ import (
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/proto/google"
|
| "golang.org/x/net/context"
|
| - "google.golang.org/grpc"
|
| "google.golang.org/grpc/codes"
|
| )
|
|
|
| -var errAlreadyUpdated = errors.New("already updated")
|
| -
|
| // TerminateStream is an idempotent stream state terminate operation.
|
| -func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) {
|
| - if err := Auth(c); err != nil {
|
| +func (s *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamRequest) (*google.Empty, error) {
|
| + svc := s.GetServices()
|
| + if err := Auth(c, svc); err != nil {
|
| return nil, err
|
| }
|
|
|
| + log.Fields{
|
| + "path": req.Path,
|
| + "terminalIndex": req.TerminalIndex,
|
| + }.Infof(c, "Request to terminate log stream.")
|
| +
|
| + if req.TerminalIndex < 0 {
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.")
|
| + }
|
| +
|
| path := types.StreamPath(req.Path)
|
| if err := path.Validate(); err != nil {
|
| return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (%s): %s", req.Path, err)
|
| }
|
| - c = log.SetField(c, "path", req.Path)
|
|
|
| - if req.TerminalIndex < 0 {
|
| - return nil, grpcutil.Errf(codes.InvalidArgument, "Negative terminal index.")
|
| + _, cfg, err := svc.Config(c)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to load configuration.")
|
| + return nil, grpcutil.Internal
|
| + }
|
| +
|
| + ap, err := svc.ArchivalPublisher(c)
|
| + if err != nil {
|
| + log.WithError(err).Errorf(c, "Failed to get archival publisher instance.")
|
| + return nil, grpcutil.Internal
|
| }
|
|
|
| // Initialize our log stream. This cannot fail since we have already validated
|
| // req.Path.
|
| ls := coordinator.LogStreamFromPath(path)
|
| - switch err := updateTerminalIndex(c, ls, req); err {
|
| - case errAlreadyUpdated:
|
| - return &google.Empty{}, nil
|
| -
|
| - // To be confirmed/resolved transactionally.
|
| - case nil:
|
| - break
|
| - default:
|
| - // Because we're not in a transaction, forgive a "not found" status.
|
| - if grpc.Code(err) != codes.NotFound {
|
| - log.WithError(err).Errorf(c, "Failed to check LogStream status.")
|
| - return nil, err
|
| - }
|
| +
|
| + // Initialize our archival parameters.
|
| + params := coordinator.ArchivalParams{
|
| + RequestID: info.Get(c).RequestID(),
|
| + SettleDelay: cfg.Coordinator.ArchiveSettleDelay.Duration(),
|
| + CompletePeriod: cfg.Coordinator.ArchiveDelayMax.Duration(),
|
| }
|
|
|
| - // Transactionally update.
|
| - now := clock.Now(c).UTC()
|
| - err := ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| - di := ds.Get(c)
|
| + // Transactionally validate and update the terminal index.
|
| + err = ds.Get(c).RunInTransaction(func(c context.Context) error {
|
| + if err := ds.Get(c).Get(ls); err != nil {
|
| + if err == ds.ErrNoSuchEntity {
|
| + log.Debugf(c, "LogEntry not found.")
|
| + return grpcutil.Errf(codes.NotFound, "Log stream %q is not registered", req.Path)
|
| + }
|
| +
|
| + log.WithError(err).Errorf(c, "Failed to load LogEntry.")
|
| + return grpcutil.Internal
|
| + }
|
| +
|
| + switch {
|
| + case subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1:
|
| + log.Errorf(c, "Secrets do not match.")
|
| + return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.")
|
| +
|
| + case ls.State > coordinator.LSStreaming:
|
| + // Succeed if this is non-conflicting (idempotent).
|
| + if ls.TerminalIndex == req.TerminalIndex {
|
| + log.Fields{
|
| + "state": ls.State.String(),
|
| + "terminalIndex": ls.TerminalIndex,
|
| + }.Infof(c, "Log stream is already terminated.")
|
| + return nil
|
| + }
|
| +
|
| + log.Fields{
|
| + "state": ls.State.String(),
|
| + "terminalIndex": ls.TerminalIndex,
|
| + }.Warningf(c, "Log stream is not in streaming state.")
|
| + return grpcutil.Errf(codes.FailedPrecondition, "Log stream is not in streaming state.")
|
|
|
| - // Load the log stream state.
|
| - switch err := updateTerminalIndex(c, ls, req); err {
|
| - case nil:
|
| - ls.Updated = now
|
| - ls.State = coordinator.LSTerminated
|
| + default:
|
| + // Everything looks good, let's proceed...
|
| + ls.TerminalIndex = req.TerminalIndex
|
| + ls.TerminatedTime = ds.RoundTime(clock.Now(c).UTC())
|
| +
|
| + // Create an archival task.
|
| + if err := params.PublishTask(c, ap, ls); err != nil {
|
| + if err == coordinator.ErrArchiveTasked {
|
| + log.Warningf(c, "Archival has already been tasked for this stream.")
|
| + return nil
|
| + }
|
| +
|
| + log.WithError(err).Errorf(c, "Failed to create archive task.")
|
| + return grpcutil.Internal
|
| + }
|
|
|
| - if err := ls.Put(di); err != nil {
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| }.Errorf(c, "Failed to Put() LogStream.")
|
| @@ -77,14 +122,8 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
|
|
| log.Fields{
|
| "terminalIndex": ls.TerminalIndex,
|
| - }.Infof(c, "Terminal index was set.")
|
| + }.Infof(c, "Terminal index was set and archival was dispatched.")
|
| return nil
|
| -
|
| - case errAlreadyUpdated:
|
| - return nil
|
| -
|
| - default:
|
| - return err
|
| }
|
| }, nil)
|
| if err != nil {
|
| @@ -96,39 +135,3 @@ func (b *Server) TerminateStream(c context.Context, req *logdog.TerminateStreamR
|
|
|
| return &google.Empty{}, nil
|
| }
|
| -
|
| -func updateTerminalIndex(c context.Context, ls *coordinator.LogStream, req *logdog.TerminateStreamRequest) error {
|
| - if err := ds.Get(c).Get(ls); err != nil {
|
| - if err == ds.ErrNoSuchEntity {
|
| - log.Debugf(c, "LogEntry not found.")
|
| - return grpcutil.Errf(codes.NotFound, "Log stream [%s] is not registered", req.Path)
|
| - }
|
| -
|
| - log.WithError(err).Errorf(c, "Failed to load LogEntry.")
|
| - return grpcutil.Internal
|
| - }
|
| -
|
| - if subtle.ConstantTimeCompare(ls.Secret, req.Secret) != 1 {
|
| - log.Errorf(c, "Secrets do not match.")
|
| - return grpcutil.Errf(codes.InvalidArgument, "Request secret doesn't match the stream secret.")
|
| - }
|
| -
|
| - switch {
|
| - case ls.TerminalIndex == req.TerminalIndex:
|
| - // Idempotent: already updated to this value.
|
| - log.Debugf(c, "Log stream is already updated (idempotent).")
|
| - return errAlreadyUpdated
|
| -
|
| - case ls.Terminated():
|
| - // Terminated, but with a different value.
|
| - log.Fields{
|
| - "current": ls.TerminalIndex,
|
| - "requested": req.TerminalIndex,
|
| - }.Warningf(c, "Refusing to change terminal index.")
|
| - return grpcutil.Errf(codes.AlreadyExists, "Terminal index is already set.")
|
| -
|
| - default:
|
| - ls.TerminalIndex = req.TerminalIndex
|
| - return nil
|
| - }
|
| -}
|
|
|