| Index: appengine/cmd/dm/model/execution.go
|
| diff --git a/appengine/cmd/dm/model/execution.go b/appengine/cmd/dm/model/execution.go
|
| index 17bbdfdb875e5a7b31d64a65f1ab766cbac2e96d..176749a613558e2ccd8e9a422a67c7c291b03c6d 100644
|
| --- a/appengine/cmd/dm/model/execution.go
|
| +++ b/appengine/cmd/dm/model/execution.go
|
| @@ -16,7 +16,9 @@ import (
|
| "golang.org/x/net/context"
|
|
|
| "github.com/luci/gae/service/datastore"
|
| - "github.com/luci/luci-go/common/api/dm/service/v1"
|
| + dm "github.com/luci/luci-go/common/api/dm/service/v1"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/cryptorand"
|
| "github.com/luci/luci-go/common/grpcutil"
|
| "github.com/luci/luci-go/common/logging"
|
| google_pb "github.com/luci/luci-go/common/proto/google"
|
| @@ -30,12 +32,31 @@ type Execution struct {
|
| ID uint32 `gae:"$id"`
|
| Attempt *datastore.Key `gae:"$parent"`
|
|
|
| - State dm.Execution_State
|
| - StateReason string `gae:",noindex"`
|
| + Created time.Time
|
| + Modified time.Time
|
|
|
| - Created time.Time
|
| - DistributorToken string
|
| - DistributorURL string `gae:",noindex"`
|
| + DistributorConfigName string
|
| + DistributorConfigVersion string
|
| + DistributorToken string
|
| +
|
| + State dm.Execution_State
|
| +
|
| + // Only valid in the ABNORMAL_FINISHED state.
|
| + AbnormalFinish dm.AbnormalFinish
|
| +
|
| + // Only valid in the FINISHED state.
|
| + ResultPersistentState []byte `gae:",noindex"`
|
| +
|
| + // These are DM's internal mechanism for performing timeout actions on
|
| + // Executions.
|
| + //
|
| + // The TimeTo* variables are filled in by the distributor when this Execution
|
| + // is created.
|
| + //
|
| + // The Timeout is only active when the Execution is in a non-terminal state.
|
| + TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING
|
| + TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING
|
| + TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED
|
|
|
| // Token is a randomized nonce that's used to verify that RPCs verify from the
|
| // expected client (the client that's currently running the Execution). The
|
| @@ -43,21 +64,77 @@ type Execution struct {
|
| //
|
| // When the Execution is handed to the distributor, the Token is randomly
|
| // generated by DM and passed to the distributor. The State of the Execution
|
| - // starts as Scheduled. This token may be used by the client to "activate" the
|
| + // starts as SCHEDULED. This token may be used by the client to "activate" the
|
| // Execution with the ActivateExecution rpc. At that point, the client
|
| - // provides a new random token, the Execution State moves from Scheduled to
|
| - // Running, and Token assumes the new value. As long as the Execution State is
|
| - // running, the client may continue to use that new Token value to
|
| + // provides a new random token, the Execution State moves from SCHEDULED to
|
| + // RUNNING, and Token assumes the new value. As long as the Execution State is
|
| + // RUNNING, the client may continue to use that new Token value to
|
| // authenticate other rpc's like AddDeps and FinishAttempt.
|
| //
|
| - // As soon as the Execution is no longer supposed to have access, Token will
|
| - // be nil'd out.
|
| + // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED
|
| + // state, this will be nil'd out.
|
| Token []byte `gae:",noindex"`
|
| }
|
|
|
| -// Revoke will null-out the Token and Put this Execution to the datastore.
|
| +// MakeExecution makes a new Execution in the SCHEDULING state, with a new
|
| +// random Token.
|
| +func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution {
|
| + now := clock.Now(c).UTC()
|
| + ret := &Execution{
|
| + ID: e.Id,
|
| + Attempt: AttemptKeyFromID(c, e.AttemptID()),
|
| +
|
| + Created: now,
|
| + Modified: now,
|
| +
|
| + DistributorConfigName: cfgName,
|
| + DistributorConfigVersion: cfgVers,
|
| +
|
| + Token: MakeRandomToken(c, dm.MinimumActivationTokenLength),
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +// ModifyState changes the current state of this Execution and updates its
|
| +// Modified timestamp.
|
| +func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error {
|
| + if e.State == newState {
|
| + return nil
|
| + }
|
| + if err := e.State.Evolve(newState); err != nil {
|
| + return err
|
| + }
|
| + now := clock.Now(c).UTC()
|
| + if now.After(e.Modified) {
|
| + e.Modified = now
|
| + } else {
|
| + // Microsecond is the smallest granularity that datastore can store
|
| + // timestamps, so use that to disambiguate: the goal here is that any
|
| + // modification always increments the modified time, and never decrements
|
| + // it.
|
| + e.Modified = e.Modified.Add(time.Microsecond)
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +// MakeRandomToken creates a cryptographically random byte slice of the
|
| +// specified length. It panics if the specified length cannot be read in full.
|
| +func MakeRandomToken(c context.Context, l uint32) []byte {
|
| + rtok := make([]byte, l)
|
| + if _, err := cryptorand.Read(c, rtok); err != nil {
|
| + panic(err)
|
| + }
|
| + return rtok
|
| +}
|
| +
|
| +// Revoke will clear the Token and Put this Execution to the datastore. This
|
| +// action requires the Execution to be in the RUNNING state, and causes it to
|
| +// enter the STOPPING state.
|
| func (e *Execution) Revoke(c context.Context) error {
|
| e.Token = nil
|
| + if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil {
|
| + return err
|
| + }
|
| return datastore.Get(c).Put(e)
|
| }
|
|
|
| @@ -69,7 +146,8 @@ func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec
|
| err = datastore.Get(c).Get(a, e)
|
|
|
| if err != nil {
|
| - err = fmt.Errorf("couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
|
| + err = grpcutil.Errf(codes.Internal,
|
| + "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
|
| return
|
| }
|
|
|
| @@ -102,6 +180,14 @@ func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a
|
| return
|
| }
|
|
|
| +func makeError(err error, msg string) error {
|
| + code := grpcutil.Code(err)
|
| + if code == codes.Unknown {
|
| + code = codes.PermissionDenied
|
| + }
|
| + return grpcutil.Errf(code, msg)
|
| +}
|
| +
|
| // AuthenticateExecution verifies that the Attempt is executing, and that evkey
|
| // matches the execution key of the current Execution for this Attempt.
|
| //
|
| @@ -110,7 +196,7 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem
|
| a, e, err = verifyExecutionAndCheckExTok(c, auth)
|
| if err != nil {
|
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
|
| - err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth")
|
| + err = makeError(err, "requires execution Auth")
|
| }
|
| return a, e, err
|
| }
|
| @@ -122,14 +208,14 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem
|
| func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
|
| if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil {
|
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
|
| - err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth")
|
| + err = makeError(err, "requires execution Auth")
|
| return
|
| }
|
|
|
| err = e.Revoke(c)
|
| if err != nil {
|
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution")
|
| - err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth")
|
| + err = makeError(err, "unable to invalidate Auth")
|
| }
|
| return
|
| }
|
| @@ -146,7 +232,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
|
| }
|
|
|
| switch e.State {
|
| - case dm.Execution_SCHEDULED:
|
| + case dm.Execution_SCHEDULING:
|
| if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
|
| err = errors.New("incorrect ActivationToken")
|
| return
|
| @@ -155,6 +241,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
|
| e.State.MustEvolve(dm.Execution_RUNNING)
|
| e.Token = actTok
|
| err = datastore.Get(c).Put(e)
|
| + logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id)
|
|
|
| case dm.Execution_RUNNING:
|
| if subtle.ConstantTimeCompare(e.Token, actTok) != 1 {
|
| @@ -164,6 +251,8 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
|
| // by the same client, so there's no error, or it's wrong which means it's
|
| // a retry by a different client.
|
|
|
| + logging.Infof(c, "already activated execution %s", auth.Id)
|
| +
|
| default:
|
| err = fmt.Errorf("Execution is in wrong state")
|
| }
|
| @@ -180,28 +269,58 @@ func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by
|
| a, e, err = verifyExecutionAndActivate(c, auth, actToken)
|
| if err != nil {
|
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution")
|
| - err = grpcutil.Errf(codes.Unauthenticated, "failed to activate execution Auth")
|
| + err = makeError(err, "failed to activate execution Auth")
|
| }
|
| return a, e, err
|
| }
|
|
|
| +// GetEID gets an Execution_ID for this Execution. It panics if the Execution
|
| +// is in an invalid state.
|
| +func (e *Execution) GetEID() *dm.Execution_ID {
|
| + aid := &dm.Attempt_ID{}
|
| + if e.ID == 0 {
|
| + panic("cannot create valid Execution_ID with 0-value ID field")
|
| + }
|
| + if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
|
| + panic(err)
|
| + }
|
| + return dm.NewExecutionID(aid.Quest, aid.Id, e.ID)
|
| +}
|
| +
|
| // ToProto returns a dm proto version of this Execution.
|
| func (e *Execution) ToProto(includeID bool) *dm.Execution {
|
| - ret := &dm.Execution{
|
| - Data: &dm.Execution_Data{
|
| - State: e.State,
|
| - StateReason: e.StateReason,
|
| - Created: google_pb.NewTimestamp(e.Created),
|
| - DistributorToken: e.DistributorToken,
|
| - DistributorInfoUrl: e.DistributorURL,
|
| - },
|
| - }
|
| + ret := &dm.Execution{Data: e.DataProto()}
|
| if includeID {
|
| - aid := &dm.Attempt_ID{}
|
| - if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
|
| - panic(err)
|
| - }
|
| - ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID)
|
| + ret.Id = e.GetEID()
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +// DataProto returns an Execution.Data message for this Execution.
|
| +//
|
| +// This omits the DistributorInfo.Url portion, which must be filled in elsewhere for
|
| +// package cyclical import reasons.
|
| +func (e *Execution) DataProto() (ret *dm.Execution_Data) {
|
| + switch e.State {
|
| + case dm.Execution_SCHEDULING:
|
| + ret = dm.NewExecutionScheduling().Data
|
| + case dm.Execution_RUNNING:
|
| + ret = dm.NewExecutionRunning().Data
|
| + case dm.Execution_STOPPING:
|
| + ret = dm.NewExecutionStopping().Data
|
| + case dm.Execution_FINISHED:
|
| + ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).Data
|
| + case dm.Execution_ABNORMAL_FINISHED:
|
| + ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data
|
| + default:
|
| + panic(fmt.Errorf("unknown Execution_State: %s", e.State))
|
| + }
|
| + ret.Created = google_pb.NewTimestamp(e.Created)
|
| + ret.Modified = google_pb.NewTimestamp(e.Modified)
|
| + ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{
|
| + ConfigName: e.DistributorConfigName,
|
| + ConfigVersion: e.DistributorConfigVersion,
|
| + Token: e.DistributorToken,
|
| }
|
| return ret
|
| }
|
|
|