Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(646)

Unified Diff: appengine/cmd/dm/model/execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/dm/model/attempt_test.go ('k') | appengine/cmd/dm/model/execution_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/dm/model/execution.go
diff --git a/appengine/cmd/dm/model/execution.go b/appengine/cmd/dm/model/execution.go
index 17bbdfdb875e5a7b31d64a65f1ab766cbac2e96d..176749a613558e2ccd8e9a422a67c7c291b03c6d 100644
--- a/appengine/cmd/dm/model/execution.go
+++ b/appengine/cmd/dm/model/execution.go
@@ -16,7 +16,9 @@ import (
"golang.org/x/net/context"
"github.com/luci/gae/service/datastore"
- "github.com/luci/luci-go/common/api/dm/service/v1"
+ dm "github.com/luci/luci-go/common/api/dm/service/v1"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/cryptorand"
"github.com/luci/luci-go/common/grpcutil"
"github.com/luci/luci-go/common/logging"
google_pb "github.com/luci/luci-go/common/proto/google"
@@ -30,12 +32,31 @@ type Execution struct {
ID uint32 `gae:"$id"`
Attempt *datastore.Key `gae:"$parent"`
- State dm.Execution_State
- StateReason string `gae:",noindex"`
+ Created time.Time
+ Modified time.Time
- Created time.Time
- DistributorToken string
- DistributorURL string `gae:",noindex"`
+ DistributorConfigName string
+ DistributorConfigVersion string
+ DistributorToken string
+
+ State dm.Execution_State
+
+ // Only valid in the ABNORMAL_FINISHED state.
+ AbnormalFinish dm.AbnormalFinish
+
+ // Only valid in the FINISHED state.
+ ResultPersistentState []byte `gae:",noindex"`
+
+ // These are DM's internal mechanism for performing timeout actions on
+ // Executions.
+ //
+ // The TimeTo* variables are filled in by the distributor when this Execution
+ // is created.
+ //
+ // The Timeout is only active when the Execution is in a non-terminal state.
+ TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING
+ TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING
+ TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED
// Token is a randomized nonce that's used to verify that RPCs verify from the
// expected client (the client that's currently running the Execution). The
@@ -43,21 +64,77 @@ type Execution struct {
//
// When the Execution is handed to the distributor, the Token is randomly
// generated by DM and passed to the distributor. The State of the Execution
- // starts as Scheduled. This token may be used by the client to "activate" the
+ // starts as SCHEDULED. This token may be used by the client to "activate" the
// Execution with the ActivateExecution rpc. At that point, the client
- // provides a new random token, the Execution State moves from Scheduled to
- // Running, and Token assumes the new value. As long as the Execution State is
- // running, the client may continue to use that new Token value to
+ // provides a new random token, the Execution State moves from SCHEDULED to
+ // RUNNING, and Token assumes the new value. As long as the Execution State is
+ // RUNNING, the client may continue to use that new Token value to
// authenticate other rpc's like AddDeps and FinishAttempt.
//
- // As soon as the Execution is no longer supposed to have access, Token will
- // be nil'd out.
+ // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED
+ // state, this will be nil'd out.
Token []byte `gae:",noindex"`
}
-// Revoke will null-out the Token and Put this Execution to the datastore.
+// MakeExecution makes a new Execution in the SCHEDULING state, with a new
+// random Token.
+func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution {
+ now := clock.Now(c).UTC()
+ ret := &Execution{
+ ID: e.Id,
+ Attempt: AttemptKeyFromID(c, e.AttemptID()),
+
+ Created: now,
+ Modified: now,
+
+ DistributorConfigName: cfgName,
+ DistributorConfigVersion: cfgVers,
+
+ Token: MakeRandomToken(c, dm.MinimumActivationTokenLength),
+ }
+ return ret
+}
+
+// ModifyState changes the current state of this Execution and updates its
+// Modified timestamp.
+func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error {
+ if e.State == newState {
+ return nil
+ }
+ if err := e.State.Evolve(newState); err != nil {
+ return err
+ }
+ now := clock.Now(c).UTC()
+ if now.After(e.Modified) {
+ e.Modified = now
+ } else {
+ // Microsecond is the smallest granularity that datastore can store
+ // timestamps, so use that to disambiguate: the goal here is that any
+ // modification always increments the modified time, and never decrements
+ // it.
+ e.Modified = e.Modified.Add(time.Microsecond)
+ }
+ return nil
+}
+
+// MakeRandomToken creates a cryptographically random byte slice of the
+// specified length. It panics if the specified length cannot be read in full.
+func MakeRandomToken(c context.Context, l uint32) []byte {
+ rtok := make([]byte, l)
+ if _, err := cryptorand.Read(c, rtok); err != nil {
+ panic(err)
+ }
+ return rtok
+}
+
+// Revoke will clear the Token and Put this Execution to the datastore. This
+// action requires the Execution to be in the RUNNING state, and causes it to
+// enter the STOPPING state.
func (e *Execution) Revoke(c context.Context) error {
e.Token = nil
+ if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil {
+ return err
+ }
return datastore.Get(c).Put(e)
}
@@ -69,7 +146,8 @@ func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec
err = datastore.Get(c).Get(a, e)
if err != nil {
- err = fmt.Errorf("couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
+ err = grpcutil.Errf(codes.Internal,
+ "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
return
}
@@ -102,6 +180,14 @@ func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a
return
}
+func makeError(err error, msg string) error {
+ code := grpcutil.Code(err)
+ if code == codes.Unknown {
+ code = codes.PermissionDenied
+ }
+ return grpcutil.Errf(code, msg)
+}
+
// AuthenticateExecution verifies that the Attempt is executing, and that evkey
// matches the execution key of the current Execution for this Attempt.
//
@@ -110,7 +196,7 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem
a, e, err = verifyExecutionAndCheckExTok(c, auth)
if err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
- err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth")
+ err = makeError(err, "requires execution Auth")
}
return a, e, err
}
@@ -122,14 +208,14 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem
func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
- err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth")
+ err = makeError(err, "requires execution Auth")
return
}
err = e.Revoke(c)
if err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution")
- err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth")
+ err = makeError(err, "unable to invalidate Auth")
}
return
}
@@ -146,7 +232,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
}
switch e.State {
- case dm.Execution_SCHEDULED:
+ case dm.Execution_SCHEDULING:
if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
err = errors.New("incorrect ActivationToken")
return
@@ -155,6 +241,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
e.State.MustEvolve(dm.Execution_RUNNING)
e.Token = actTok
err = datastore.Get(c).Put(e)
+ logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id)
case dm.Execution_RUNNING:
if subtle.ConstantTimeCompare(e.Token, actTok) != 1 {
@@ -164,6 +251,8 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT
// by the same client, so there's no error, or it's wrong which means it's
// a retry by a different client.
+ logging.Infof(c, "already activated execution %s", auth.Id)
+
default:
err = fmt.Errorf("Execution is in wrong state")
}
@@ -180,28 +269,58 @@ func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by
a, e, err = verifyExecutionAndActivate(c, auth, actToken)
if err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution")
- err = grpcutil.Errf(codes.Unauthenticated, "failed to activate execution Auth")
+ err = makeError(err, "failed to activate execution Auth")
}
return a, e, err
}
+// GetEID gets an Execution_ID for this Execution. It panics if the Execution
+// is in an invalid state.
+func (e *Execution) GetEID() *dm.Execution_ID {
+ aid := &dm.Attempt_ID{}
+ if e.ID == 0 {
+ panic("cannot create valid Execution_ID with 0-value ID field")
+ }
+ if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
+ panic(err)
+ }
+ return dm.NewExecutionID(aid.Quest, aid.Id, e.ID)
+}
+
// ToProto returns a dm proto version of this Execution.
func (e *Execution) ToProto(includeID bool) *dm.Execution {
- ret := &dm.Execution{
- Data: &dm.Execution_Data{
- State: e.State,
- StateReason: e.StateReason,
- Created: google_pb.NewTimestamp(e.Created),
- DistributorToken: e.DistributorToken,
- DistributorInfoUrl: e.DistributorURL,
- },
- }
+ ret := &dm.Execution{Data: e.DataProto()}
if includeID {
- aid := &dm.Attempt_ID{}
- if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
- panic(err)
- }
- ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID)
+ ret.Id = e.GetEID()
+ }
+ return ret
+}
+
+// DataProto returns an Execution.Data message for this Execution.
+//
+// This omits the DistributorInfo.Url portion, which must be filled in elsewhere for
+// package cyclical import reasons.
+func (e *Execution) DataProto() (ret *dm.Execution_Data) {
+ switch e.State {
+ case dm.Execution_SCHEDULING:
+ ret = dm.NewExecutionScheduling().Data
+ case dm.Execution_RUNNING:
+ ret = dm.NewExecutionRunning().Data
+ case dm.Execution_STOPPING:
+ ret = dm.NewExecutionStopping().Data
+ case dm.Execution_FINISHED:
+ ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).Data
+ case dm.Execution_ABNORMAL_FINISHED:
+ ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data
+ default:
+ panic(fmt.Errorf("unknown Execution_State: %s", e.State))
+ }
+ ret.Created = google_pb.NewTimestamp(e.Created)
+ ret.Modified = google_pb.NewTimestamp(e.Modified)
+ ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{
+ ConfigName: e.DistributorConfigName,
+ ConfigVersion: e.DistributorConfigVersion,
+ Token: e.DistributorToken,
}
return ret
}
« no previous file with comments | « appengine/cmd/dm/model/attempt_test.go ('k') | appengine/cmd/dm/model/execution_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698