| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package deps | |
| 6 | |
| 7 import ( | |
| 8 crand "crypto/rand" // TODO(iannucci): mock this for testing | |
| 9 "encoding/hex" | |
| 10 | |
| 11 "github.com/luci/gae/service/datastore" | |
| 12 "github.com/luci/luci-go/appengine/cmd/dm/model" | |
| 13 "github.com/luci/luci-go/common/api/dm/service/v1" | |
| 14 "github.com/luci/luci-go/common/logging" | |
| 15 "github.com/luci/luci-go/common/mathrand" | |
| 16 google_pb "github.com/luci/luci-go/common/proto/google" | |
| 17 "golang.org/x/net/context" | |
| 18 ) | |
| 19 | |
| 20 const claimRetries = 8 | |
| 21 | |
| 22 // ClaimExecution is a temporary api which searches for and transactionally | |
| 23 // claims an execution for an Attempt whose state is NeedsExecution. | |
| 24 func (d *deps) ClaimExecution(c context.Context, _ *google_pb.Empty) (rsp *dm.Cl
aimExecutionRsp, err error) { | |
| 25 // TODO(iannucci): stuff below | |
| 26 // It's temporary!! Don't worry about it! :D | |
| 27 // Use of GET to mutate state | |
| 28 // No 3-way handshake means lost executions | |
| 29 ds := datastore.Get(c) | |
| 30 l := logging.Get(c) | |
| 31 | |
| 32 exKeyBytes := make([]byte, 32) | |
| 33 _, err = crand.Read(exKeyBytes) | |
| 34 if err != nil { | |
| 35 return | |
| 36 } | |
| 37 rsp = &dm.ClaimExecutionRsp{ | |
| 38 Auth: &dm.Execution_Auth{Token: exKeyBytes}} | |
| 39 | |
| 40 for attempts := 0; attempts < claimRetries; attempts++ { | |
| 41 q := datastore.NewQuery("Attempt").Eq("State", dm.Attempt_NEEDS_
EXECUTION).Limit(32) | |
| 42 | |
| 43 if attempts < claimRetries-1 { | |
| 44 prefixBytes := make([]byte, 2) | |
| 45 _, err = crand.Read(prefixBytes) | |
| 46 if err != nil { | |
| 47 return | |
| 48 } | |
| 49 prefix := hex.EncodeToString(prefixBytes) | |
| 50 l.Infof("Making query (prefix=%s)", prefix) | |
| 51 q = q.Gte("__key__", ds.MakeKey("Attempt", prefix)) | |
| 52 } else { | |
| 53 l.Infof("Making query (no prefix)") | |
| 54 } | |
| 55 | |
| 56 as := []*model.Attempt{} | |
| 57 err = ds.GetAll(q, &as) | |
| 58 if err != nil { | |
| 59 return | |
| 60 } | |
| 61 l.Infof("Got %d executions", len(as)) | |
| 62 if len(as) == 0 { | |
| 63 continue | |
| 64 } | |
| 65 | |
| 66 // Now find a random one which actually needs execution | |
| 67 var aid dm.Attempt_ID | |
| 68 for _, i := range mathrand.Get(c).Perm(len(as)) { | |
| 69 if as[i].State != dm.Attempt_NEEDS_EXECUTION { | |
| 70 continue | |
| 71 } | |
| 72 aid = as[i].ID | |
| 73 } | |
| 74 if aid == (dm.Attempt_ID{}) { | |
| 75 continue | |
| 76 } | |
| 77 | |
| 78 tryAgain := false | |
| 79 | |
| 80 err = ds.RunInTransaction(func(c context.Context) error { | |
| 81 ds := datastore.Get(c) | |
| 82 | |
| 83 l.Infof("In TXN for %q", aid) | |
| 84 // need to re-read this in the transaction to ensure it
really does need | |
| 85 // execution still :) | |
| 86 a := &model.Attempt{ID: aid} | |
| 87 err := ds.Get(a) | |
| 88 if err != nil { | |
| 89 return err | |
| 90 } | |
| 91 if a.State != dm.Attempt_NEEDS_EXECUTION { | |
| 92 // oops, we picked a bad one, try again in the o
uter loop. | |
| 93 tryAgain = true | |
| 94 return nil | |
| 95 } | |
| 96 | |
| 97 err = a.State.Evolve(dm.Attempt_EXECUTING) | |
| 98 if err != nil { | |
| 99 return err | |
| 100 } | |
| 101 | |
| 102 a.CurExecution++ | |
| 103 | |
| 104 ex := &model.Execution{ | |
| 105 ID: a.CurExecution, | |
| 106 Attempt: ds.KeyForObj(a), | |
| 107 State: dm.Execution_RUNNING, | |
| 108 Token: rsp.Auth.Token, | |
| 109 } | |
| 110 rsp.Auth.Id.Id = a.CurExecution | |
| 111 rsp.Auth.Id.Attempt = a.ID.Id | |
| 112 | |
| 113 err = ds.PutMulti([]interface{}{a, ex}) | |
| 114 if err != nil { | |
| 115 return err | |
| 116 } | |
| 117 | |
| 118 qst := &model.Quest{ID: aid.Quest} | |
| 119 err = datastore.GetNoTxn(c).Get(qst) | |
| 120 if err != nil { | |
| 121 return err | |
| 122 } | |
| 123 rsp.Quest = qst.ToProto() | |
| 124 return nil | |
| 125 }, nil) | |
| 126 if tryAgain || err != nil { | |
| 127 continue | |
| 128 } | |
| 129 return | |
| 130 } | |
| 131 | |
| 132 rsp = nil | |
| 133 return | |
| 134 } | |
| OLD | NEW |