Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Side by Side Diff: appengine/tumble/model_mutation.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "bytes"
9 "crypto/sha1"
10 "encoding/binary"
11 "encoding/gob"
12 "fmt"
13 "math"
14 "reflect"
15
16 "github.com/luci/gae/service/datastore"
17 "github.com/luci/luci-go/appengine/meta"
18 "github.com/luci/luci-go/common/logging"
19 "golang.org/x/net/context"
20 )
21
22 var registry = map[string]reflect.Type{}
23
24 // Register allows |mut| to be played by the tumble backend. This should be
25 // called at init() time once for every Mutation implementation.
26 //
27 // Example:
28 // Register((*MyMutationImpl)(nil))
29 func Register(mut Mutation) {
30 t := reflect.TypeOf(mut)
31 registry[t.String()] = t
32 }
33
34 // Mutation is the interface that your tumble mutations must implement.
35 //
36 // Mutation implementations can be registered with the Register function.
37 type Mutation interface {
38 // Root returns a datastore.Key which will be used to derive the Key for the
39 // entity group which this Mutation will operate on. This is used to bat ch
40 // together Entries for more efficient processing.
41 //
42 // Returning nil is an error.
43 Root(c context.Context) *datastore.Key
44
45 // RollForward performs the action of the Mutation.
46 //
47 // It is only considered sucessful if it returns nil. If it returns non- nil,
48 // then it will be retried at a later time. If it never returns nil, the n it
49 // will never be flushed from tumble's queue, and you'll have to manuall y
50 // delete it or fix the code so that it can be handled without error.
51 //
52 // This method runs inside of a single-group transaction. It must modify only
53 // the entity group specified by Root().
54 //
55 // As a side effect, RollForward may return new arbitrary Mutations. The se
56 // will be comitted in the same transaction as RollForward.
57 //
58 // The context contains an implementation of "luci/gae/service/datastore ",
59 // using the "luci/gae/filter/txnBuf" transaction buffer. This means tha t
60 // all functionality (including additional transactions) is available, w ith
61 // the limitations mentioned by that package (notably, no cursors are
62 // allowed).
63 RollForward(c context.Context) ([]Mutation, error)
64 }
65
66 type realMutation struct {
67 // TODO(riannucci): add functionality to luci/gae/service/datastore so t hat
68 // GetMeta/SetMeta may be overridden by the struct.
69 _kind string `gae:"$kind,tumble.Mutation"`
70 ID string `gae:"$id"`
71 Parent *datastore.Key `gae:"$parent"`
72
73 ExpandedShard int64
74 TargetRoot *datastore.Key
75
76 Type string `gae:",noindex"`
77 Data []byte `gae:",noindex"`
78 }
79
80 func (r *realMutation) shard(cfg *Config) uint64 {
81 expandedShardsPerShard := math.MaxUint64 / cfg.NumShards
82 ret := uint64(r.ExpandedShard-math.MinInt64) / expandedShardsPerShard
83 // account for rounding errors on the last shard.
84 if ret >= cfg.NumShards {
85 ret = cfg.NumShards - 1
86 }
87 return ret
88 }
89
90 func putMutations(c context.Context, fromRoot *datastore.Key, muts []Mutation) ( shardSet map[uint64]struct{}, err error) {
91 cfg := GetConfig(c)
92
93 if len(muts) == 0 {
94 return
95 }
96
97 version, err := meta.GetEntityGroupVersion(c, fromRoot)
98 if err != nil {
99 return
100 }
101
102 shardSet = map[uint64]struct{}{}
103 toPut := make([]*realMutation, len(muts))
104 for i, m := range muts {
105 id := fmt.Sprintf("%016x_%08x", version, i)
106 toPut[i], err = newRealMutation(c, id, fromRoot, m)
107 if err != nil {
108 logging.Errorf(c, "error creating real mutation for %v: %s", m, err)
109 return
110 }
111
112 shardSet[toPut[i].shard(&cfg)] = struct{}{}
113 }
114
115 if err = datastore.Get(c).PutMulti(toPut); err != nil {
116 logging.Errorf(c, "error putting %d new mutations: %s", len(toPu t), err)
117 }
118 return
119 }
120
121 func newRealMutation(c context.Context, id string, parent *datastore.Key, m Muta tion) (*realMutation, error) {
122 t := reflect.TypeOf(m).String()
123 if _, ok := registry[t]; !ok {
124 return nil, fmt.Errorf("Attempting to add unregistered mutation %v: %v", t, m)
125 }
126
127 buf := &bytes.Buffer{}
128 err := gob.NewEncoder(buf).Encode(m)
129 if err != nil {
130 return nil, err
131 }
132
133 root := m.Root(c).Root()
134
135 hash := sha1.Sum([]byte(root.Encode()))
136 eshard := int64(binary.BigEndian.Uint64(hash[:]))
137
138 return &realMutation{
139 ID: id,
140 Parent: parent,
141
142 ExpandedShard: eshard,
143 TargetRoot: root,
144
145 Type: t,
146 Data: buf.Bytes(),
147 }, nil
148 }
149
150 func (r *realMutation) GetMutation() (Mutation, error) {
151 typ, ok := registry[r.Type]
152 if !ok {
153 return nil, fmt.Errorf("unable to load reflect.Type for %q", r.T ype)
154 }
155
156 ret := reflect.New(typ)
157 if err := gob.NewDecoder(bytes.NewBuffer(r.Data)).DecodeValue(ret); err != nil {
158 return nil, err
159 }
160
161 return ret.Elem().Interface().(Mutation), nil
162 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698