OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "fmt" | |
10 "infra/gae/libs/wrapper" | |
11 "net/http" | |
12 "sync" | |
13 "sync/atomic" | |
14 | |
15 "appengine/datastore" | |
16 "appengine/taskqueue" | |
17 pb "appengine_internal/taskqueue" | |
18 "golang.org/x/net/context" | |
19 "infra/libs/clock" | |
20 ) | |
21 | |
22 var ( | |
23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac
e") | |
24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac
e") | |
25 ) | |
26 | |
27 //////////////////////////////// taskQueueData ///////////////////////////////// | |
28 | |
29 type taskQueueData struct { | |
30 sync.Mutex | |
31 wrapper.BrokenFeatures | |
32 | |
33 named wrapper.QueueData | |
34 archived wrapper.QueueData | |
35 } | |
36 | |
37 var ( | |
38 _ = memContextObj((*taskQueueData)(nil)) | |
39 _ = wrapper.TQTestable((*taskQueueData)(nil)) | |
40 ) | |
41 | |
42 func newTaskQueueData() memContextObj { | |
43 return &taskQueueData{ | |
44 BrokenFeatures: wrapper.BrokenFeatures{ | |
45 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI
ENT_ERROR)}, | |
46 named: wrapper.QueueData{"default": {}}, | |
47 archived: wrapper.QueueData{"default": {}}, | |
48 } | |
49 } | |
50 | |
51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } | |
52 func (t *taskQueueData) endTxn() {} | |
53 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { | |
54 txn := obj.(*txnTaskQueueData) | |
55 for qn, tasks := range txn.anony { | |
56 for _, tsk := range tasks { | |
57 tsk.Name = mkName(c, tsk.Name, t.named[qn]) | |
58 t.named[qn][tsk.Name] = tsk | |
59 } | |
60 } | |
61 txn.anony = nil | |
62 } | |
63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err
or) { | |
64 return &txnTaskQueueData{ | |
65 BrokenFeatures: &t.BrokenFeatures, | |
66 parent: t, | |
67 anony: wrapper.AnonymousQueueData{}, | |
68 }, nil | |
69 } | |
70 | |
71 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { | |
72 return nil | |
73 } | |
74 | |
75 func (t *taskQueueData) CreateQueue(queueName string) { | |
76 t.Lock() | |
77 defer t.Unlock() | |
78 | |
79 if _, ok := t.named[queueName]; ok { | |
80 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw
ice! %q", queueName)) | |
81 } | |
82 t.named[queueName] = map[string]*taskqueue.Task{} | |
83 t.archived[queueName] = map[string]*taskqueue.Task{} | |
84 } | |
85 | |
86 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData { | |
87 t.Lock() | |
88 defer t.Unlock() | |
89 | |
90 return dupQueue(t.named) | |
91 } | |
92 | |
93 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { | |
94 t.Lock() | |
95 defer t.Unlock() | |
96 | |
97 return dupQueue(t.archived) | |
98 } | |
99 | |
100 func (t *taskQueueData) resetTasksWithLock() { | |
101 for queuename := range t.named { | |
102 t.named[queuename] = map[string]*taskqueue.Task{} | |
103 t.archived[queuename] = map[string]*taskqueue.Task{} | |
104 } | |
105 } | |
106 | |
107 func (t *taskQueueData) ResetTasks() { | |
108 t.Lock() | |
109 defer t.Unlock() | |
110 | |
111 t.resetTasksWithLock() | |
112 } | |
113 | |
114 func (t *taskQueueData) getQueueName(queueName string) (string, error) { | |
115 if queueName == "" { | |
116 queueName = "default" | |
117 } | |
118 if _, ok := t.named[queueName]; !ok { | |
119 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) | |
120 } | |
121 return queueName, nil | |
122 } | |
123 | |
124 func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
ask, queueName string) ( | |
125 *taskqueue.Task, string, error) { | |
126 queueName, err := t.getQueueName(queueName) | |
127 if err != nil { | |
128 return nil, "", err | |
129 } | |
130 | |
131 toSched := dupTask(task) | |
132 | |
133 if toSched.Path == "" { | |
134 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) | |
135 } | |
136 | |
137 if toSched.ETA.IsZero() { | |
138 toSched.ETA = clock.Now(c).Add(toSched.Delay) | |
139 } else if toSched.Delay != 0 { | |
140 panic("taskqueue: both Delay and ETA are set") | |
141 } | |
142 toSched.Delay = 0 | |
143 | |
144 if toSched.Method == "" { | |
145 toSched.Method = "POST" | |
146 } | |
147 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method];
!ok { | |
148 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M
ethod) | |
149 } | |
150 if toSched.Method != "POST" && toSched.Method != "PUT" { | |
151 toSched.Payload = nil | |
152 } | |
153 | |
154 if _, ok := toSched.Header[currentNamespace]; !ok { | |
155 if ns != "" { | |
156 if toSched.Header == nil { | |
157 toSched.Header = http.Header{} | |
158 } | |
159 toSched.Header[currentNamespace] = []string{ns} | |
160 } | |
161 } | |
162 // TODO(riannucci): implement DefaultNamespace | |
163 | |
164 if toSched.Name == "" { | |
165 toSched.Name = mkName(c, "", t.named[queueName]) | |
166 } else { | |
167 if !validTaskName.MatchString(toSched.Name) { | |
168 return nil, "", newTQError(pb.TaskQueueServiceError_INVA
LID_TASK_NAME) | |
169 } | |
170 } | |
171 | |
172 return toSched, queueName, nil | |
173 } | |
174 | |
175 /////////////////////////////// txnTaskQueueData /////////////////////////////// | |
176 | |
177 type txnTaskQueueData struct { | |
178 *wrapper.BrokenFeatures | |
179 | |
180 lock sync.Mutex | |
181 | |
182 // boolean 0 or 1, use atomic.*Int32 to access. | |
183 closed int32 | |
184 anony wrapper.AnonymousQueueData | |
185 parent *taskQueueData | |
186 } | |
187 | |
188 var ( | |
189 _ = memContextObj((*txnTaskQueueData)(nil)) | |
190 _ = wrapper.TQTestable((*txnTaskQueueData)(nil)) | |
191 ) | |
192 | |
193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } | |
194 | |
195 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { | |
196 panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) | |
197 } | |
198 | |
199 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj,
error) { | |
200 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") | |
201 } | |
202 | |
203 func (t *txnTaskQueueData) endTxn() { | |
204 if atomic.LoadInt32(&t.closed) == 1 { | |
205 panic("cannot end transaction twice") | |
206 } | |
207 atomic.StoreInt32(&t.closed, 1) | |
208 } | |
209 | |
210 func (t *txnTaskQueueData) IsBroken() error { | |
211 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
212 // this here, where in the SDK only datastore.transaction.Call does. | |
213 if atomic.LoadInt32(&t.closed) == 1 { | |
214 return fmt.Errorf("taskqueue: transaction context has expired") | |
215 } | |
216 return t.parent.IsBroken() | |
217 } | |
218 | |
219 func (t *txnTaskQueueData) ResetTasks() { | |
220 t.Lock() | |
221 defer t.Unlock() | |
222 | |
223 for queuename := range t.anony { | |
224 t.anony[queuename] = nil | |
225 } | |
226 t.parent.resetTasksWithLock() | |
227 } | |
228 | |
229 func (t *txnTaskQueueData) Lock() { | |
230 t.lock.Lock() | |
231 t.parent.Lock() | |
232 } | |
233 func (t *txnTaskQueueData) Unlock() { | |
234 t.parent.Unlock() | |
235 t.lock.Unlock() | |
236 } | |
237 | |
238 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { | |
239 t.Lock() | |
240 defer t.Unlock() | |
241 | |
242 ret := make(wrapper.AnonymousQueueData, len(t.anony)) | |
243 for k, vs := range t.anony { | |
244 ret[k] = make([]*taskqueue.Task, len(vs)) | |
245 for i, v := range vs { | |
246 tsk := dupTask(v) | |
247 tsk.Name = "" | |
248 ret[k][i] = tsk | |
249 } | |
250 } | |
251 | |
252 return ret | |
253 } | |
254 | |
255 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData { | |
256 return t.parent.GetTombstonedTasks() | |
257 } | |
258 | |
259 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { | |
260 return t.parent.GetScheduledTasks() | |
261 } | |
262 | |
263 func (t *txnTaskQueueData) CreateQueue(queueName string) { | |
264 t.parent.CreateQueue(queueName) | |
265 } | |
OLD | NEW |