Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(587)

Side by Side Diff: appengine/cmd/dm/deps/walk_graph_test.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: fix imports and make dummy.go a real file Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/dm/deps/walk_graph.go ('k') | appengine/cmd/dm/distributor/config.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package deps 5 package deps
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "testing" 9 "testing"
10 "time" 10 "time"
11 11
12 "github.com/luci/gae/service/datastore" 12 "github.com/luci/gae/service/datastore"
13 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
14 "github.com/luci/luci-go/appengine/cmd/dm/distributor/fake"
13 "github.com/luci/luci-go/appengine/cmd/dm/model" 15 "github.com/luci/luci-go/appengine/cmd/dm/model"
14 "github.com/luci/luci-go/appengine/tumble"
15 dm "github.com/luci/luci-go/common/api/dm/service/v1" 16 dm "github.com/luci/luci-go/common/api/dm/service/v1"
16 "github.com/luci/luci-go/common/clock" 17 "github.com/luci/luci-go/common/clock"
17 "github.com/luci/luci-go/common/clock/testclock" 18 "github.com/luci/luci-go/common/clock/testclock"
18 google_pb "github.com/luci/luci-go/common/proto/google" 19 google_pb "github.com/luci/luci-go/common/proto/google"
19 . "github.com/smartystreets/goconvey/convey" 20 . "github.com/smartystreets/goconvey/convey"
20 "golang.org/x/net/context" 21 "golang.org/x/net/context"
21 ) 22 )
22 23
23 type breakFwdDepLoads struct { 24 type breakFwdDepLoads struct {
24 datastore.RawInterface 25 datastore.RawInterface
25 } 26 }
26 27
27 func (b breakFwdDepLoads) GetMulti(keys []*datastore.Key, mg datastore.MultiMeta Getter, cb datastore.GetMultiCB) error { 28 func (b breakFwdDepLoads) GetMulti(keys []*datastore.Key, mg datastore.MultiMeta Getter, cb datastore.GetMultiCB) error {
28 for _, k := range keys { 29 for _, k := range keys {
29 if k.Kind() == "FwdDep" { 30 if k.Kind() == "FwdDep" {
30 return fmt.Errorf("Loading FwdDeps is currently broken") 31 return fmt.Errorf("Loading FwdDeps is currently broken")
31 } 32 }
32 } 33 }
33 return b.RawInterface.GetMulti(keys, mg, cb) 34 return b.RawInterface.GetMulti(keys, mg, cb)
34 } 35 }
35 36
36 func TestWalkGraph(t *testing.T) { 37 func TestWalkGraph(t *testing.T) {
37 t.Parallel() 38 t.Parallel()
38 39
39 Convey("WalkGraph", t, func() { 40 Convey("WalkGraph", t, func() {
40 » » ttest := &tumble.Testing{} 41 » » ttest, c, dist, s := testSetup()
41 » » c := ttest.Context()
42 42
43 ds := datastore.Get(c) 43 ds := datastore.Get(c)
44 s := newDecoratedDeps()
45 44
46 req := &dm.WalkGraphReq{ 45 req := &dm.WalkGraphReq{
47 Query: dm.AttemptListQueryL(map[string][]uint32{"quest": {1}}), 46 Query: dm.AttemptListQueryL(map[string][]uint32{"quest": {1}}),
48 Limit: &dm.WalkGraphReq_Limit{MaxDepth: 1}, 47 Limit: &dm.WalkGraphReq_Limit{MaxDepth: 1},
49 } 48 }
50 So(req.Normalize(), ShouldBeNil) 49 So(req.Normalize(), ShouldBeNil)
51 50
52 Convey("no attempt", func() { 51 Convey("no attempt", func() {
53 » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 52 » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphData{
54 Quests: map[string]*dm.Quest{"quest": { 53 Quests: map[string]*dm.Quest{"quest": {
55 Attempts: map[uint32]*dm.Attempt{ 54 Attempts: map[uint32]*dm.Attempt{
56 1: {DNE: true}, 55 1: {DNE: true},
57 }, 56 },
58 }}, 57 }},
59 }) 58 })
60 }) 59 })
61 60
62 Convey("good", func() { 61 Convey("good", func() {
63 ds.Testable().Consistent(true) 62 ds.Testable().Consistent(true)
64 63
65 » » » wDesc := &dm.Quest_Desc{ 64 » » » wDesc := fake.QuestDesc("w")
66 » » » » DistributorConfigName: "foof", 65 » » » w := s.ensureQuest(c, "w", 1)
67 » » » » JsonPayload: `{"name":"w"}`,
68 » » » }
69 » » » w := ensureQuest(c, "w", 1)
70 ttest.Drain(c) 66 ttest.Drain(c)
71 aid := dm.NewAttemptID(w, 1)
72 67
73 req.Query.AttemptList = dm.NewAttemptList( 68 req.Query.AttemptList = dm.NewAttemptList(
74 map[string][]uint32{w: {1}}) 69 map[string][]uint32{w: {1}})
75 70
76 Convey("include nothing", func() { 71 Convey("include nothing", func() {
77 » » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 72 » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphDa ta{
78 Quests: map[string]*dm.Quest{ 73 Quests: map[string]*dm.Quest{
79 w: { 74 w: {
80 Attempts: map[uint32]*dm .Attempt{1: {}}, 75 Attempts: map[uint32]*dm .Attempt{1: {}},
81 }, 76 },
82 }, 77 },
83 }) 78 })
84 }) 79 })
85 80
86 Convey("quest dne", func() { 81 Convey("quest dne", func() {
87 req.Include.QuestData = true 82 req.Include.QuestData = true
88 req.Limit.MaxDepth = 1 83 req.Limit.MaxDepth = 1
89 req.Query.AttemptList = dm.NewAttemptList( 84 req.Query.AttemptList = dm.NewAttemptList(
90 map[string][]uint32{"noex": {1}}) 85 map[string][]uint32{"noex": {1}})
91 » » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 86 » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphDa ta{
92 Quests: map[string]*dm.Quest{ 87 Quests: map[string]*dm.Quest{
93 "noex": { 88 "noex": {
94 DNE: true, 89 DNE: true,
95 Attempts: map[uint32]*dm .Attempt{1: {DNE: true}}, 90 Attempts: map[uint32]*dm .Attempt{1: {DNE: true}},
96 }, 91 },
97 }, 92 },
98 }) 93 })
99 }) 94 })
100 95
101 Convey("no dependencies", func() { 96 Convey("no dependencies", func() {
102 req.Include.AttemptData = true 97 req.Include.AttemptData = true
103 req.Include.QuestData = true 98 req.Include.QuestData = true
104 req.Include.NumExecutions = 128 99 req.Include.NumExecutions = 128
105 100 » » » » tok := string(fake.MkToken(dm.NewExecutionID(w, 1, 1)))
106 » » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 101 » » » » aExpect := dm.NewAttemptExecuting(1)
102 » » » » aExpect.Executions = map[uint32]*dm.Execution{1: dm.NewExecutionScheduling()}
103 » » » » aExpect.Executions[1].Data.DistributorInfo = &dm .Execution_Data_DistributorInfo{
104 » » » » » ConfigName: "fakeDistributor",
105 » » » » » ConfigVersion: "testing",
106 » » » » » Token: tok,
107 » » » » }
108 » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphDa ta{
107 Quests: map[string]*dm.Quest{ 109 Quests: map[string]*dm.Quest{
108 w: { 110 w: {
109 Data: &dm.Quest_Data{ 111 Data: &dm.Quest_Data{
110 Desc: wDesc, 112 Desc: wDesc,
111 }, 113 },
112 » » » » » » » Attempts: map[uint32]*dm .Attempt{ 114 » » » » » » » Attempts: map[uint32]*dm .Attempt{1: aExpect},
113 » » » » » » » » 1: dm.NewAttempt NeedsExecution(time.Time{}),
114 » » » » » » » },
115 }, 115 },
116 }, 116 },
117 }) 117 })
118 }) 118 })
119 119
120 Convey("finished", func() { 120 Convey("finished", func() {
121 » » » » _, err := s.FinishAttempt(c, &dm.FinishAttemptRe q{ 121 » » » » wEx := dm.NewExecutionID(w, 1, 1)
122 » » » » » Auth: activate(c, execute(c, aid)) , 122 » » » » dist.RunTask(c, wEx, func(tsk *fake.Task) error {
123 » » » » » JsonResult: `{"data": ["very", "yes"]}`, 123 » » » » » tsk.MustActivate(c, s).Finish(
124 » » » » » Expiration: google_pb.NewTimestamp(clock .Now(c).Add(time.Hour * 24 * 4)), 124 » » » » » » `{"data": ["very", "yes"]}`, clo ck.Now(c).Add(time.Hour*24*4))
125 » » » » » tsk.State = []byte("distributorState")
126 » » » » » return nil
125 }) 127 })
126 » » » » So(err, ShouldBeNil) 128 » » » » ttest.Drain(c)
127
128 » » » » ex := &model.Execution{ID: 1, Attempt: ds.MakeKe y("Attempt", aid.DMEncoded())}
129 » » » » So(ds.Get(ex), ShouldBeNil)
130 » » » » ex.State = dm.Execution_FINISHED
131 » » » » So(ds.Put(ex), ShouldBeNil)
132 129
133 req.Include.AttemptData = true 130 req.Include.AttemptData = true
134 req.Include.AttemptResult = true 131 req.Include.AttemptResult = true
135 req.Include.NumExecutions = 128 132 req.Include.NumExecutions = 128
133 req.Include.ExecutionInfoUrl = true
136 data := `{"data":["very","yes"]}` 134 data := `{"data":["very","yes"]}`
137 » » » » aExpect := dm.NewAttemptFinished(time.Time{}, ui nt32(len(data)), data) 135 » » » » aExpect := dm.NewAttemptFinished(time.Time{}, ui nt32(len(data)), data, []byte("distributorState"))
138 aExpect.Data.NumExecutions = 1 136 aExpect.Data.NumExecutions = 1
139 » » » » aExpect.Executions = map[uint32]*dm.Execution{1: { 137 » » » » aExpect.Executions = map[uint32]*dm.Execution{
140 » » » » » Data: &dm.Execution_Data{ 138 » » » » » 1: dm.NewExecutionFinished("distributorS tate"),
141 » » » » » » State: dm.Execution_FINISHED, 139 » » » » }
142 » » » » » }, 140 » » » » tok := string(fake.MkToken(dm.NewExecutionID(w, 1, 1)))
143 » » » » }} 141 » » » » aExpect.Executions[1].Data.DistributorInfo = &dm .Execution_Data_DistributorInfo{
144 » » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 142 » » » » » ConfigName: "fakeDistributor",
143 » » » » » ConfigVersion: "testing",
144 » » » » » Token: tok,
145 » » » » » Url: dist.InfoURL(distributor. Token(tok)),
146 » » » » }
147
148 » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphDa ta{
145 Quests: map[string]*dm.Quest{ 149 Quests: map[string]*dm.Quest{
146 w: { 150 w: {
147 Attempts: map[uint32]*dm .Attempt{1: aExpect}, 151 Attempts: map[uint32]*dm .Attempt{1: aExpect},
148 }, 152 },
149 }, 153 },
150 }) 154 })
151 }) 155 })
152 156
153 Convey("limited attempt results", func() { 157 Convey("limited attempt results", func() {
154 » » » » _, err := s.FinishAttempt(c, &dm.FinishAttemptRe q{ 158 » » » » wEx := dm.NewExecutionID(w, 1, 1)
155 » » » » » Auth: activate(c, execute(c, aid)) , 159 » » » » dist.RunTask(c, wEx, func(tsk *fake.Task) error {
156 » » » » » JsonResult: `{"data": ["very", "yes"]}`, 160 » » » » » tsk.MustActivate(c, s).Finish(
157 » » » » » Expiration: google_pb.NewTimestamp(clock .Now(c).Add(time.Hour * 24 * 4)), 161 » » » » » » `{"data": ["very", "yes"]}`, clo ck.Now(c).Add(time.Hour*24*4))
162 » » » » » tsk.State = []byte("distributorState")
163 » » » » » return nil
158 }) 164 })
159 » » » » So(err, ShouldBeNil) 165 » » » » ttest.Drain(c)
160
161 » » » » ex := &model.Execution{ID: 1, Attempt: ds.MakeKe y("Attempt", aid.DMEncoded())}
162 » » » » So(ds.Get(ex), ShouldBeNil)
163 » » » » ex.State = dm.Execution_FINISHED
164 » » » » So(ds.Put(ex), ShouldBeNil)
165 166
166 req.Include.AttemptResult = true 167 req.Include.AttemptResult = true
167 req.Limit.MaxDataSize = 10 168 req.Limit.MaxDataSize = 10
168 data := `{"data":["very","yes"]}` 169 data := `{"data":["very","yes"]}`
169 » » » » aExpect := dm.NewAttemptFinished(time.Time{}, ui nt32(len(data)), "") 170 » » » » aExpect := dm.NewAttemptFinished(time.Time{}, ui nt32(len(data)), "", nil)
170 aExpect.Data.NumExecutions = 1 171 aExpect.Data.NumExecutions = 1
171 aExpect.Partial = &dm.Attempt_Partial{Result: dm .Attempt_Partial_DATA_SIZE_LIMIT} 172 aExpect.Partial = &dm.Attempt_Partial{Result: dm .Attempt_Partial_DATA_SIZE_LIMIT}
172 » » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 173 » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphDa ta{
173 Quests: map[string]*dm.Quest{ 174 Quests: map[string]*dm.Quest{
174 w: { 175 w: {
175 Attempts: map[uint32]*dm .Attempt{1: aExpect}, 176 Attempts: map[uint32]*dm .Attempt{1: aExpect},
176 }, 177 },
177 }, 178 },
178 }) 179 })
179 }) 180 })
180 181
181 Convey("attemptRange", func() { 182 Convey("attemptRange", func() {
182 » » » » x := ensureQuest(c, "x", 1) 183 » » » » x := s.ensureQuest(c, "x", 1)
183 ttest.Drain(c) 184 ttest.Drain(c)
184 » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID( w, 1))), 185
185 » » » » » dm.NewAttemptID(x, 1), dm.NewAttemptID(x , 2), dm.NewAttemptID(x, 3), 186 » » » » wEx := dm.NewExecutionID(w, 1, 1)
186 » » » » » dm.NewAttemptID(x, 4)) 187 » » » » dist.RunTask(c, wEx, func(tsk *fake.Task) error {
188 » » » » » tsk.MustActivate(c, s).DepOn(
189 » » » » » » dm.NewAttemptID(x, 1), dm.NewAtt emptID(x, 2), dm.NewAttemptID(x, 3),
190 » » » » » » dm.NewAttemptID(x, 4))
191 » » » » » return nil
192 » » » » })
187 ttest.Drain(c) 193 ttest.Drain(c)
188 194
189 req.Limit.MaxDepth = 1 195 req.Limit.MaxDepth = 1
190 Convey("normal", func() { 196 Convey("normal", func() {
191 req.Query = dm.AttemptRangeQuery(x, 2, 4 ) 197 req.Query = dm.AttemptRangeQuery(x, 2, 4 )
192 » » » » » So(req, WalkShouldReturn(c), &dm.GraphDa ta{ 198 » » » » » So(req, fake.WalkShouldReturn(c, s), &dm .GraphData{
193 Quests: map[string]*dm.Quest{ 199 Quests: map[string]*dm.Quest{
194 x: {Attempts: map[uint32 ]*dm.Attempt{2: {}, 3: {}}}, 200 x: {Attempts: map[uint32 ]*dm.Attempt{2: {}, 3: {}}},
195 }, 201 },
196 }) 202 })
197 }) 203 })
198 204
199 Convey("oob range", func() { 205 Convey("oob range", func() {
200 req.Query = dm.AttemptRangeQuery(x, 2, 6 ) 206 req.Query = dm.AttemptRangeQuery(x, 2, 6 )
201 » » » » » So(req, WalkShouldReturn(c), &dm.GraphDa ta{ 207 » » » » » So(req, fake.WalkShouldReturn(c, s), &dm .GraphData{
202 Quests: map[string]*dm.Quest{ 208 Quests: map[string]*dm.Quest{
203 x: {Attempts: map[uint32 ]*dm.Attempt{ 209 x: {Attempts: map[uint32 ]*dm.Attempt{
204 2: {}, 3: {}, 4: {}, 5: {DNE: true}}}, 210 2: {}, 3: {}, 4: {}, 5: {DNE: true}}},
205 }, 211 },
206 }) 212 })
207 }) 213 })
208 }) 214 })
209 215
210 Convey("filtered attempt results", func() { 216 Convey("filtered attempt results", func() {
211 » » » » x := ensureQuest(c, "x", 2) 217 » » » » x := s.ensureQuest(c, "x", 2)
212 » » » » ttest.Drain(c)
213 » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID( w, 1))), dm.NewAttemptID(x, 1))
214 ttest.Drain(c) 218 ttest.Drain(c)
215 219
216 » » » » exp := google_pb.NewTimestamp(datastore.RoundTim e(clock.Now(c).Add(time.Hour * 24 * 4))) 220 » » » » dist.RunTask(c, dm.NewExecutionID(w, 1, 1), func (tsk *fake.Task) error {
221 » » » » » tsk.MustActivate(c, s).DepOn(dm.NewAttem ptID(x, 1))
222 » » » » » tsk.State = []byte("originalState")
223 » » » » » return nil
224 » » » » })
225 » » » » ttest.Drain(c)
226
227 » » » » exp := datastore.RoundTime(clock.Now(c).Add(time .Hour * 24 * 4))
217 228
218 x1data := `{"data":["I can see this"]}` 229 x1data := `{"data":["I can see this"]}`
219 » » » » _, err := s.FinishAttempt(c, &dm.FinishAttemptRe q{ 230 » » » » dist.RunTask(c, dm.NewExecutionID(x, 1, 1), func (tsk *fake.Task) error {
220 » » » » » Auth: activate(c, execute(c, dm.Ne wAttemptID(x, 1))), 231 » » » » » tsk.MustActivate(c, s).Finish(x1data, ex p)
221 » » » » » JsonResult: x1data, 232 » » » » » tsk.State = []byte("atmpt1")
222 » » » » » Expiration: exp, 233 » » » » » return nil
223 }) 234 })
224 So(err, ShouldBeNil)
225 235
226 x2data := `{"data":["nope"]}` 236 x2data := `{"data":["nope"]}`
227 » » » » _, err = s.FinishAttempt(c, &dm.FinishAttemptReq { 237 » » » » dist.RunTask(c, dm.NewExecutionID(x, 2, 1), func (tsk *fake.Task) error {
228 » » » » » Auth: activate(c, execute(c, dm.Ne wAttemptID(x, 2))), 238 » » » » » tsk.MustActivate(c, s).Finish(x2data, ex p)
229 » » » » » JsonResult: x2data, 239 » » » » » tsk.State = []byte("atmpt2")
230 » » » » » Expiration: exp, 240 » » » » » return nil
231 }) 241 })
232 » » » » So(err, ShouldBeNil) 242
243 » » » » // This Drain does:
244 » » » » // RecordCompletion -> AckFwdDep -> ScheduleEx ecution
245 » » » » // which attempts to load the configuration from the context, and
246 » » » » // panics if it's missing.
233 ttest.Drain(c) 247 ttest.Drain(c)
234 248
235 » » » » req.Auth = activate(c, execute(c, dm.NewAttemptI D(w, 1))) 249 » » » » wEID := dm.NewExecutionID(w, 1, 2)
236 » » » » req.Limit.MaxDepth = 2 250 » » » » wEx := model.ExecutionFromID(c, wEID)
237 » » » » req.Include.AttemptResult = true 251 » » » » So(ds.Get(wEx), ShouldBeNil)
238 » » » » req.Query = dm.AttemptListQueryL(map[string][]ui nt32{x: nil})
239 252
240 » » » » x1Expect := dm.NewAttemptFinished(time.Time{}, u int32(len(x1data)), x1data) 253 » » » » dist.RunTask(c, wEID, func(tsk *fake.Task) error {
241 » » » » x1Expect.Data.NumExecutions = 1 254 » » » » » So(tsk.State, ShouldResemble, distributo r.PersistentState("originalState"))
242 255
243 » » » » x2Expect := dm.NewAttemptFinished(time.Time{}, u int32(len(x2data)), "") 256 » » » » » act := tsk.MustActivate(c, s)
244 » » » » x2Expect.Partial = &dm.Attempt_Partial{Result: d m.Attempt_Partial_NOT_AUTHORIZED} 257 » » » » » req.Limit.MaxDepth = 2
245 » » » » x2Expect.Data.NumExecutions = 1 258 » » » » » req.Include.AttemptResult = true
259 » » » » » req.Query = dm.AttemptListQueryL(map[str ing][]uint32{x: nil})
246 260
247 » » » » So(req, WalkShouldReturn(c), &dm.GraphData{ 261 » » » » » x1Expect := dm.NewAttemptFinished(time.T ime{}, uint32(len(x1data)), x1data, []byte("atmpt1"))
248 » » » » » Quests: map[string]*dm.Quest{ 262 » » » » » x1Expect.Data.NumExecutions = 1
249 » » » » » » x: {Attempts: map[uint32]*dm.Att empt{ 263
250 » » » » » » » 1: x1Expect, 264 » » » » » x2Expect := dm.NewAttemptFinished(time.T ime{}, uint32(len(x2data)), "", nil)
251 » » » » » » » 2: x2Expect, 265 » » » » » x2Expect.Partial = &dm.Attempt_Partial{R esult: dm.Attempt_Partial_NOT_AUTHORIZED}
252 » » » » » » }}, 266 » » » » » x2Expect.Data.NumExecutions = 1
253 » » » » » }}) 267
268 » » » » » So(req, act.WalkShouldReturn, &dm.GraphD ata{
269 » » » » » » Quests: map[string]*dm.Quest{
270 » » » » » » » x: {Attempts: map[uint32 ]*dm.Attempt{
271 » » » » » » » » 1: x1Expect,
272 » » » » » » » » 2: x2Expect,
273 » » » » » » » }},
274 » » » » » » },
275 » » » » » })
276 » » » » » return nil
277 » » » » })
254 }) 278 })
255 279
256 Convey("own attempt results", func() { 280 Convey("own attempt results", func() {
257 » » » » x := ensureQuest(c, "x", 2) 281 » » » » x := s.ensureQuest(c, "x", 2)
258 ttest.Drain(c) 282 ttest.Drain(c)
259 » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID( w, 1))), dm.NewAttemptID(x, 1)) 283 » » » » dist.RunTask(c, dm.NewExecutionID(w, 1, 1), func (tsk *fake.Task) error {
284 » » » » » tsk.MustActivate(c, s).DepOn(dm.NewAttem ptID(x, 1))
285 » » » » » return nil
286 » » » » })
260 ttest.Drain(c) 287 ttest.Drain(c)
261 288
262 » » » » exp := google_pb.NewTimestamp(datastore.RoundTim e(clock.Now(c).Add(time.Hour * 24 * 4))) 289 » » » » exp := datastore.RoundTime(clock.Now(c).Add(time .Hour * 24 * 4))
263 290
264 x1data := `{"data":["I can see this"]}` 291 x1data := `{"data":["I can see this"]}`
265 » » » » _, err := s.FinishAttempt(c, &dm.FinishAttemptRe q{ 292 » » » » dist.RunTask(c, dm.NewExecutionID(x, 1, 1), func (tsk *fake.Task) error {
266 » » » » » Auth: activate(c, execute(c, dm.Ne wAttemptID(x, 1))), 293 » » » » » tsk.MustActivate(c, s).Finish(x1data, ex p)
267 » » » » » JsonResult: x1data, 294 » » » » » tsk.State = []byte("state")
268 » » » » » Expiration: exp, 295 » » » » » return nil
269 }) 296 })
270 So(err, ShouldBeNil)
271 ttest.Drain(c) 297 ttest.Drain(c)
272 298
273 » » » » req.Auth = activate(c, execute(c, dm.NewAttemptI D(w, 1))) 299 » » » » dist.RunTask(c, dm.NewExecutionID(w, 1, 2), func (tsk *fake.Task) error {
274 » » » » req.Limit.MaxDepth = 2 300 » » » » » act := tsk.MustActivate(c, s)
275 » » » » req.Include.AttemptResult = true 301 » » » » » req.Limit.MaxDepth = 2
276 » » » » req.Query = dm.AttemptListQueryL(map[string][]ui nt32{w: {1}}) 302 » » » » » req.Include.AttemptResult = true
303 » » » » » req.Query = dm.AttemptListQueryL(map[str ing][]uint32{w: {1}})
277 304
278 » » » » x1Expect := dm.NewAttemptFinished(time.Time{}, u int32(len(x1data)), x1data) 305 » » » » » x1Expect := dm.NewAttemptFinished(time.T ime{}, uint32(len(x1data)), x1data, []byte("state"))
279 » » » » x1Expect.Data.NumExecutions = 1 306 » » » » » x1Expect.Data.NumExecutions = 1
280 307
281 » » » » w1Exepct := dm.NewAttemptExecuting(2) 308 » » » » » w1Exepct := dm.NewAttemptExecuting(2)
282 » » » » w1Exepct.Data.NumExecutions = 2 309 » » » » » w1Exepct.Data.NumExecutions = 2
283 310
284 » » » » // This filter ensures that WalkShouldReturn is using the optimized 311 » » » » » // This filter ensures that WalkShouldRe turn is using the optimized
285 » » » » // path for deps traversal when starting from an authed attempt. 312 » » » » » // path for deps traversal when starting from an authed attempt.
286 » » » » c = datastore.AddRawFilters(c, func(c context.Co ntext, ri datastore.RawInterface) datastore.RawInterface { 313 » » » » » c = datastore.AddRawFilters(c, func(c co ntext.Context, ri datastore.RawInterface) datastore.RawInterface {
287 » » » » » return breakFwdDepLoads{ri} 314 » » » » » » return breakFwdDepLoads{ri}
315 » » » » » })
316
317 » » » » » So(req, act.WalkShouldReturn, &dm.GraphD ata{
318 » » » » » » Quests: map[string]*dm.Quest{
319 » » » » » » » w: {Attempts: map[uint32 ]*dm.Attempt{1: w1Exepct}},
320 » » » » » » » x: {Attempts: map[uint32 ]*dm.Attempt{1: x1Expect}},
321 » » » » » » },
322 » » » » » })
323 » » » » » return nil
288 }) 324 })
289
290 So(req, WalkShouldReturn(c), &dm.GraphData{
291 Quests: map[string]*dm.Quest{
292 w: {Attempts: map[uint32]*dm.Att empt{1: w1Exepct}},
293 x: {Attempts: map[uint32]*dm.Att empt{1: x1Expect}},
294 }})
295 }) 325 })
296 326
297 Convey("deps (no dest attempts)", func() { 327 Convey("deps (no dest attempts)", func() {
298 req.Limit.MaxDepth = 3 328 req.Limit.MaxDepth = 3
299 » » » » w1auth := activate(c, execute(c, dm.NewAttemptID (w, 1))) 329
300 » » » » x := ensureQuest(c, "x", 1) 330 » » » » x := s.ensureQuest(c, "x", 1)
301 ttest.Drain(c) 331 ttest.Drain(c)
302 depOn(c, w1auth, dm.NewAttemptID(x, 1), dm.NewAt temptID(x, 2))
303 332
304 » » » » Convey("before tumble", func() { 333 » » » » dist.RunTask(c, dm.NewExecutionID(w, 1, 1), func (tsk *fake.Task) error {
305 » » » » » Convey("deps", func() { 334 » » » » » tsk.MustActivate(c, s).DepOn(dm.NewAttem ptID(x, 1), dm.NewAttemptID(x, 2))
335
336 » » » » » Convey("before tumble", func() {
306 req.Include.FwdDeps = true 337 req.Include.FwdDeps = true
307 // didn't run tumble, so that x| 1 and x|2 don't get created. 338 // didn't run tumble, so that x| 1 and x|2 don't get created.
308 » » » » » » So(req, WalkShouldReturn(c), &dm .GraphData{ 339 » » » » » » // don't use act.WalkShouldRetur n; we want to observe the graph
340 » » » » » » // state from
341 » » » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphData{
309 Quests: map[string]*dm.Q uest{ 342 Quests: map[string]*dm.Q uest{
310 w: {Attempts: ma p[uint32]*dm.Attempt{1: { 343 w: {Attempts: ma p[uint32]*dm.Attempt{1: {
311 FwdDeps: dm.NewAttemptList(map[string][]uint32{ 344 FwdDeps: dm.NewAttemptList(map[string][]uint32{
312 x: {2, 1}, 345 x: {2, 1},
313 }), 346 }),
314 }}}, 347 }}},
315 x: {Attempts: ma p[uint32]*dm.Attempt{ 348 x: {Attempts: ma p[uint32]*dm.Attempt{
316 1: {FwdD eps: &dm.AttemptList{}}, // exists, but has no fwddeps 349 1: {FwdD eps: &dm.AttemptList{}}, // exists, but has no fwddeps
317 2: {DNE: true}, 350 2: {DNE: true},
318 }}, 351 }},
319 }, 352 },
320 }) 353 })
321 }) 354 })
355 return nil
322 }) 356 })
323 357
324 Convey("after tumble", func() { 358 Convey("after tumble", func() {
325 ttest.Drain(c) 359 ttest.Drain(c)
360
326 Convey("deps (with dest attempts)", func () { 361 Convey("deps (with dest attempts)", func () {
327 req.Include.FwdDeps = true 362 req.Include.FwdDeps = true
328 req.Include.BackDeps = true 363 req.Include.BackDeps = true
329 » » » » » » So(req, WalkShouldReturn(c), &dm .GraphData{ 364 » » » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphData{
330 Quests: map[string]*dm.Q uest{ 365 Quests: map[string]*dm.Q uest{
331 w: {Attempts: ma p[uint32]*dm.Attempt{1: { 366 w: {Attempts: ma p[uint32]*dm.Attempt{1: {
332 FwdDeps: dm.NewAttemptList(map[string][]uint32{x: {2, 1}}), 367 FwdDeps: dm.NewAttemptList(map[string][]uint32{x: {2, 1}}),
333 BackDeps : &dm.AttemptList{}, 368 BackDeps : &dm.AttemptList{},
334 }}}, 369 }}},
335 x: {Attempts: ma p[uint32]*dm.Attempt{1: { 370 x: {Attempts: ma p[uint32]*dm.Attempt{1: {
336 FwdDeps: &dm.AttemptList{}, 371 FwdDeps: &dm.AttemptList{},
337 BackDeps : dm.NewAttemptList(map[string][]uint32{w: {1}}), 372 BackDeps : dm.NewAttemptList(map[string][]uint32{w: {1}}),
338 }, 2: { 373 }, 2: {
339 FwdDeps: &dm.AttemptList{}, 374 FwdDeps: &dm.AttemptList{},
340 BackDeps : dm.NewAttemptList(map[string][]uint32{w: {1}}), 375 BackDeps : dm.NewAttemptList(map[string][]uint32{w: {1}}),
341 }}}, 376 }}},
342 }, 377 },
343 }) 378 })
344 }) 379 })
345 380
346 Convey("diamond", func() { 381 Convey("diamond", func() {
347 » » » » » » z := ensureQuest(c, "z", 1) 382 » » » » » » z := s.ensureQuest(c, "z", 1)
348 » » » » » » ttest.Drain(c)
349 » » » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID(x, 1))), dm.NewAttemptID(z, 1))
350 » » » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID(x, 2))), dm.NewAttemptID(z, 1))
351 ttest.Drain(c) 383 ttest.Drain(c)
352 384
353 » » » » » » So(req, WalkShouldReturn(c), &dm .GraphData{ 385 » » » » » » dist.RunTask(c, dm.NewExecutionI D(x, 1, 1), func(tsk *fake.Task) error {
354 » » » » » » » Quests: map[string]*dm.Q uest{ 386 » » » » » » » tsk.MustActivate(c, s).D epOn(dm.NewAttemptID(z, 1))
355 » » » » » » » » w: {Attempts: ma p[uint32]*dm.Attempt{1: {}}}, 387 » » » » » » » return nil
356 » » » » » » » » x: {Attempts: ma p[uint32]*dm.Attempt{1: {}, 2: {}}},
357 » » » » » » » » z: {Attempts: ma p[uint32]*dm.Attempt{1: {}}},
358 » » » » » » » },
359 }) 388 })
360 » » » » » }) 389 » » » » » » dist.RunTask(c, dm.NewExecutionI D(x, 2, 1), func(tsk *fake.Task) error {
361 390 » » » » » » » tsk.MustActivate(c, s).D epOn(dm.NewAttemptID(z, 1))
362 » » » » » Convey("diamond (dfs)", func() { 391 » » » » » » » return nil
363 » » » » » » z := ensureQuest(c, "z", 1) 392 » » » » » » })
364 » » » » » » ttest.Drain(c)
365 » » » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID(x, 1))), dm.NewAttemptID(z, 1))
366 » » » » » » depOn(c, activate(c, execute(c, dm.NewAttemptID(x, 2))), dm.NewAttemptID(z, 1))
367 ttest.Drain(c) 393 ttest.Drain(c)
368 394
369 » » » » » » req.Mode.Dfs = true 395 » » » » » » Convey("walk", func() {
370 » » » » » » So(req, WalkShouldReturn(c), &dm .GraphData{ 396 » » » » » » » So(req, fake.WalkShouldR eturn(c, s), &dm.GraphData{
371 » » » » » » » Quests: map[string]*dm.Q uest{ 397 » » » » » » » » Quests: map[stri ng]*dm.Quest{
372 » » » » » » » » w: {Attempts: ma p[uint32]*dm.Attempt{1: {}}}, 398 » » » » » » » » » w: {Atte mpts: map[uint32]*dm.Attempt{1: {}}},
373 » » » » » » » » x: {Attempts: ma p[uint32]*dm.Attempt{1: {}, 2: {}}}, 399 » » » » » » » » » x: {Atte mpts: map[uint32]*dm.Attempt{1: {}, 2: {}}},
374 » » » » » » » » z: {Attempts: ma p[uint32]*dm.Attempt{1: {}}}, 400 » » » » » » » » » z: {Atte mpts: map[uint32]*dm.Attempt{1: {}}},
375 » » » » » » » }, 401 » » » » » » » » },
402 » » » » » » » })
376 }) 403 })
404
405 Convey("walk (dfs)", func() {
406 req.Mode.Dfs = true
407 So(req, fake.WalkShouldR eturn(c, s), &dm.GraphData{
408 Quests: map[stri ng]*dm.Quest{
409 w: {Atte mpts: map[uint32]*dm.Attempt{1: {}}},
410 x: {Atte mpts: map[uint32]*dm.Attempt{1: {}, 2: {}}},
411 z: {Atte mpts: map[uint32]*dm.Attempt{1: {}}},
412 },
413 })
414 })
415
377 }) 416 })
378 417
379 Convey("attemptlist", func() { 418 Convey("attemptlist", func() {
380 req.Limit.MaxDepth = 1 419 req.Limit.MaxDepth = 1
381 req.Include.ObjectIds = true 420 req.Include.ObjectIds = true
382 req.Query = dm.AttemptListQueryL (map[string][]uint32{x: nil}) 421 req.Query = dm.AttemptListQueryL (map[string][]uint32{x: nil})
383 » » » » » » So(req, WalkShouldReturn(c), &dm .GraphData{ 422 » » » » » » So(req, fake.WalkShouldReturn(c, s), &dm.GraphData{
384 Quests: map[string]*dm.Q uest{ 423 Quests: map[string]*dm.Q uest{
385 x: { 424 x: {
386 Id: dm.N ewQuestID(x), 425 Id: dm.N ewQuestID(x),
387 Attempts : map[uint32]*dm.Attempt{ 426 Attempts : map[uint32]*dm.Attempt{
388 1: {Id: dm.NewAttemptID(x, 1)}, 427 1: {Id: dm.NewAttemptID(x, 1)},
389 2: {Id: dm.NewAttemptID(x, 2)}, 428 2: {Id: dm.NewAttemptID(x, 2)},
390 }, 429 },
391 }, 430 },
392 }, 431 },
393 }) 432 })
394 }) 433 })
395 434
396 }) 435 })
397 436
398 // This is disabled beacuse it was flaky. 437 // This is disabled beacuse it was flaky.
399 // BUG: crbug.com/621170 438 // BUG: crbug.com/621170
400 SkipConvey("early stop", func() { 439 SkipConvey("early stop", func() {
401 req.Limit.MaxDepth = 100 440 req.Limit.MaxDepth = 100
402 req.Limit.MaxTime = google_pb.NewDuratio n(time.Nanosecond) 441 req.Limit.MaxTime = google_pb.NewDuratio n(time.Nanosecond)
403 tc := clock.Get(c).(testclock.TestClock) 442 tc := clock.Get(c).(testclock.TestClock)
404 tc.SetTimerCallback(func(d time.Duration , t clock.Timer) { 443 tc.SetTimerCallback(func(d time.Duration , t clock.Timer) {
405 tc.Add(d + time.Second) 444 tc.Add(d + time.Second)
406 }) 445 })
407 » » » » » ret, err := newDecoratedDeps().WalkGraph (c, req) 446 » » » » » ret, err := s.WalkGraph(c, req)
408 So(err, ShouldBeNil) 447 So(err, ShouldBeNil)
409 So(ret.HadMore, ShouldBeTrue) 448 So(ret.HadMore, ShouldBeTrue)
410 }) 449 })
411 450
412 }) 451 })
413 }) 452 })
414 }) 453 })
415 } 454 }
OLDNEW
« no previous file with comments | « appengine/cmd/dm/deps/walk_graph.go ('k') | appengine/cmd/dm/distributor/config.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698