Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(503)

Side by Side Diff: appengine/tumble/example_test.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: use exists Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tumble/doc.go ('k') | appengine/tumble/fire_tasks.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "encoding/base64"
9 "fmt"
10 "net/http"
11 "net/http/httptest"
12 "sort"
13 "strings"
14 "testing"
15 "time"
16
17 "github.com/julienschmidt/httprouter"
18 "github.com/luci/gae/impl/memory"
19 "github.com/luci/gae/service/datastore"
20 "github.com/luci/gae/service/taskqueue"
21 "github.com/luci/luci-go/common/bit_field"
22 "github.com/luci/luci-go/common/clock/testclock"
23 "github.com/luci/luci-go/common/logging"
24 "github.com/luci/luci-go/common/logging/memlogger"
25 . "github.com/smartystreets/goconvey/convey"
26 "golang.org/x/net/context"
27 )
28
29 type User struct {
30 Name string `gae:"$id"`
31 }
32
33 func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*O utgoingMessage, error) {
34 sort.Strings(toUsers)
35 ds := datastore.Get(c)
36 k := ds.KeyForObj(u)
37 outMsg := &OutgoingMessage{
38 FromUser: k,
39 Message: msg,
40 Recipients: toUsers,
41 Success: bf.Make(uint64(len(toUsers))),
42 Failure: bf.Make(uint64(len(toUsers))),
43 }
44 err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error ) {
45 ds := datastore.Get(c)
46 if err := ds.Put(outMsg); err != nil {
47 return nil, err
48 }
49 outKey := ds.KeyForObj(outMsg)
50 muts := make([]Mutation, len(toUsers))
51 for i := range muts {
52 muts[i] = &SendMessage{outKey, toUsers[i]}
53 }
54 return muts, nil
55 })
56 if err != nil {
57 outMsg = nil
58 }
59 return outMsg, err
60 }
61
62 type OutgoingMessage struct {
63 // datastore-assigned
64 ID int64 `gae:"$id"`
65 FromUser *datastore.Key `gae:"$parent"`
66
67 Message string `gae:",noindex"`
68 Recipients []string `gae:",noindex"`
69
70 Success bf.BitField
71 Failure bf.BitField
72 }
73
74 type IncomingMessage struct {
75 // OtherUser|OutgoingMessageID
76 ID string `gae:"$id"`
77 ForUser *datastore.Key `gae:"$parent"`
78 }
79
80 type SendMessage struct {
81 Message *datastore.Key
82 ToUser string
83 }
84
85 func (m *SendMessage) Root(ctx context.Context) *datastore.Key {
86 return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser})
87 }
88
89 func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) {
90 ds := datastore.Get(c)
91 u := &User{Name: m.ToUser}
92 if err := ds.Get(u); err != nil {
93 if err == datastore.ErrNoSuchEntity {
94 return []Mutation{&WriteReceipt{m.Message, m.ToUser, fal se}}, nil
95 }
96 return nil, err
97 }
98 im := &IncomingMessage{
99 ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.M essage.IntID()),
100 ForUser: ds.KeyForObj(&User{Name: m.ToUser}),
101 }
102 err := ds.Get(im)
103 if err == datastore.ErrNoSuchEntity {
104 err = ds.Put(im)
105 return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err
106 }
107 return nil, err
108 }
109
110 type WriteReceipt struct {
111 Message *datastore.Key
112 Recipient string
113 Success bool
114 }
115
116 func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key {
117 return w.Message.Root()
118 }
119
120 func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) {
121 ds := datastore.Get(c)
122 m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent( )}
123 if err := ds.Get(m); err != nil {
124 return nil, err
125 }
126
127 idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient))
128 if w.Success {
129 m.Success.Set(idx)
130 } else {
131 m.Failure.Set(idx)
132 }
133
134 return nil, ds.Put(m)
135 }
136
137 func init() {
138 Register((*SendMessage)(nil))
139 Register((*WriteReceipt)(nil))
140
141 dustSettleTimeout = 0
142 }
143
144 func TestHighLevel(t *testing.T) {
145 t.Parallel()
146
147 Convey("Tumble", t, func() {
148 Convey("Check registration", func() {
149 So(registry, ShouldContainKey, "*tumble.SendMessage")
150 })
151
152 Convey("Good", func() {
153 ctx := memory.Use(memlogger.Use(context.Background()))
154 ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC )
155 cfg := GetConfig(ctx)
156 ds := datastore.Get(ctx)
157 tq := taskqueue.Get(ctx)
158 l := logging.Get(ctx).(*memlogger.MemLogger)
159 _ = l
160
161 tq.Testable().CreateQueue(cfg.Name)
162
163 ds.Testable().AddIndexes(&datastore.IndexDefinition{
164 Kind: "tumble.Mutation",
165 SortBy: []datastore.IndexColumn{
166 {Property: "ExpandedShard"},
167 {Property: "TargetRoot"},
168 },
169 })
170 ds.Testable().CatchupIndexes()
171
172 iterate := func() int {
173 ret := 0
174 tsks := tq.Testable().GetScheduledTasks()[cfg.Na me]
175 for _, tsk := range tsks {
176 if tsk.ETA.After(clk.Now()) {
177 continue
178 }
179 toks := strings.Split(tsk.Path, "/")
180 rec := httptest.NewRecorder()
181 ProcessShardHandler(ctx, rec, &http.Requ est{
182 Header: http.Header{"X-AppEngine -QueueName": []string{cfg.Name}},
183 }, httprouter.Params{
184 {Key: "shard_id", Value: toks[4] },
185 {Key: "timestamp", Value: toks[6 ]},
186 })
187 So(rec.Code, ShouldEqual, 200)
188 So(tq.Delete(tsk, cfg.Name), ShouldBeNil )
189 ret++
190 }
191 return ret
192 }
193
194 cron := func() {
195 rec := httptest.NewRecorder()
196 FireAllTasksHandler(ctx, rec, &http.Request{
197 Header: http.Header{"X-Appengine-Cron": []string{"true"}},
198 }, nil)
199 So(rec.Code, ShouldEqual, 200)
200 }
201
202 charlie := &User{Name: "charlie"}
203 So(ds.Put(charlie), ShouldBeNil)
204
205 Convey("can't send to someone who doesn't exist", func() {
206 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "lennon")
207 So(err, ShouldBeNil)
208
209 // need to advance clock and catch up indexes
210 So(iterate(), ShouldEqual, 0)
211 clk.Add(time.Second * 10)
212
213 // need to catch up indexes
214 So(iterate(), ShouldEqual, 1)
215
216 cron()
217 ds.Testable().CatchupIndexes()
218 clk.Add(time.Second * 10)
219
220 So(iterate(), ShouldEqual, cfg.NumShards)
221 ds.Testable().CatchupIndexes()
222 clk.Add(time.Second * 10)
223
224 So(iterate(), ShouldEqual, 1)
225
226 So(ds.Get(outMsg), ShouldBeNil)
227 So(outMsg.Failure.All(true), ShouldBeTrue)
228 })
229
230 Convey("sending to yourself could be done in one iterati on if you're lucky", func() {
231 ds.Testable().Consistent(true)
232
233 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "charlie")
234 So(err, ShouldBeNil)
235
236 clk.Add(time.Second * 10)
237
238 So(iterate(), ShouldEqual, 1)
239
240 So(ds.Get(outMsg), ShouldBeNil)
241 So(outMsg.Success.All(true), ShouldBeTrue)
242 })
243
244 Convey("different version IDs log a warning", func() {
245 ds.Testable().Consistent(true)
246
247 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "charlie")
248 So(err, ShouldBeNil)
249
250 rm := &realMutation{
251 ID: "0000000000000001_00000000",
252 Parent: ds.KeyForObj(charlie),
253 }
254 So(ds.Get(rm), ShouldBeNil)
255 So(rm.Version, ShouldEqual, "testVersionID.1")
256 rm.Version = "otherCodeVersion.1"
257 So(ds.Put(rm), ShouldBeNil)
258
259 clk.Add(time.Second * 10)
260
261 l.Reset()
262 So(iterate(), ShouldEqual, 1)
263 So(l.Has(logging.Warning, "loading mutation with different code version", map[string]interface{}{
264 "key": "tumble.23.lock",
265 "clientID": "-62132730888_23",
266 "mut_version": "otherCodeVersion.1",
267 "cur_version": "testVersionID.1",
268 }), ShouldBeTrue)
269
270 So(ds.Get(outMsg), ShouldBeNil)
271 So(outMsg.Success.All(true), ShouldBeTrue)
272 })
273
274 Convey("sending to 200 people is no big deal", func() {
275 users := make([]User, 200)
276 recipients := make([]string, 200)
277 for i := range recipients {
278 name := base64.StdEncoding.EncodeToStrin g([]byte{byte(i)})
279 recipients[i] = name
280 users[i].Name = name
281 }
282 So(ds.PutMulti(users), ShouldBeNil)
283
284 outMsg, err := charlie.SendMessage(ctx, "Hey the re", recipients...)
285 So(err, ShouldBeNil)
286
287 // do all the SendMessages
288 ds.Testable().CatchupIndexes()
289 clk.Add(time.Second * 10)
290 So(iterate(), ShouldEqual, cfg.NumShards)
291
292 // do all the WriteReceipts
293 l.Reset()
294 ds.Testable().CatchupIndexes()
295 clk.Add(time.Second * 10)
296 So(iterate(), ShouldEqual, 1)
297
298 // hacky proof that all 200 incoming message rec iepts were buffered
299 // appropriately.
300 So(l.Has(logging.Info, "successfully processed 1 28 mutations, adding 0 more", map[string]interface{}{
301 "key": "tumble.23.lock",
302 "clientID": "-62132730880_23",
303 }), ShouldBeTrue)
304 So(l.Has(logging.Info, "successfully processed 7 2 mutations, adding 0 more", map[string]interface{}{
305 "key": "tumble.23.lock",
306 "clientID": "-62132730880_23",
307 }), ShouldBeTrue)
308
309 So(ds.Get(outMsg), ShouldBeNil)
310 So(outMsg.Success.All(true), ShouldBeTrue)
311 So(outMsg.Success.Size(), ShouldEqual, 200)
312
313 })
314
315 })
316
317 })
318 }
OLDNEW
« no previous file with comments | « appengine/tumble/doc.go ('k') | appengine/tumble/fire_tasks.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698