OLD | NEW |
| (Empty) |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package deps | |
6 | |
7 import ( | |
8 crand "crypto/rand" // TODO(iannucci): mock this for testing | |
9 "encoding/hex" | |
10 | |
11 "github.com/luci/gae/service/datastore" | |
12 "github.com/luci/luci-go/appengine/cmd/dm/model" | |
13 "github.com/luci/luci-go/common/api/dm/service/v1" | |
14 "github.com/luci/luci-go/common/logging" | |
15 "github.com/luci/luci-go/common/mathrand" | |
16 google_pb "github.com/luci/luci-go/common/proto/google" | |
17 "golang.org/x/net/context" | |
18 ) | |
19 | |
20 const claimRetries = 8 | |
21 | |
22 // ClaimExecution is a temporary api which searches for and transactionally | |
23 // claims an execution for an Attempt whose state is NeedsExecution. | |
24 func (d *deps) ClaimExecution(c context.Context, _ *google_pb.Empty) (rsp *dm.Cl
aimExecutionRsp, err error) { | |
25 // TODO(iannucci): stuff below | |
26 // It's temporary!! Don't worry about it! :D | |
27 // Use of GET to mutate state | |
28 // No 3-way handshake means lost executions | |
29 ds := datastore.Get(c) | |
30 l := logging.Get(c) | |
31 | |
32 exKeyBytes := make([]byte, 32) | |
33 _, err = crand.Read(exKeyBytes) | |
34 if err != nil { | |
35 return | |
36 } | |
37 rsp = &dm.ClaimExecutionRsp{ | |
38 Auth: &dm.Execution_Auth{Token: exKeyBytes}} | |
39 | |
40 for attempts := 0; attempts < claimRetries; attempts++ { | |
41 q := datastore.NewQuery("Attempt").Eq("State", dm.Attempt_NEEDS_
EXECUTION).Limit(32) | |
42 | |
43 if attempts < claimRetries-1 { | |
44 prefixBytes := make([]byte, 2) | |
45 _, err = crand.Read(prefixBytes) | |
46 if err != nil { | |
47 return | |
48 } | |
49 prefix := hex.EncodeToString(prefixBytes) | |
50 l.Infof("Making query (prefix=%s)", prefix) | |
51 q = q.Gte("__key__", ds.MakeKey("Attempt", prefix)) | |
52 } else { | |
53 l.Infof("Making query (no prefix)") | |
54 } | |
55 | |
56 as := []*model.Attempt{} | |
57 err = ds.GetAll(q, &as) | |
58 if err != nil { | |
59 return | |
60 } | |
61 l.Infof("Got %d executions", len(as)) | |
62 if len(as) == 0 { | |
63 continue | |
64 } | |
65 | |
66 // Now find a random one which actually needs execution | |
67 var aid dm.Attempt_ID | |
68 for _, i := range mathrand.Get(c).Perm(len(as)) { | |
69 if as[i].State != dm.Attempt_NEEDS_EXECUTION { | |
70 continue | |
71 } | |
72 aid = as[i].ID | |
73 } | |
74 if aid == (dm.Attempt_ID{}) { | |
75 continue | |
76 } | |
77 | |
78 tryAgain := false | |
79 | |
80 err = ds.RunInTransaction(func(c context.Context) error { | |
81 ds := datastore.Get(c) | |
82 | |
83 l.Infof("In TXN for %q", aid) | |
84 // need to re-read this in the transaction to ensure it
really does need | |
85 // execution still :) | |
86 a := &model.Attempt{ID: aid} | |
87 err := ds.Get(a) | |
88 if err != nil { | |
89 return err | |
90 } | |
91 if a.State != dm.Attempt_NEEDS_EXECUTION { | |
92 // oops, we picked a bad one, try again in the o
uter loop. | |
93 tryAgain = true | |
94 return nil | |
95 } | |
96 | |
97 err = a.State.Evolve(dm.Attempt_EXECUTING) | |
98 if err != nil { | |
99 return err | |
100 } | |
101 | |
102 a.CurExecution++ | |
103 | |
104 ex := &model.Execution{ | |
105 ID: a.CurExecution, | |
106 Attempt: ds.KeyForObj(a), | |
107 State: dm.Execution_RUNNING, | |
108 Token: rsp.Auth.Token, | |
109 } | |
110 rsp.Auth.Id.Id = a.CurExecution | |
111 rsp.Auth.Id.Attempt = a.ID.Id | |
112 | |
113 err = ds.PutMulti([]interface{}{a, ex}) | |
114 if err != nil { | |
115 return err | |
116 } | |
117 | |
118 qst := &model.Quest{ID: aid.Quest} | |
119 err = datastore.GetNoTxn(c).Get(qst) | |
120 if err != nil { | |
121 return err | |
122 } | |
123 rsp.Quest = qst.ToProto() | |
124 return nil | |
125 }, nil) | |
126 if tryAgain || err != nil { | |
127 continue | |
128 } | |
129 return | |
130 } | |
131 | |
132 rsp = nil | |
133 return | |
134 } | |
OLD | NEW |