Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(805)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1154213012: Add context-aware "time" library wrapper. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Added coverage files. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "infra/gae/libs/wrapper" 10 "infra/gae/libs/wrapper"
11 "math/rand"
12 "net/http" 11 "net/http"
13 "sync" 12 "sync"
14 "sync/atomic" 13 "sync/atomic"
15 "time"
16 14
17 "appengine/datastore" 15 "appengine/datastore"
18 "appengine/taskqueue" 16 "appengine/taskqueue"
19 pb "appengine_internal/taskqueue" 17 pb "appengine_internal/taskqueue"
18 "golang.org/x/net/context"
19 "infra/libs/clock"
20 ) 20 )
21 21
22 var ( 22 var (
23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e") 23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e") 24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
25 ) 25 )
26 26
27 //////////////////////////////// taskQueueData ///////////////////////////////// 27 //////////////////////////////// taskQueueData /////////////////////////////////
28 28
29 type taskQueueData struct { 29 type taskQueueData struct {
(...skipping 13 matching lines...) Expand all
43 return &taskQueueData{ 43 return &taskQueueData{
44 BrokenFeatures: wrapper.BrokenFeatures{ 44 BrokenFeatures: wrapper.BrokenFeatures{
45 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)}, 45 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)},
46 named: wrapper.QueueData{"default": {}}, 46 named: wrapper.QueueData{"default": {}},
47 archived: wrapper.QueueData{"default": {}}, 47 archived: wrapper.QueueData{"default": {}},
48 } 48 }
49 } 49 }
50 50
51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } 51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
52 func (t *taskQueueData) endTxn() {} 52 func (t *taskQueueData) endTxn() {}
53 func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) { 53 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
54 txn := obj.(*txnTaskQueueData) 54 txn := obj.(*txnTaskQueueData)
55 for qn, tasks := range txn.anony { 55 for qn, tasks := range txn.anony {
56 for _, tsk := range tasks { 56 for _, tsk := range tasks {
57 » » » tsk.Name = mkName(rnd, tsk.Name, t.named[qn]) 57 » » » tsk.Name = mkName(c, tsk.Name, t.named[qn])
58 t.named[qn][tsk.Name] = tsk 58 t.named[qn][tsk.Name] = tsk
59 } 59 }
60 } 60 }
61 txn.anony = nil 61 txn.anony = nil
62 } 62 }
63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) { 63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) {
64 return &txnTaskQueueData{ 64 return &txnTaskQueueData{
65 BrokenFeatures: &t.BrokenFeatures, 65 BrokenFeatures: &t.BrokenFeatures,
66 parent: t, 66 parent: t,
67 anony: wrapper.AnonymousQueueData{}, 67 anony: wrapper.AnonymousQueueData{},
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 func (t *taskQueueData) getQueueName(queueName string) (string, error) { 114 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
115 if queueName == "" { 115 if queueName == "" {
116 queueName = "default" 116 queueName = "default"
117 } 117 }
118 if _, ok := t.named[queueName]; !ok { 118 if _, ok := t.named[queueName]; !ok {
119 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) 119 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
120 } 120 }
121 return queueName, nil 121 return queueName, nil
122 } 122 }
123 123
124 func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri ng, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) { 124 func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T ask, queueName string) (
125 » *taskqueue.Task, string, error) {
125 queueName, err := t.getQueueName(queueName) 126 queueName, err := t.getQueueName(queueName)
126 if err != nil { 127 if err != nil {
127 return nil, "", err 128 return nil, "", err
128 } 129 }
129 130
130 toSched := dupTask(task) 131 toSched := dupTask(task)
131 132
132 if toSched.Path == "" { 133 if toSched.Path == "" {
133 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) 134 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
134 } 135 }
135 136
136 if toSched.ETA.IsZero() { 137 if toSched.ETA.IsZero() {
137 » » toSched.ETA = now.Add(toSched.Delay) 138 » » toSched.ETA = clock.Now(c).Add(toSched.Delay)
138 } else if toSched.Delay != 0 { 139 } else if toSched.Delay != 0 {
139 panic("taskqueue: both Delay and ETA are set") 140 panic("taskqueue: both Delay and ETA are set")
140 } 141 }
141 toSched.Delay = 0 142 toSched.Delay = 0
142 143
143 if toSched.Method == "" { 144 if toSched.Method == "" {
144 toSched.Method = "POST" 145 toSched.Method = "POST"
145 } 146 }
146 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok { 147 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
147 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod) 148 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
148 } 149 }
149 if toSched.Method != "POST" && toSched.Method != "PUT" { 150 if toSched.Method != "POST" && toSched.Method != "PUT" {
150 toSched.Payload = nil 151 toSched.Payload = nil
151 } 152 }
152 153
153 if _, ok := toSched.Header[currentNamespace]; !ok { 154 if _, ok := toSched.Header[currentNamespace]; !ok {
154 if ns != "" { 155 if ns != "" {
155 if toSched.Header == nil { 156 if toSched.Header == nil {
156 toSched.Header = http.Header{} 157 toSched.Header = http.Header{}
157 } 158 }
158 toSched.Header[currentNamespace] = []string{ns} 159 toSched.Header[currentNamespace] = []string{ns}
159 } 160 }
160 } 161 }
161 // TODO(riannucci): implement DefaultNamespace 162 // TODO(riannucci): implement DefaultNamespace
162 163
163 if toSched.Name == "" { 164 if toSched.Name == "" {
164 » » toSched.Name = mkName(rnd, "", t.named[queueName]) 165 » » toSched.Name = mkName(c, "", t.named[queueName])
165 } else { 166 } else {
166 if !validTaskName.MatchString(toSched.Name) { 167 if !validTaskName.MatchString(toSched.Name) {
167 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME) 168 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME)
168 } 169 }
169 } 170 }
170 171
171 return toSched, queueName, nil 172 return toSched, queueName, nil
172 } 173 }
173 174
174 /////////////////////////////// txnTaskQueueData /////////////////////////////// 175 /////////////////////////////// txnTaskQueueData ///////////////////////////////
175 176
176 type txnTaskQueueData struct { 177 type txnTaskQueueData struct {
177 *wrapper.BrokenFeatures 178 *wrapper.BrokenFeatures
178 179
179 lock sync.Mutex 180 lock sync.Mutex
180 181
181 // boolean 0 or 1, use atomic.*Int32 to access. 182 // boolean 0 or 1, use atomic.*Int32 to access.
182 closed int32 183 closed int32
183 anony wrapper.AnonymousQueueData 184 anony wrapper.AnonymousQueueData
184 parent *taskQueueData 185 parent *taskQueueData
185 } 186 }
186 187
187 var ( 188 var (
188 _ = memContextObj((*txnTaskQueueData)(nil)) 189 _ = memContextObj((*txnTaskQueueData)(nil))
189 _ = wrapper.TQTestable((*txnTaskQueueData)(nil)) 190 _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
190 ) 191 )
191 192
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } 193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
193 194
194 func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) { 195 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
195 panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) 196 panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
196 } 197 }
197 198
198 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { 199 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
199 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") 200 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
200 } 201 }
201 202
202 func (t *txnTaskQueueData) endTxn() { 203 func (t *txnTaskQueueData) endTxn() {
203 if atomic.LoadInt32(&t.closed) == 1 { 204 if atomic.LoadInt32(&t.closed) == 1 {
204 panic("cannot end transaction twice") 205 panic("cannot end transaction twice")
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
255 return t.parent.GetTombstonedTasks() 256 return t.parent.GetTombstonedTasks()
256 } 257 }
257 258
258 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { 259 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
259 return t.parent.GetScheduledTasks() 260 return t.parent.GetScheduledTasks()
260 } 261 }
261 262
262 func (t *txnTaskQueueData) CreateQueue(queueName string) { 263 func (t *txnTaskQueueData) CreateQueue(queueName string) {
263 t.parent.CreateQueue(queueName) 264 t.parent.CreateQueue(queueName)
264 } 265 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/taskqueue.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698