OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "fmt" | 9 "fmt" |
10 "net/http" | 10 "net/http" |
11 "sync" | 11 "sync" |
12 "sync/atomic" | 12 "sync/atomic" |
13 | 13 |
14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
15 | 15 |
16 ds "github.com/luci/gae/service/datastore" | 16 ds "github.com/luci/gae/service/datastore" |
| 17 "github.com/luci/gae/service/info" |
17 tq "github.com/luci/gae/service/taskqueue" | 18 tq "github.com/luci/gae/service/taskqueue" |
| 19 |
18 "github.com/luci/luci-go/common/clock" | 20 "github.com/luci/luci-go/common/clock" |
19 ) | 21 ) |
20 | 22 |
21 var ( | 23 var ( |
22 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac
e") | 24 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac
e") |
23 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac
e") | 25 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac
e") |
24 ) | 26 ) |
25 | 27 |
26 //////////////////////////////// taskQueueData ///////////////////////////////// | 28 //////////////////////////////// taskQueueData ///////////////////////////////// |
27 | 29 |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
120 queueName, err := t.getQueueNameLocked(queueName) | 122 queueName, err := t.getQueueNameLocked(queueName) |
121 if err != nil { | 123 if err != nil { |
122 return err | 124 return err |
123 } | 125 } |
124 | 126 |
125 t.named[queueName] = map[string]*tq.Task{} | 127 t.named[queueName] = map[string]*tq.Task{} |
126 t.archived[queueName] = map[string]*tq.Task{} | 128 t.archived[queueName] = map[string]*tq.Task{} |
127 return nil | 129 return nil |
128 } | 130 } |
129 | 131 |
130 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu
eueName string) (*tq.Task, error) { | 132 func (t *taskQueueData) prepTask(c context.Context, task *tq.Task, queueName str
ing) (*tq.Task, error) { |
131 toSched := task.Duplicate() | 133 toSched := task.Duplicate() |
132 | 134 |
133 if toSched.Path == "" { | 135 if toSched.Path == "" { |
134 toSched.Path = "/_ah/queue/" + queueName | 136 toSched.Path = "/_ah/queue/" + queueName |
135 } | 137 } |
136 | 138 |
137 if toSched.ETA.IsZero() { | 139 if toSched.ETA.IsZero() { |
138 toSched.ETA = clock.Now(c).Add(toSched.Delay) | 140 toSched.ETA = clock.Now(c).Add(toSched.Delay) |
139 } else if toSched.Delay != 0 { | 141 } else if toSched.Delay != 0 { |
140 panic("taskqueue: both Delay and ETA are set") | 142 panic("taskqueue: both Delay and ETA are set") |
(...skipping 10 matching lines...) Expand all Loading... |
151 | 153 |
152 // Methods that can not have payloads. | 154 // Methods that can not have payloads. |
153 case "GET", "HEAD", "DELETE": | 155 case "GET", "HEAD", "DELETE": |
154 toSched.Payload = nil | 156 toSched.Payload = nil |
155 | 157 |
156 default: | 158 default: |
157 return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho
d) | 159 return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho
d) |
158 } | 160 } |
159 | 161 |
160 if _, ok := toSched.Header[currentNamespace]; !ok { | 162 if _, ok := toSched.Header[currentNamespace]; !ok { |
161 » » if ns != "" { | 163 » » if ns := info.GetNamespace(c); ns != "" { |
162 if toSched.Header == nil { | 164 if toSched.Header == nil { |
163 toSched.Header = http.Header{} | 165 toSched.Header = http.Header{} |
164 } | 166 } |
165 toSched.Header[currentNamespace] = []string{ns} | 167 toSched.Header[currentNamespace] = []string{ns} |
166 } | 168 } |
167 } | 169 } |
168 // TODO(riannucci): implement DefaultNamespace | 170 // TODO(riannucci): implement DefaultNamespace |
169 | 171 |
170 if toSched.Name == "" { | 172 if toSched.Name == "" { |
171 toSched.Name = mkName(c, "", t.named[queueName]) | 173 toSched.Name = mkName(c, "", t.named[queueName]) |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
250 return t.parent.GetTombstonedTasks() | 252 return t.parent.GetTombstonedTasks() |
251 } | 253 } |
252 | 254 |
253 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { | 255 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { |
254 return t.parent.GetScheduledTasks() | 256 return t.parent.GetScheduledTasks() |
255 } | 257 } |
256 | 258 |
257 func (t *txnTaskQueueData) CreateQueue(queueName string) { | 259 func (t *txnTaskQueueData) CreateQueue(queueName string) { |
258 t.parent.CreateQueue(queueName) | 260 t.parent.CreateQueue(queueName) |
259 } | 261 } |
OLD | NEW |