Chromium Code Reviews| Index: appengine/cmd/dm/model/execution.go | 
| diff --git a/appengine/cmd/dm/model/execution.go b/appengine/cmd/dm/model/execution.go | 
| index c0b233512f494ee54cec0701c2f6a981ba739703..99413e1987fdf93871dcd21da9d0628d50c2a846 100644 | 
| --- a/appengine/cmd/dm/model/execution.go | 
| +++ b/appengine/cmd/dm/model/execution.go | 
| @@ -16,7 +16,9 @@ import ( | 
| "golang.org/x/net/context" | 
| "github.com/luci/gae/service/datastore" | 
| - "github.com/luci/luci-go/common/api/dm/service/v1" | 
| + dm "github.com/luci/luci-go/common/api/dm/service/v1" | 
| + "github.com/luci/luci-go/common/clock" | 
| + "github.com/luci/luci-go/common/cryptorand" | 
| "github.com/luci/luci-go/common/grpcutil" | 
| "github.com/luci/luci-go/common/logging" | 
| google_pb "github.com/luci/luci-go/common/proto/google" | 
| @@ -30,12 +32,31 @@ type Execution struct { | 
| ID uint32 `gae:"$id"` | 
| Attempt *datastore.Key `gae:"$parent"` | 
| - State dm.Execution_State | 
| - StateReason string `gae:",noindex"` | 
| + Created time.Time | 
| + Modified time.Time | 
| - Created time.Time | 
| - DistributorToken string | 
| - DistributorURL string `gae:",noindex"` | 
| + DistributorConfigName string | 
| + DistributorConfigVersion string | 
| + DistributorToken string | 
| + | 
| + State dm.Execution_State | 
| + | 
| + // Only valid in the ABNORMAL_FINISHED state. | 
| + AbnormalFinish dm.AbnormalFinish | 
| + | 
| + // Only valid in the FINISHED state. | 
| + ResultPersistentState string | 
| + | 
| + // These are DM's internal mechanism for performing timeout actions on | 
| + // Executions. | 
| + // | 
| + // The TimeTo* variables are filled in by the distributor when this Execution | 
| + // is created. | 
| + // | 
| + // The Timeout is only active when the Execution is in a non-terminal state. | 
| + TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING | 
| + TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING | 
| + TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED | 
| 
 
iannucci
2016/06/08 02:54:24
timeouts!
 
 | 
| // Token is a randomized nonce that's used to verify that RPCs originate from the | 
| // expected client (the client that's currently running the Execution). The | 
| @@ -43,21 +64,77 @@ type Execution struct { | 
| // | 
| // When the Execution is handed to the distributor, the Token is randomly | 
| // generated by DM and passed to the distributor. The State of the Execution | 
| - // starts as Scheduled. This token may be used by the client to "activate" the | 
| + // starts as SCHEDULING. This token may be used by the client to "activate" the | 
| // Execution with the ActivateExecution rpc. At that point, the client | 
| - // provides a new random token, the Execution State moves from Scheduled to | 
| - // Running, and Token assumes the new value. As long as the Execution State is | 
| - // running, the client may continue to use that new Token value to | 
| + // provides a new random token, the Execution State moves from SCHEDULING to | 
| + // RUNNING, and Token assumes the new value. As long as the Execution State is | 
| + // RUNNING, the client may continue to use that new Token value to | 
| // authenticate other rpc's like AddDeps and FinishAttempt. | 
| // | 
| - // As soon as the Execution is no longer supposed to have access, Token will | 
| - // be nil'd out. | 
| + // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED | 
| + // state, this will be nil'd out. | 
| Token []byte `gae:",noindex"` | 
| } | 
| -// Revoke will null-out the Token and Put this Execution to the datastore. | 
| +// MakeExecution makes a new Execution in the SCHEDULING state, with a new | 
| +// random Token. | 
| +func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution { | 
| + now := clock.Now(c).UTC() | 
| + ret := &Execution{ | 
| + ID: e.Id, | 
| + Attempt: datastore.Get(c).MakeKey("Attempt", e.AttemptID().DMEncoded()), | 
| 
 
dnj (Google)
2016/06/09 18:00:56
AttemptKeyFromID?
 
iannucci
2016/06/15 00:46:01
good catch, I tried to find these all but apparent
 
 | 
| + | 
| + Created: now, | 
| + Modified: now, | 
| + | 
| + DistributorConfigName: cfgName, | 
| + DistributorConfigVersion: cfgVers, | 
| + | 
| + Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), | 
| + } | 
| + return ret | 
| +} | 
| + | 
| +// ModifyState changes the current state of this Execution and updates its | 
| +// Modified timestamp. | 
| +func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error { | 
| + if e.State == newState { | 
| + return nil | 
| + } | 
| + if err := e.State.Evolve(newState); err != nil { | 
| + return err | 
| + } | 
| + now := clock.Now(c).UTC() | 
| + if now.After(e.Modified) { | 
| + e.Modified = now | 
| + } else { | 
| + // Microsecond is the smallest granularity that datastore can store | 
| + // timestamps, so use that to disambiguate: the goal here is that any | 
| + // modification always increments the modified time, and never decrements | 
| + // it. | 
| + e.Modified = e.Modified.Add(time.Microsecond) | 
| + } | 
| + return nil | 
| +} | 
| + | 
| +// MakeRandomToken creates a cryptographically random byte slice of the | 
| +// specified length. It panics if the specified length cannot be read in full. | 
| +func MakeRandomToken(c context.Context, l uint32) []byte { | 
| + rtok := make([]byte, l) | 
| + if _, err := cryptorand.Read(c, rtok); err != nil { | 
| + panic(err) | 
| + } | 
| + return rtok | 
| +} | 
| + | 
| +// Revoke will nil-out the Token and Put this Execution to the datastore. This | 
| 
 
dnj (Google)
2016/06/09 18:00:56
nit: "nil-out" in above documentation, "null-out"
 
iannucci
2016/06/15 00:46:01
Done.
 
 | 
| +// action requires the Execution to be in the RUNNING state, and causes it to | 
| +// enter the STOPPING state. | 
| func (e *Execution) Revoke(c context.Context) error { | 
| e.Token = nil | 
| + if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { | 
| + return err | 
| + } | 
| return datastore.Get(c).Put(e) | 
| } | 
| @@ -69,7 +146,8 @@ func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec | 
| err = datastore.Get(c).GetMulti([]interface{}{a, e}) | 
| if err != nil { | 
| - err = fmt.Errorf("couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) | 
| + err = grpcutil.Errf(codes.Internal, | 
| + "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) | 
| return | 
| } | 
| @@ -102,6 +180,14 @@ func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a | 
| return | 
| } | 
| +func makeError(err error, msg string) error { | 
| + code := grpcutil.Code(err) | 
| + if code == codes.Unknown { | 
| + code = codes.Unauthenticated | 
| 
 
dnj (Google)
2016/06/09 18:00:56
Does this make sense? Unauthenticated specifically
 
iannucci
2016/06/15 00:46:01
Yeah, basically: unless we specifically tagged thi
 
 | 
| + } | 
| + return grpcutil.Errf(code, msg) | 
| +} | 
| + | 
| // AuthenticateExecution verifies that the Attempt is executing, and that evkey | 
| // matches the execution key of the current Execution for this Attempt. | 
| // | 
| @@ -110,7 +196,7 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem | 
| a, e, err = verifyExecutionAndCheckExTok(c, auth) | 
| if err != nil { | 
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") | 
| - err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth") | 
| + err = makeError(err, "requires execution Auth") | 
| } | 
| return a, e, err | 
| } | 
| @@ -122,14 +208,14 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem | 
| func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) { | 
| if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { | 
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") | 
| - err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth") | 
| + err = makeError(err, "requires execution Auth") | 
| return | 
| } | 
| err = e.Revoke(c) | 
| if err != nil { | 
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution") | 
| - err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") | 
| + err = makeError(err, "unable to invalidate Auth") | 
| } | 
| return | 
| } | 
| @@ -146,7 +232,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT | 
| } | 
| switch e.State { | 
| - case dm.Execution_SCHEDULED: | 
| + case dm.Execution_SCHEDULING: | 
| if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { | 
| err = errors.New("incorrect ActivationToken") | 
| return | 
| @@ -155,6 +241,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT | 
| e.State.MustEvolve(dm.Execution_RUNNING) | 
| e.Token = actTok | 
| err = datastore.Get(c).Put(e) | 
| + logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id) | 
| case dm.Execution_RUNNING: | 
| if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { | 
| @@ -164,6 +251,8 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT | 
| // by the same client, so there's no error, or it's wrong which means it's | 
| // a retry by a different client. | 
| + logging.Infof(c, "already activated execution %s", auth.Id) | 
| + | 
| default: | 
| err = fmt.Errorf("Execution is in wrong state") | 
| } | 
| @@ -180,28 +269,58 @@ func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by | 
| a, e, err = verifyExecutionAndActivate(c, auth, actToken) | 
| if err != nil { | 
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution") | 
| - err = grpcutil.Errf(codes.Unauthenticated, "failed to activate execution Auth") | 
| + err = makeError(err, "failed to activate execution Auth") | 
| } | 
| return a, e, err | 
| } | 
| +// GetEID gets an Execution_ID for this Execution. It panics if the Execution | 
| +// is in an invalid state. | 
| +func (e *Execution) GetEID() *dm.Execution_ID { | 
| + aid := &dm.Attempt_ID{} | 
| + if e.ID == 0 { | 
| + panic("cannot create valid Execution_ID with 0-value ID field") | 
| + } | 
| + if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | 
| + panic(err) | 
| + } | 
| + return dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | 
| +} | 
| + | 
| // ToProto returns a dm proto version of this Execution. | 
| func (e *Execution) ToProto(includeID bool) *dm.Execution { | 
| - ret := &dm.Execution{ | 
| - Data: &dm.Execution_Data{ | 
| - State: e.State, | 
| - StateReason: e.StateReason, | 
| - Created: google_pb.NewTimestamp(e.Created), | 
| - DistributorToken: e.DistributorToken, | 
| - DistributorInfoUrl: e.DistributorURL, | 
| - }, | 
| - } | 
| + ret := &dm.Execution{Data: e.DataProto()} | 
| if includeID { | 
| - aid := &dm.Attempt_ID{} | 
| - if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { | 
| - panic(err) | 
| - } | 
| - ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID) | 
| + ret.Id = e.GetEID() | 
| + } | 
| + return ret | 
| +} | 
| + | 
| +// DataProto returns an Execution.Data message for this Execution. | 
| +// | 
| +// This omits the DistributorInfo.Url portion, which must be filled in elsewhere | 
| +// to avoid a cyclical package import. | 
| +func (e *Execution) DataProto() (ret *dm.Execution_Data) { | 
| + switch e.State { | 
| + case dm.Execution_SCHEDULING: | 
| + ret = dm.NewExecutionScheduling().Data | 
| + case dm.Execution_RUNNING: | 
| + ret = dm.NewExecutionRunning().Data | 
| + case dm.Execution_STOPPING: | 
| + ret = dm.NewExecutionStopping().Data | 
| + case dm.Execution_FINISHED: | 
| + ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).Data | 
| + case dm.Execution_ABNORMAL_FINISHED: | 
| + ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data | 
| + default: | 
| + panic(fmt.Errorf("unknown Execution_State: %s", e.State)) | 
| + } | 
| + ret.Created = google_pb.NewTimestamp(e.Created) | 
| + ret.Modified = google_pb.NewTimestamp(e.Modified) | 
| + ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ | 
| + ConfigName: e.DistributorConfigName, | 
| + ConfigVersion: e.DistributorConfigVersion, | 
| + Token: e.DistributorToken, | 
| } | 
| return ret | 
| } |