Index: appengine/cmd/dm/model/execution.go |
diff --git a/appengine/cmd/dm/model/execution.go b/appengine/cmd/dm/model/execution.go |
index c0b233512f494ee54cec0701c2f6a981ba739703..99413e1987fdf93871dcd21da9d0628d50c2a846 100644 |
--- a/appengine/cmd/dm/model/execution.go |
+++ b/appengine/cmd/dm/model/execution.go |
@@ -16,7 +16,9 @@ import ( |
"golang.org/x/net/context" |
"github.com/luci/gae/service/datastore" |
- "github.com/luci/luci-go/common/api/dm/service/v1" |
+ dm "github.com/luci/luci-go/common/api/dm/service/v1" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/cryptorand" |
"github.com/luci/luci-go/common/grpcutil" |
"github.com/luci/luci-go/common/logging" |
google_pb "github.com/luci/luci-go/common/proto/google" |
@@ -30,12 +32,31 @@ type Execution struct { |
ID uint32 `gae:"$id"` |
Attempt *datastore.Key `gae:"$parent"` |
- State dm.Execution_State |
- StateReason string `gae:",noindex"` |
+ Created time.Time |
+ Modified time.Time |
- Created time.Time |
- DistributorToken string |
- DistributorURL string `gae:",noindex"` |
+ DistributorConfigName string |
+ DistributorConfigVersion string |
+ DistributorToken string |
+ |
+ State dm.Execution_State |
+ |
+ // Only valid in the ABNORMAL_FINISHED state. |
+ AbnormalFinish dm.AbnormalFinish |
+ |
+ // Only valid in the FINISHED state. |
+ ResultPersistentState string |
+ |
+ // These are DM's internal mechanism for performing timeout actions on |
+ // Executions. |
+ // |
+ // The TimeTo* variables are filled in by the distributor when this Execution |
+ // is created. |
+ // |
+ // The Timeout is only active when the Execution is in a non-terminal state. |
+ TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING |
+ TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING |
+ TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED |
iannucci
2016/06/08 02:54:24
timeouts!
|
// Token is a randomized nonce that's used to verify that RPCs originate from the |
// expected client (the client that's currently running the Execution). The |
@@ -43,21 +64,77 @@ type Execution struct { |
// |
// When the Execution is handed to the distributor, the Token is randomly |
// generated by DM and passed to the distributor. The State of the Execution |
- // starts as Scheduled. This token may be used by the client to "activate" the |
+ // starts as SCHEDULED. This token may be used by the client to "activate" the |
// Execution with the ActivateExecution rpc. At that point, the client |
- // provides a new random token, the Execution State moves from Scheduled to |
- // Running, and Token assumes the new value. As long as the Execution State is |
- // running, the client may continue to use that new Token value to |
+ // provides a new random token, the Execution State moves from SCHEDULED to |
+ // RUNNING, and Token assumes the new value. As long as the Execution State is |
+ // RUNNING, the client may continue to use that new Token value to |
// authenticate other rpc's like AddDeps and FinishAttempt. |
// |
- // As soon as the Execution is no longer supposed to have access, Token will |
- // be nil'd out. |
+ // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED |
+ // state, this will be nil'd out. |
Token []byte `gae:",noindex"` |
} |
-// Revoke will null-out the Token and Put this Execution to the datastore. |
+// MakeExecution makes a new Execution in the SCHEDULING state, with a new |
+// random Token. |
+func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution { |
+ now := clock.Now(c).UTC() |
+ ret := &Execution{ |
+ ID: e.Id, |
+ Attempt: datastore.Get(c).MakeKey("Attempt", e.AttemptID().DMEncoded()), |
dnj (Google)
2016/06/09 18:00:56
AttemptKeyFromID?
iannucci
2016/06/15 00:46:01
good catch, I tried to find these all but apparent
|
+ |
+ Created: now, |
+ Modified: now, |
+ |
+ DistributorConfigName: cfgName, |
+ DistributorConfigVersion: cfgVers, |
+ |
+ Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), |
+ } |
+ return ret |
+} |
+ |
+// ModifyState changes the current state of this Execution and updates its |
+// Modified timestamp. |
+func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error { |
+ if e.State == newState { |
+ return nil |
+ } |
+ if err := e.State.Evolve(newState); err != nil { |
+ return err |
+ } |
+ now := clock.Now(c).UTC() |
+ if now.After(e.Modified) { |
+ e.Modified = now |
+ } else { |
+ // Microsecond is the finest granularity at which datastore can store |
+ // timestamps, so use that to disambiguate: the goal here is that any |
+ // modification always increments the modified time, and never decrements |
+ // it. |
+ e.Modified = e.Modified.Add(time.Microsecond) |
+ } |
+ return nil |
+} |
+ |
+// MakeRandomToken creates a cryptographically random byte slice of the |
+// specified length. It panics if the specified length cannot be read in full. |
+func MakeRandomToken(c context.Context, l uint32) []byte { |
+ rtok := make([]byte, l) |
+ if _, err := cryptorand.Read(c, rtok); err != nil { |
+ panic(err) |
+ } |
+ return rtok |
+} |
+ |
+// Revoke will nil-out the Token and Put this Execution to the datastore. This |
dnj (Google)
2016/06/09 18:00:56
nit: "nil-out" in above documentation, "null-out"
iannucci
2016/06/15 00:46:01
Done.
|
+// action requires the Execution to be in the RUNNING state, and causes it to |
+// enter the STOPPING state. |
func (e *Execution) Revoke(c context.Context) error { |
e.Token = nil |
+ if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { |
+ return err |
+ } |
return datastore.Get(c).Put(e) |
} |
@@ -69,7 +146,8 @@ func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec |
err = datastore.Get(c).GetMulti([]interface{}{a, e}) |
if err != nil { |
- err = fmt.Errorf("couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) |
+ err = grpcutil.Errf(codes.Internal, |
+ "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) |
return |
} |
@@ -102,6 +180,14 @@ func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a |
return |
} |
+func makeError(err error, msg string) error { |
+ code := grpcutil.Code(err) |
+ if code == codes.Unknown { |
+ code = codes.Unauthenticated |
dnj (Google)
2016/06/09 18:00:56
Does this make sense? Unauthenticated specifically
iannucci
2016/06/15 00:46:01
Yeah, basically: unless we specifically tagged thi
|
+ } |
+ return grpcutil.Errf(code, msg) |
+} |
+ |
// AuthenticateExecution verifies that the Attempt is executing, and that evkey |
// matches the execution key of the current Execution for this Attempt. |
// |
@@ -110,7 +196,7 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem |
a, e, err = verifyExecutionAndCheckExTok(c, auth) |
if err != nil { |
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") |
- err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth") |
+ err = makeError(err, "requires execution Auth") |
} |
return a, e, err |
} |
@@ -122,14 +208,14 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem |
func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) { |
if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { |
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") |
- err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth") |
+ err = makeError(err, "requires execution Auth") |
return |
} |
err = e.Revoke(c) |
if err != nil { |
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution") |
- err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") |
+ err = makeError(err, "unable to invalidate Auth") |
} |
return |
} |
@@ -146,7 +232,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT |
} |
switch e.State { |
- case dm.Execution_SCHEDULED: |
+ case dm.Execution_SCHEDULING: |
if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
err = errors.New("incorrect ActivationToken") |
return |
@@ -155,6 +241,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT |
e.State.MustEvolve(dm.Execution_RUNNING) |
e.Token = actTok |
err = datastore.Get(c).Put(e) |
+ logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id) |
case dm.Execution_RUNNING: |
if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { |
@@ -164,6 +251,8 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT |
// by the same client, so there's no error, or it's wrong which means it's |
// a retry by a different client. |
+ logging.Infof(c, "already activated execution %s", auth.Id) |
+ |
default: |
err = fmt.Errorf("Execution is in wrong state") |
} |
@@ -180,28 +269,58 @@ func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by |
a, e, err = verifyExecutionAndActivate(c, auth, actToken) |
if err != nil { |
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution") |
- err = grpcutil.Errf(codes.Unauthenticated, "failed to activate execution Auth") |
+ err = makeError(err, "failed to activate execution Auth") |
} |
return a, e, err |
} |
+// GetEID gets an Execution_ID for this Execution. It panics if the Execution's |
+// ID is 0 or if the string ID of its Attempt key cannot be decoded. |
+func (e *Execution) GetEID() *dm.Execution_ID { |
+ aid := &dm.Attempt_ID{} |
+ if e.ID == 0 { |
+ panic("cannot create valid Execution_ID with 0-value ID field") |
+ } |
+ if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { |
+ panic(err) |
+ } |
+ return dm.NewExecutionID(aid.Quest, aid.Id, e.ID) |
+} |
+ |
// ToProto returns a dm proto version of this Execution. |
func (e *Execution) ToProto(includeID bool) *dm.Execution { |
- ret := &dm.Execution{ |
- Data: &dm.Execution_Data{ |
- State: e.State, |
- StateReason: e.StateReason, |
- Created: google_pb.NewTimestamp(e.Created), |
- DistributorToken: e.DistributorToken, |
- DistributorInfoUrl: e.DistributorURL, |
- }, |
- } |
+ ret := &dm.Execution{Data: e.DataProto()} |
if includeID { |
- aid := &dm.Attempt_ID{} |
- if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { |
- panic(err) |
- } |
- ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID) |
+ ret.Id = e.GetEID() |
+ } |
+ return ret |
+} |
+ |
+// DataProto returns an Execution.Data message for this Execution. |
+// |
+// This omits the DistributorInfo.Url portion, which must be filled in elsewhere to |
+// avoid a cyclical package import. |
+func (e *Execution) DataProto() (ret *dm.Execution_Data) { |
+ switch e.State { |
+ case dm.Execution_SCHEDULING: |
+ ret = dm.NewExecutionScheduling().Data |
+ case dm.Execution_RUNNING: |
+ ret = dm.NewExecutionRunning().Data |
+ case dm.Execution_STOPPING: |
+ ret = dm.NewExecutionStopping().Data |
+ case dm.Execution_FINISHED: |
+ ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).Data |
+ case dm.Execution_ABNORMAL_FINISHED: |
+ ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data |
+ default: |
+ panic(fmt.Errorf("unknown Execution_State: %s", e.State)) |
+ } |
+ ret.Created = google_pb.NewTimestamp(e.Created) |
+ ret.Modified = google_pb.NewTimestamp(e.Modified) |
+ ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ |
+ ConfigName: e.DistributorConfigName, |
+ ConfigVersion: e.DistributorConfigVersion, |
+ Token: e.DistributorToken, |
} |
return ret |
} |