Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(203)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1154213012: Add context-aware "time" library wrapper. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Added coverage files. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "infra/gae/libs/wrapper" 9 "infra/gae/libs/wrapper"
10 "math/rand"
11 "net/http" 10 "net/http"
12 "regexp" 11 "regexp"
13 "time"
14 12
15 "golang.org/x/net/context" 13 "golang.org/x/net/context"
16 14
17 "appengine" 15 "appengine"
18 "appengine/taskqueue" 16 "appengine/taskqueue"
19 "appengine_internal" 17 "appengine_internal"
20 dbpb "appengine_internal/datastore" 18 dbpb "appengine_internal/datastore"
21 pb "appengine_internal/taskqueue" 19 pb "appengine_internal/taskqueue"
22 ) 20 )
23 21
24 /////////////////////////////// public functions /////////////////////////////// 22 /////////////////////////////// public functions ///////////////////////////////
25 23
26 func useTQ(c context.Context) context.Context { 24 func useTQ(c context.Context) context.Context {
27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { 25 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
28 tqd := cur(ic).Get(memContextTQIdx) 26 tqd := cur(ic).Get(memContextTQIdx)
29 var ret interface { 27 var ret interface {
30 wrapper.TQTestable 28 wrapper.TQTestable
31 wrapper.TaskQueue 29 wrapper.TaskQueue
32 } 30 }
33 switch x := tqd.(type) { 31 switch x := tqd.(type) {
34 case *taskQueueData: 32 case *taskQueueData:
35 ret = &taskqueueImpl{ 33 ret = &taskqueueImpl{
36 wrapper.DummyTQ(), 34 wrapper.DummyTQ(),
37 x, 35 x,
36 ic,
38 curGID(ic).namespace, 37 curGID(ic).namespace,
39 func() time.Time { return wrapper.GetTimeNow(ic) },
40 wrapper.GetMathRand(ic),
41 } 38 }
42 39
43 case *txnTaskQueueData: 40 case *txnTaskQueueData:
44 ret = &taskqueueTxnImpl{ 41 ret = &taskqueueTxnImpl{
45 wrapper.DummyTQ(), 42 wrapper.DummyTQ(),
46 x, 43 x,
44 ic,
47 curGID(ic).namespace, 45 curGID(ic).namespace,
48 func() time.Time { return wrapper.GetTimeNow(ic) },
49 wrapper.GetMathRand(ic),
50 } 46 }
51 47
52 default: 48 default:
53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) 49 panic(fmt.Errorf("TQ: bad type: %v", tqd))
54 } 50 }
55 return ret 51 return ret
56 }) 52 })
57 } 53 }
58 54
59 //////////////////////////////// taskqueueImpl ///////////////////////////////// 55 //////////////////////////////// taskqueueImpl /////////////////////////////////
60 56
61 type taskqueueImpl struct { 57 type taskqueueImpl struct {
62 wrapper.TaskQueue 58 wrapper.TaskQueue
63 *taskQueueData 59 *taskQueueData
64 60
65 » ns string 61 » ctx context.Context
66 » timeNow func() time.Time 62 » ns string
67 » mathRand *rand.Rand
68 } 63 }
69 64
70 var ( 65 var (
71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) 66 _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) 67 _ = wrapper.TQTestable((*taskqueueImpl)(nil))
73 ) 68 )
74 69
75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { 70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) 71 » toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
77 if err != nil { 72 if err != nil {
78 return nil, err 73 return nil, err
79 } 74 }
80 75
81 if _, ok := t.archived[queueName][toSched.Name]; ok { 76 if _, ok := t.archived[queueName][toSched.Name]; ok {
82 // SDK converts TOMBSTONE -> already added too 77 // SDK converts TOMBSTONE -> already added too
83 return nil, taskqueue.ErrTaskAlreadyAdded 78 return nil, taskqueue.ErrTaskAlreadyAdded
84 } else if _, ok := t.named[queueName][toSched.Name]; ok { 79 } else if _, ok := t.named[queueName][toSched.Name]; ok {
85 return nil, taskqueue.ErrTaskAlreadyAdded 80 return nil, taskqueue.ErrTaskAlreadyAdded
86 } else { 81 } else {
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
157 }) 152 })
158 return err 153 return err
159 } 154 }
160 155
161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 156 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
162 157
163 type taskqueueTxnImpl struct { 158 type taskqueueTxnImpl struct {
164 wrapper.TaskQueue 159 wrapper.TaskQueue
165 *txnTaskQueueData 160 *txnTaskQueueData
166 161
167 » ns string 162 » ctx context.Context
168 » timeNow func() time.Time 163 » ns string
169 » mathRand *rand.Rand
170 } 164 }
171 165
172 var ( 166 var (
173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) 167 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) 168 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
175 ) 169 )
176 170
177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { 171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) 172 » toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e)
179 if err != nil { 173 if err != nil {
180 return nil, err 174 return nil, err
181 } 175 }
182 176
183 numTasks := 0 177 numTasks := 0
184 for _, vs := range t.anony { 178 for _, vs := range t.anony {
185 numTasks += len(vs) 179 numTasks += len(vs)
186 } 180 }
187 if numTasks+1 > 5 { 181 if numTasks+1 > 5 {
188 // transactional tasks are actually implemented 'for real' as Ac tions which 182 // transactional tasks are actually implemented 'for real' as Ac tions which
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
227 221
228 return multi(tasks, queueName, t.addLocked) 222 return multi(tasks, queueName, t.addLocked)
229 } 223 }
230 224
231 ////////////////////////////// private functions /////////////////////////////// 225 ////////////////////////////// private functions ///////////////////////////////
232 226
233 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
234 228
235 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
236 230
237 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string { 231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str ing {
238 _, ok := queue[cur] 232 _, ok := queue[cur]
239 for !ok && cur == "" { 233 for !ok && cur == "" {
240 name := [500]byte{} 234 name := [500]byte{}
241 for i := 0; i < 500; i++ { 235 for i := 0; i < 500; i++ {
242 » » » name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] 236 » » » name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len (validTaskChars))]
243 } 237 }
244 cur = string(name[:]) 238 cur = string(name[:])
245 _, ok = queue[cur] 239 _, ok = queue[cur]
246 } 240 }
247 return cur 241 return cur
248 } 242 }
249 243
250 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error { 244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
251 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)} 245 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
252 } 246 }
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { 292 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
299 r := make(wrapper.QueueData, len(q)) 293 r := make(wrapper.QueueData, len(q))
300 for k, q := range q { 294 for k, q := range q {
301 r[k] = make(map[string]*taskqueue.Task, len(q)) 295 r[k] = make(map[string]*taskqueue.Task, len(q))
302 for tn, t := range q { 296 for tn, t := range q {
303 r[k][tn] = dupTask(t) 297 r[k][tn] = dupTask(t)
304 } 298 }
305 } 299 }
306 return r 300 return r
307 } 301 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/memcache_test.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698