Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(125)

Side by Side Diff: appengine/tumble/model_mutation.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: use exists Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tumble/fire_tasks_test.go ('k') | appengine/tumble/process.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "bytes"
9 "crypto/sha1"
10 "encoding/binary"
11 "encoding/gob"
12 "fmt"
13 "math"
14 "reflect"
15 "sync"
16
17 "github.com/luci/gae/service/datastore"
18 "github.com/luci/gae/service/info"
19 "github.com/luci/luci-go/appengine/meta"
20 "github.com/luci/luci-go/common/logging"
21 "golang.org/x/net/context"
22 )
23
24 var registry = map[string]reflect.Type{}
25
26 // Register allows |mut| to be played by the tumble backend. This should be
27 // called at init() time once for every Mutation implementation.
28 //
29 // Example:
30 // Register((*MyMutationImpl)(nil))
31 func Register(mut Mutation) {
32 t := reflect.TypeOf(mut)
33 registry[t.String()] = t
34 }
35
36 // Mutation is the interface that your tumble mutations must implement.
37 //
38 // Mutation implementations can be registered with the Register function.
39 type Mutation interface {
40 // Root returns a datastore.Key which will be used to derive the Key for the
41 // entity group which this Mutation will operate on. This is used to bat ch
42 // together Entries for more efficient processing.
43 //
44 // Returning nil is an error.
45 Root(c context.Context) *datastore.Key
46
47 // RollForward performs the action of the Mutation.
48 //
49 // It is only considered sucessful if it returns nil. If it returns non- nil,
50 // then it will be retried at a later time. If it never returns nil, the n it
51 // will never be flushed from tumble's queue, and you'll have to manuall y
52 // delete it or fix the code so that it can be handled without error.
53 //
54 // This method runs inside of a single-group transaction. It must modify only
55 // the entity group specified by Root().
56 //
57 // As a side effect, RollForward may return new arbitrary Mutations. The se
58 // will be comitted in the same transaction as RollForward.
59 //
60 // The context contains an implementation of "luci/gae/service/datastore ",
61 // using the "luci/gae/filter/txnBuf" transaction buffer. This means tha t
62 // all functionality (including additional transactions) is available, w ith
63 // the limitations mentioned by that package (notably, no cursors are
64 // allowed).
65 RollForward(c context.Context) ([]Mutation, error)
66 }
67
68 type realMutation struct {
69 // TODO(riannucci): add functionality to luci/gae/service/datastore so t hat
70 // GetMeta/SetMeta may be overridden by the struct.
71 _kind string `gae:"$kind,tumble.Mutation"`
72 ID string `gae:"$id"`
73 Parent *datastore.Key `gae:"$parent"`
74
75 ExpandedShard int64
76 TargetRoot *datastore.Key
77
78 Version string
79 Type string
80 Data []byte `gae:",noindex"`
81 }
82
83 func (r *realMutation) shard(cfg *Config) uint64 {
84 expandedShardsPerShard := math.MaxUint64 / cfg.NumShards
85 ret := uint64(r.ExpandedShard-math.MinInt64) / expandedShardsPerShard
86 // account for rounding errors on the last shard.
87 if ret >= cfg.NumShards {
88 ret = cfg.NumShards - 1
89 }
90 return ret
91 }
92
93 func putMutations(c context.Context, fromRoot *datastore.Key, muts []Mutation) ( shardSet map[uint64]struct{}, err error) {
94 cfg := GetConfig(c)
95
96 if len(muts) == 0 {
97 return
98 }
99
100 version, err := meta.GetEntityGroupVersion(c, fromRoot)
101 if err != nil {
102 return
103 }
104
105 shardSet = map[uint64]struct{}{}
106 toPut := make([]*realMutation, len(muts))
107 for i, m := range muts {
108 id := fmt.Sprintf("%016x_%08x", version, i)
109 toPut[i], err = newRealMutation(c, id, fromRoot, m)
110 if err != nil {
111 logging.Errorf(c, "error creating real mutation for %v: %s", m, err)
112 return
113 }
114
115 shardSet[toPut[i].shard(&cfg)] = struct{}{}
116 }
117
118 if err = datastore.Get(c).PutMulti(toPut); err != nil {
119 logging.Errorf(c, "error putting %d new mutations: %s", len(toPu t), err)
120 }
121 return
122 }
123
124 var appVersion = struct {
125 sync.Once
126 version string
127 }{}
128
129 func getAppVersion(c context.Context) string {
130 appVersion.Do(func() {
131 appVersion.version = info.Get(c).VersionID()
132 })
133 return appVersion.version
134 }
135
136 func newRealMutation(c context.Context, id string, parent *datastore.Key, m Muta tion) (*realMutation, error) {
137 t := reflect.TypeOf(m).String()
138 if _, ok := registry[t]; !ok {
139 return nil, fmt.Errorf("Attempting to add unregistered mutation %v: %v", t, m)
140 }
141
142 buf := &bytes.Buffer{}
143 err := gob.NewEncoder(buf).Encode(m)
144 if err != nil {
145 return nil, err
146 }
147
148 root := m.Root(c).Root()
149
150 hash := sha1.Sum([]byte(root.Encode()))
151 eshard := int64(binary.BigEndian.Uint64(hash[:]))
152
153 return &realMutation{
154 ID: id,
155 Parent: parent,
156
157 ExpandedShard: eshard,
158 TargetRoot: root,
159
160 Version: getAppVersion(c),
161 Type: t,
162 Data: buf.Bytes(),
163 }, nil
164 }
165
166 func (r *realMutation) GetMutation() (Mutation, error) {
167 typ, ok := registry[r.Type]
168 if !ok {
169 return nil, fmt.Errorf("unable to load reflect.Type for %q", r.T ype)
170 }
171
172 ret := reflect.New(typ)
173 if err := gob.NewDecoder(bytes.NewBuffer(r.Data)).DecodeValue(ret); err != nil {
174 return nil, err
175 }
176
177 return ret.Elem().Interface().(Mutation), nil
178 }
OLDNEW
« no previous file with comments | « appengine/tumble/fire_tasks_test.go ('k') | appengine/tumble/process.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698