OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package tumble |
| 6 |
| 7 import ( |
| 8 "encoding/base64" |
| 9 "fmt" |
| 10 "net/http" |
| 11 "net/http/httptest" |
| 12 "sort" |
| 13 "strings" |
| 14 "testing" |
| 15 "time" |
| 16 |
| 17 "github.com/julienschmidt/httprouter" |
| 18 "github.com/luci/gae/impl/memory" |
| 19 "github.com/luci/gae/service/datastore" |
| 20 "github.com/luci/gae/service/taskqueue" |
| 21 "github.com/luci/luci-go/common/bit_field" |
| 22 "github.com/luci/luci-go/common/clock/testclock" |
| 23 "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/common/logging/memlogger" |
| 25 . "github.com/smartystreets/goconvey/convey" |
| 26 "golang.org/x/net/context" |
| 27 ) |
| 28 |
| 29 type User struct { |
| 30 Name string `gae:"$id"` |
| 31 } |
| 32 |
| 33 func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*O
utgoingMessage, error) { |
| 34 sort.Strings(toUsers) |
| 35 ds := datastore.Get(c) |
| 36 k := ds.KeyForObj(u) |
| 37 outMsg := &OutgoingMessage{ |
| 38 FromUser: k, |
| 39 Message: msg, |
| 40 Recipients: toUsers, |
| 41 Success: bf.Make(uint64(len(toUsers))), |
| 42 Failure: bf.Make(uint64(len(toUsers))), |
| 43 } |
| 44 err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error
) { |
| 45 ds := datastore.Get(c) |
| 46 if err := ds.Put(outMsg); err != nil { |
| 47 return nil, err |
| 48 } |
| 49 outKey := ds.KeyForObj(outMsg) |
| 50 muts := make([]Mutation, len(toUsers)) |
| 51 for i := range muts { |
| 52 muts[i] = &SendMessage{outKey, toUsers[i]} |
| 53 } |
| 54 return muts, nil |
| 55 }) |
| 56 if err != nil { |
| 57 outMsg = nil |
| 58 } |
| 59 return outMsg, err |
| 60 } |
| 61 |
| 62 type OutgoingMessage struct { |
| 63 // datastore-assigned |
| 64 ID int64 `gae:"$id"` |
| 65 FromUser *datastore.Key `gae:"$parent"` |
| 66 |
| 67 Message string `gae:",noindex"` |
| 68 Recipients []string `gae:",noindex"` |
| 69 |
| 70 Success bf.BitField |
| 71 Failure bf.BitField |
| 72 } |
| 73 |
| 74 type IncomingMessage struct { |
| 75 // OtherUser|OutgoingMessageID |
| 76 ID string `gae:"$id"` |
| 77 ForUser *datastore.Key `gae:"$parent"` |
| 78 } |
| 79 |
| 80 type SendMessage struct { |
| 81 Message *datastore.Key |
| 82 ToUser string |
| 83 } |
| 84 |
| 85 func (m *SendMessage) Root(ctx context.Context) *datastore.Key { |
| 86 return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser}) |
| 87 } |
| 88 |
| 89 func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) { |
| 90 ds := datastore.Get(c) |
| 91 u := &User{Name: m.ToUser} |
| 92 if err := ds.Get(u); err != nil { |
| 93 if err == datastore.ErrNoSuchEntity { |
| 94 return []Mutation{&WriteReceipt{m.Message, m.ToUser, fal
se}}, nil |
| 95 } |
| 96 return nil, err |
| 97 } |
| 98 im := &IncomingMessage{ |
| 99 ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.M
essage.IntID()), |
| 100 ForUser: ds.KeyForObj(&User{Name: m.ToUser}), |
| 101 } |
| 102 err := ds.Get(im) |
| 103 if err == datastore.ErrNoSuchEntity { |
| 104 err = ds.Put(im) |
| 105 return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err |
| 106 } |
| 107 return nil, err |
| 108 } |
| 109 |
| 110 type WriteReceipt struct { |
| 111 Message *datastore.Key |
| 112 Recipient string |
| 113 Success bool |
| 114 } |
| 115 |
| 116 func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key { |
| 117 return w.Message.Root() |
| 118 } |
| 119 |
| 120 func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) { |
| 121 ds := datastore.Get(c) |
| 122 m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent(
)} |
| 123 if err := ds.Get(m); err != nil { |
| 124 return nil, err |
| 125 } |
| 126 |
| 127 idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient)) |
| 128 if w.Success { |
| 129 m.Success.Set(idx) |
| 130 } else { |
| 131 m.Failure.Set(idx) |
| 132 } |
| 133 |
| 134 return nil, ds.Put(m) |
| 135 } |
| 136 |
| 137 func init() { |
| 138 Register((*SendMessage)(nil)) |
| 139 Register((*WriteReceipt)(nil)) |
| 140 |
| 141 dustSettleTimeout = 0 |
| 142 } |
| 143 |
| 144 func TestHighLevel(t *testing.T) { |
| 145 t.Parallel() |
| 146 |
| 147 Convey("Tumble", t, func() { |
| 148 Convey("Check registration", func() { |
| 149 So(registry, ShouldContainKey, "*tumble.SendMessage") |
| 150 }) |
| 151 |
| 152 Convey("Good", func() { |
| 153 ctx := memory.Use(memlogger.Use(context.Background())) |
| 154 ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC
) |
| 155 cfg := GetConfig(ctx) |
| 156 ds := datastore.Get(ctx) |
| 157 tq := taskqueue.Get(ctx) |
| 158 l := logging.Get(ctx).(*memlogger.MemLogger) |
| 159 _ = l |
| 160 |
| 161 tq.Testable().CreateQueue(cfg.Name) |
| 162 |
| 163 ds.Testable().AddIndexes(&datastore.IndexDefinition{ |
| 164 Kind: "tumble.Mutation", |
| 165 SortBy: []datastore.IndexColumn{ |
| 166 {Property: "ExpandedShard"}, |
| 167 {Property: "TargetRoot"}, |
| 168 }, |
| 169 }) |
| 170 ds.Testable().CatchupIndexes() |
| 171 |
| 172 iterate := func() int { |
| 173 ret := 0 |
| 174 tsks := tq.Testable().GetScheduledTasks()[cfg.Na
me] |
| 175 for _, tsk := range tsks { |
| 176 if tsk.ETA.After(clk.Now()) { |
| 177 continue |
| 178 } |
| 179 toks := strings.Split(tsk.Path, "/") |
| 180 rec := httptest.NewRecorder() |
| 181 ProcessShardHandler(ctx, rec, &http.Requ
est{ |
| 182 Header: http.Header{"X-AppEngine
-QueueName": []string{cfg.Name}}, |
| 183 }, httprouter.Params{ |
| 184 {Key: "shard_id", Value: toks[4]
}, |
| 185 {Key: "timestamp", Value: toks[6
]}, |
| 186 }) |
| 187 So(rec.Code, ShouldEqual, 200) |
| 188 So(tq.Delete(tsk, cfg.Name), ShouldBeNil
) |
| 189 ret++ |
| 190 } |
| 191 return ret |
| 192 } |
| 193 |
| 194 cron := func() { |
| 195 rec := httptest.NewRecorder() |
| 196 FireAllTasksHandler(ctx, rec, &http.Request{ |
| 197 Header: http.Header{"X-Appengine-Cron":
[]string{"true"}}, |
| 198 }, nil) |
| 199 So(rec.Code, ShouldEqual, 200) |
| 200 } |
| 201 |
| 202 charlie := &User{Name: "charlie"} |
| 203 So(ds.Put(charlie), ShouldBeNil) |
| 204 |
| 205 Convey("can't send to someone who doesn't exist", func()
{ |
| 206 outMsg, err := charlie.SendMessage(ctx, "Hey the
re", "lennon") |
| 207 So(err, ShouldBeNil) |
| 208 |
| 209 // need to advance clock and catch up indexes |
| 210 So(iterate(), ShouldEqual, 0) |
| 211 clk.Add(time.Second * 10) |
| 212 |
| 213 // need to catch up indexes |
| 214 So(iterate(), ShouldEqual, 1) |
| 215 |
| 216 cron() |
| 217 ds.Testable().CatchupIndexes() |
| 218 clk.Add(time.Second * 10) |
| 219 |
| 220 So(iterate(), ShouldEqual, cfg.NumShards) |
| 221 ds.Testable().CatchupIndexes() |
| 222 clk.Add(time.Second * 10) |
| 223 |
| 224 So(iterate(), ShouldEqual, 1) |
| 225 |
| 226 So(ds.Get(outMsg), ShouldBeNil) |
| 227 So(outMsg.Failure.All(true), ShouldBeTrue) |
| 228 }) |
| 229 |
| 230 Convey("sending to yourself could be done in one iterati
on if you're lucky", func() { |
| 231 ds.Testable().Consistent(true) |
| 232 |
| 233 outMsg, err := charlie.SendMessage(ctx, "Hey the
re", "charlie") |
| 234 So(err, ShouldBeNil) |
| 235 |
| 236 clk.Add(time.Second * 10) |
| 237 |
| 238 So(iterate(), ShouldEqual, 1) |
| 239 |
| 240 So(ds.Get(outMsg), ShouldBeNil) |
| 241 So(outMsg.Success.All(true), ShouldBeTrue) |
| 242 }) |
| 243 |
| 244 Convey("different version IDs log a warning", func() { |
| 245 ds.Testable().Consistent(true) |
| 246 |
| 247 outMsg, err := charlie.SendMessage(ctx, "Hey the
re", "charlie") |
| 248 So(err, ShouldBeNil) |
| 249 |
| 250 rm := &realMutation{ |
| 251 ID: "0000000000000001_00000000", |
| 252 Parent: ds.KeyForObj(charlie), |
| 253 } |
| 254 So(ds.Get(rm), ShouldBeNil) |
| 255 So(rm.Version, ShouldEqual, "testVersionID.1") |
| 256 rm.Version = "otherCodeVersion.1" |
| 257 So(ds.Put(rm), ShouldBeNil) |
| 258 |
| 259 clk.Add(time.Second * 10) |
| 260 |
| 261 l.Reset() |
| 262 So(iterate(), ShouldEqual, 1) |
| 263 So(l.Has(logging.Warning, "loading mutation with
different code version", map[string]interface{}{ |
| 264 "key": "tumble.23.lock", |
| 265 "clientID": "-62132730888_23", |
| 266 "mut_version": "otherCodeVersion.1", |
| 267 "cur_version": "testVersionID.1", |
| 268 }), ShouldBeTrue) |
| 269 |
| 270 So(ds.Get(outMsg), ShouldBeNil) |
| 271 So(outMsg.Success.All(true), ShouldBeTrue) |
| 272 }) |
| 273 |
| 274 Convey("sending to 200 people is no big deal", func() { |
| 275 users := make([]User, 200) |
| 276 recipients := make([]string, 200) |
| 277 for i := range recipients { |
| 278 name := base64.StdEncoding.EncodeToStrin
g([]byte{byte(i)}) |
| 279 recipients[i] = name |
| 280 users[i].Name = name |
| 281 } |
| 282 So(ds.PutMulti(users), ShouldBeNil) |
| 283 |
| 284 outMsg, err := charlie.SendMessage(ctx, "Hey the
re", recipients...) |
| 285 So(err, ShouldBeNil) |
| 286 |
| 287 // do all the SendMessages |
| 288 ds.Testable().CatchupIndexes() |
| 289 clk.Add(time.Second * 10) |
| 290 So(iterate(), ShouldEqual, cfg.NumShards) |
| 291 |
| 292 // do all the WriteReceipts |
| 293 l.Reset() |
| 294 ds.Testable().CatchupIndexes() |
| 295 clk.Add(time.Second * 10) |
| 296 So(iterate(), ShouldEqual, 1) |
| 297 |
| 298 // hacky proof that all 200 incoming message rec
iepts were buffered |
| 299 // appropriately. |
| 300 So(l.Has(logging.Info, "successfully processed 1
28 mutations, adding 0 more", map[string]interface{}{ |
| 301 "key": "tumble.23.lock", |
| 302 "clientID": "-62132730880_23", |
| 303 }), ShouldBeTrue) |
| 304 So(l.Has(logging.Info, "successfully processed 7
2 mutations, adding 0 more", map[string]interface{}{ |
| 305 "key": "tumble.23.lock", |
| 306 "clientID": "-62132730880_23", |
| 307 }), ShouldBeTrue) |
| 308 |
| 309 So(ds.Get(outMsg), ShouldBeNil) |
| 310 So(outMsg.Success.All(true), ShouldBeTrue) |
| 311 So(outMsg.Success.Size(), ShouldEqual, 200) |
| 312 |
| 313 }) |
| 314 |
| 315 }) |
| 316 |
| 317 }) |
| 318 } |
OLD | NEW |