Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(985)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: fixes Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "fmt"
9 "infra/gae/libs/wrapper"
10 "math/rand"
11 "net/http"
12 "regexp"
13 "time"
14
15 "golang.org/x/net/context"
16
17 "appengine"
18 "appengine/taskqueue"
19 "appengine_internal"
20 dbpb "appengine_internal/datastore"
21 pb "appengine_internal/taskqueue"
22 )
23
24 /////////////////////////////// public functions ///////////////////////////////
25
26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible
27 // by wrapper.GetTQ(c)
28 func UseTQ(c context.Context) context.Context {
29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
30 tqd := cur(ic).Get("TQ")
31 var ret interface {
32 wrapper.TQTestable
33 wrapper.TaskQueue
34 }
35 switch x := tqd.(type) {
36 case *taskQueueData:
37 ret = &taskqueueImpl{
38 wrapper.DummyTQ(),
M-A Ruel 2015/05/28 22:42:39 you create a TQ everytime?
iannucci 2015/05/28 23:00:35 it returns a constant object (a named struct{}) in
39 x,
40 curGID(ic).namespace,
41 func() time.Time { return wrapper.GetTimeNow(ic) },
42 wrapper.GetMathRand(ic),
43 }
44
45 case *txnTaskQueueData:
46 ret = &taskqueueTxnImpl{
47 wrapper.DummyTQ(),
48 x,
49 curGID(ic).namespace,
50 func() time.Time { return wrapper.GetTimeNow(ic) },
51 wrapper.GetMathRand(ic),
52 }
53
54 default:
55 panic(fmt.Errorf("TQ: bad type: %v", tqd))
56 }
57 return ret
58 })
59 }
60
61 //////////////////////////////// taskqueueImpl /////////////////////////////////
62
63 type taskqueueImpl struct {
64 wrapper.TaskQueue
65 *taskQueueData
66
67 ns string
68 timeNow func() time.Time
69 mathRand *rand.Rand
70 }
71
72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand)
74 if err != nil {
75 return nil, err
76 }
77
78 if _, ok := t.archived[queueName][toSched.Name]; ok {
79 // SDK converts TOMBSTONE -> already added too
80 return nil, taskqueue.ErrTaskAlreadyAdded
81 } else if _, ok := t.named[queueName][toSched.Name]; ok {
82 return nil, taskqueue.ErrTaskAlreadyAdded
83 } else {
84 t.named[queueName][toSched.Name] = toSched
85 }
86
87 return dupTask(toSched), nil
88 }
89
90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) {
91 if err := t.IsBroken(); err != nil {
92 return nil, err
93 }
94
95 t.Lock()
96 defer t.Unlock()
97
98 return t.addLocked(task, queueName)
99 }
100
101 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or {
102 queueName, err := t.getQueueName(queueName)
103 if err != nil {
104 return err
105 }
106
107 if _, ok := t.archived[queueName][task.Name]; ok {
108 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
109 }
110
111 if _, ok := t.named[queueName][task.Name]; !ok {
112 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
113 }
114
115 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
116 delete(t.named[queueName], task.Name)
117
118 return nil
119 }
120
121 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
122 if err := t.IsBroken(); err != nil {
123 return err
124 }
125
126 t.Lock()
127 defer t.Unlock()
128
129 return t.deleteLocked(task, queueName)
130 }
131
132 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) {
133 if err := t.IsBroken(); err != nil {
134 return nil, err
135 }
136
M-A Ruel 2015/05/28 22:42:39 Isn't a lock missing here?
iannucci 2015/05/28 23:00:35 indeed
137 return multi(tasks, queueName, t.addLocked)
138 }
139
140 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror {
141 if err := t.IsBroken(); err != nil {
142 return err
143 }
144
145 t.Lock()
146 defer t.Unlock()
147
148 _, err := multi(tasks, queueName,
149 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
150 return nil, t.deleteLocked(tsk, qn)
151 })
152 return err
153 }
154
155 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
156
157 type taskqueueTxnImpl struct {
158 wrapper.TaskQueue
159 *txnTaskQueueData
160
161 ns string
162 timeNow func() time.Time
163 mathRand *rand.Rand
164 }
165
166 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
167 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand)
168 if err != nil {
169 return nil, err
170 }
171
172 numTasks := 0
173 for _, vs := range t.anony {
174 numTasks += len(vs)
175 }
176 if numTasks+1 > 5 {
177 // transactional tasks are actually implemented 'for real' as Ac tions which
178 // ride on the datastore. The current datastore implementation o nly allows
179 // a maximum of 5 Actions per transaction, and more than that re sult in a
180 // BAD_REQUEST.
181 return nil, newDSError(dbpb.Error_BAD_REQUEST)
182 }
183
184 t.anony[queueName] = append(t.anony[queueName], toSched)
185
186 // the fact that we have generated a unique name for this task queue ite m is
187 // an implementation detail.
188 // TODO(riannucci): now that I think about this... it may not actually b e true.
189 // We should verify that the .Name for a task added in a tr ansaction is
190 // meaningless. Maybe names generated in a transaction are somehow
191 // guaranteed to be meaningful?
192 toRet := dupTask(toSched)
193 toRet.Name = ""
194
195 return toRet, nil
196 }
197
198 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) {
199 if err := t.IsBroken(); err != nil {
200 return nil, err
201 }
202
203 t.Lock()
204 defer t.Unlock()
205
206 return t.addLocked(task, queueName)
207 }
208
209 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) {
210 if err := t.IsBroken(); err != nil {
211 return nil, err
212 }
213
214 t.Lock()
215 defer t.Unlock()
216
217 return multi(tasks, queueName, t.addLocked)
218 }
219
220 ////////////////////////////// private functions ///////////////////////////////
221
222 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
223
224 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
225
226 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string {
227 _, ok := queue[cur]
228 for !ok && cur == "" {
229 name := [500]byte{}
230 for i := 0; i < 500; i++ {
231 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))]
232 }
233 cur = string(name[:])
234 _, ok = queue[cur]
235 }
236 return cur
237 }
238
239 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
240 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
241 }
242
243 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
244 ret := []*taskqueue.Task(nil)
245 me := appengine.MultiError(nil)
246 foundErr := false
247 for _, task := range tasks {
248 rt, err := f(task, queueName)
249 ret = append(ret, rt)
250 me = append(me, err)
251 if err != nil {
252 foundErr = true
253 }
254 }
255 if !foundErr {
256 me = nil
257 }
258 return ret, me
259 }
260
261 func dupTask(t *taskqueue.Task) *taskqueue.Task {
262 ret := &taskqueue.Task{}
263 *ret = *t
264
265 if t.Header != nil {
266 ret.Header = make(http.Header, len(t.Header))
267 for k, vs := range t.Header {
268 newVs := make([]string, len(vs))
269 copy(newVs, vs)
270 ret.Header[k] = newVs
271 }
272 }
273
274 if t.Payload != nil {
275 ret.Payload = make([]byte, len(t.Payload))
276 copy(ret.Payload, t.Payload)
277 }
278
279 if t.RetryOptions != nil {
280 ret.RetryOptions = &taskqueue.RetryOptions{}
281 *ret.RetryOptions = *t.RetryOptions
282 }
283
284 return ret
285 }
286
287 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
288 r := make(wrapper.QueueData, len(q))
289 for k, q := range q {
290 r[k] = make(map[string]*taskqueue.Task, len(q))
291 for tn, t := range q {
292 r[k][tn] = dupTask(t)
293 }
294 }
295 return r
296 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698