OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "fmt" | |
9 "infra/gae/libs/wrapper" | |
10 "net/http" | |
11 "regexp" | |
12 | |
13 "golang.org/x/net/context" | |
14 | |
15 "appengine" | |
16 "appengine/taskqueue" | |
17 "appengine_internal" | |
18 dbpb "appengine_internal/datastore" | |
19 pb "appengine_internal/taskqueue" | |
20 ) | |
21 | |
22 /////////////////////////////// public functions /////////////////////////////// | |
23 | |
24 func useTQ(c context.Context) context.Context { | |
25 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { | |
26 tqd := cur(ic).Get(memContextTQIdx) | |
27 var ret interface { | |
28 wrapper.TQTestable | |
29 wrapper.TaskQueue | |
30 } | |
31 switch x := tqd.(type) { | |
32 case *taskQueueData: | |
33 ret = &taskqueueImpl{ | |
34 wrapper.DummyTQ(), | |
35 x, | |
36 ic, | |
37 curGID(ic).namespace, | |
38 } | |
39 | |
40 case *txnTaskQueueData: | |
41 ret = &taskqueueTxnImpl{ | |
42 wrapper.DummyTQ(), | |
43 x, | |
44 ic, | |
45 curGID(ic).namespace, | |
46 } | |
47 | |
48 default: | |
49 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | |
50 } | |
51 return ret | |
52 }) | |
53 } | |
54 | |
55 //////////////////////////////// taskqueueImpl ///////////////////////////////// | |
56 | |
57 type taskqueueImpl struct { | |
58 wrapper.TaskQueue | |
59 *taskQueueData | |
60 | |
61 ctx context.Context | |
62 ns string | |
63 } | |
64 | |
65 var ( | |
66 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) | |
67 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) | |
68 ) | |
69 | |
70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { | |
71 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | |
72 if err != nil { | |
73 return nil, err | |
74 } | |
75 | |
76 if _, ok := t.archived[queueName][toSched.Name]; ok { | |
77 // SDK converts TOMBSTONE -> already added too | |
78 return nil, taskqueue.ErrTaskAlreadyAdded | |
79 } else if _, ok := t.named[queueName][toSched.Name]; ok { | |
80 return nil, taskqueue.ErrTaskAlreadyAdded | |
81 } else { | |
82 t.named[queueName][toSched.Name] = toSched | |
83 } | |
84 | |
85 return dupTask(toSched), nil | |
86 } | |
87 | |
88 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.
Task, error) { | |
89 if err := t.IsBroken(); err != nil { | |
90 return nil, err | |
91 } | |
92 | |
93 t.Lock() | |
94 defer t.Unlock() | |
95 | |
96 return t.addLocked(task, queueName) | |
97 } | |
98 | |
99 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err
or { | |
100 queueName, err := t.getQueueName(queueName) | |
101 if err != nil { | |
102 return err | |
103 } | |
104 | |
105 if _, ok := t.archived[queueName][task.Name]; ok { | |
106 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) | |
107 } | |
108 | |
109 if _, ok := t.named[queueName][task.Name]; !ok { | |
110 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) | |
111 } | |
112 | |
113 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | |
114 delete(t.named[queueName], task.Name) | |
115 | |
116 return nil | |
117 } | |
118 | |
119 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { | |
120 if err := t.IsBroken(); err != nil { | |
121 return err | |
122 } | |
123 | |
124 t.Lock() | |
125 defer t.Unlock() | |
126 | |
127 return t.deleteLocked(task, queueName) | |
128 } | |
129 | |
130 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*
taskqueue.Task, error) { | |
131 if err := t.IsBroken(); err != nil { | |
132 return nil, err | |
133 } | |
134 | |
135 t.Lock() | |
136 defer t.Unlock() | |
137 | |
138 return multi(tasks, queueName, t.addLocked) | |
139 } | |
140 | |
141 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e
rror { | |
142 if err := t.IsBroken(); err != nil { | |
143 return err | |
144 } | |
145 | |
146 t.Lock() | |
147 defer t.Unlock() | |
148 | |
149 _, err := multi(tasks, queueName, | |
150 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { | |
151 return nil, t.deleteLocked(tsk, qn) | |
152 }) | |
153 return err | |
154 } | |
155 | |
156 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | |
157 | |
158 type taskqueueTxnImpl struct { | |
159 wrapper.TaskQueue | |
160 *txnTaskQueueData | |
161 | |
162 ctx context.Context | |
163 ns string | |
164 } | |
165 | |
166 var ( | |
167 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) | |
168 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) | |
169 ) | |
170 | |
171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { | |
172 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) | |
173 if err != nil { | |
174 return nil, err | |
175 } | |
176 | |
177 numTasks := 0 | |
178 for _, vs := range t.anony { | |
179 numTasks += len(vs) | |
180 } | |
181 if numTasks+1 > 5 { | |
182 // transactional tasks are actually implemented 'for real' as Ac
tions which | |
183 // ride on the datastore. The current datastore implementation o
nly allows | |
184 // a maximum of 5 Actions per transaction, and more than that re
sult in a | |
185 // BAD_REQUEST. | |
186 return nil, newDSError(dbpb.Error_BAD_REQUEST) | |
187 } | |
188 | |
189 t.anony[queueName] = append(t.anony[queueName], toSched) | |
190 | |
191 // the fact that we have generated a unique name for this task queue ite
m is | |
192 // an implementation detail. | |
193 // TODO(riannucci): now that I think about this... it may not actually b
e true. | |
194 // We should verify that the .Name for a task added in a tr
ansaction is | |
195 // meaningless. Maybe names generated in a transaction are
somehow | |
196 // guaranteed to be meaningful? | |
197 toRet := dupTask(toSched) | |
198 toRet.Name = "" | |
199 | |
200 return toRet, nil | |
201 } | |
202 | |
203 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque
ue.Task, error) { | |
204 if err := t.IsBroken(); err != nil { | |
205 return nil, err | |
206 } | |
207 | |
208 t.Lock() | |
209 defer t.Unlock() | |
210 | |
211 return t.addLocked(task, queueName) | |
212 } | |
213 | |
214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) (
[]*taskqueue.Task, error) { | |
215 if err := t.IsBroken(); err != nil { | |
216 return nil, err | |
217 } | |
218 | |
219 t.Lock() | |
220 defer t.Unlock() | |
221 | |
222 return multi(tasks, queueName, t.addLocked) | |
223 } | |
224 | |
225 ////////////////////////////// private functions /////////////////////////////// | |
226 | |
227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | |
228 | |
229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | |
230 | |
231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str
ing { | |
232 _, ok := queue[cur] | |
233 for !ok && cur == "" { | |
234 name := [500]byte{} | |
235 for i := 0; i < 500; i++ { | |
236 name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len
(validTaskChars))] | |
237 } | |
238 cur = string(name[:]) | |
239 _, ok = queue[cur] | |
240 } | |
241 return cur | |
242 } | |
243 | |
244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { | |
245 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} | |
246 } | |
247 | |
248 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st
ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { | |
249 ret := []*taskqueue.Task(nil) | |
250 me := appengine.MultiError(nil) | |
251 foundErr := false | |
252 for _, task := range tasks { | |
253 rt, err := f(task, queueName) | |
254 ret = append(ret, rt) | |
255 me = append(me, err) | |
256 if err != nil { | |
257 foundErr = true | |
258 } | |
259 } | |
260 if !foundErr { | |
261 me = nil | |
262 } | |
263 return ret, me | |
264 } | |
265 | |
266 func dupTask(t *taskqueue.Task) *taskqueue.Task { | |
267 ret := &taskqueue.Task{} | |
268 *ret = *t | |
269 | |
270 if t.Header != nil { | |
271 ret.Header = make(http.Header, len(t.Header)) | |
272 for k, vs := range t.Header { | |
273 newVs := make([]string, len(vs)) | |
274 copy(newVs, vs) | |
275 ret.Header[k] = newVs | |
276 } | |
277 } | |
278 | |
279 if t.Payload != nil { | |
280 ret.Payload = make([]byte, len(t.Payload)) | |
281 copy(ret.Payload, t.Payload) | |
282 } | |
283 | |
284 if t.RetryOptions != nil { | |
285 ret.RetryOptions = &taskqueue.RetryOptions{} | |
286 *ret.RetryOptions = *t.RetryOptions | |
287 } | |
288 | |
289 return ret | |
290 } | |
291 | |
292 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | |
293 r := make(wrapper.QueueData, len(q)) | |
294 for k, q := range q { | |
295 r[k] = make(map[string]*taskqueue.Task, len(q)) | |
296 for tn, t := range q { | |
297 r[k][tn] = dupTask(t) | |
298 } | |
299 } | |
300 return r | |
301 } | |
OLD | NEW |