| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "net/http" | 10 "net/http" |
| 11 "sync" | 11 "sync" |
| 12 "sync/atomic" | 12 "sync/atomic" |
| 13 | 13 |
| 14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 15 | 15 |
| 16 ds "github.com/luci/gae/service/datastore" | 16 ds "github.com/luci/gae/service/datastore" |
| 17 "github.com/luci/gae/service/info" |
| 17 tq "github.com/luci/gae/service/taskqueue" | 18 tq "github.com/luci/gae/service/taskqueue" |
| 19 |
| 18 "github.com/luci/luci-go/common/clock" | 20 "github.com/luci/luci-go/common/clock" |
| 19 ) | 21 ) |
| 20 | 22 |
| 21 var ( | 23 var ( |
| 22 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac
e") | 24 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac
e") |
| 23 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac
e") | 25 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac
e") |
| 24 ) | 26 ) |
| 25 | 27 |
| 26 //////////////////////////////// taskQueueData ///////////////////////////////// | 28 //////////////////////////////// taskQueueData ///////////////////////////////// |
| 27 | 29 |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 120 queueName, err := t.getQueueNameLocked(queueName) | 122 queueName, err := t.getQueueNameLocked(queueName) |
| 121 if err != nil { | 123 if err != nil { |
| 122 return err | 124 return err |
| 123 } | 125 } |
| 124 | 126 |
| 125 t.named[queueName] = map[string]*tq.Task{} | 127 t.named[queueName] = map[string]*tq.Task{} |
| 126 t.archived[queueName] = map[string]*tq.Task{} | 128 t.archived[queueName] = map[string]*tq.Task{} |
| 127 return nil | 129 return nil |
| 128 } | 130 } |
| 129 | 131 |
| 130 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu
eueName string) (*tq.Task, error) { | 132 func (t *taskQueueData) prepTask(c context.Context, task *tq.Task, queueName str
ing) (*tq.Task, error) { |
| 131 toSched := task.Duplicate() | 133 toSched := task.Duplicate() |
| 132 | 134 |
| 133 if toSched.Path == "" { | 135 if toSched.Path == "" { |
| 134 toSched.Path = "/_ah/queue/" + queueName | 136 toSched.Path = "/_ah/queue/" + queueName |
| 135 } | 137 } |
| 136 | 138 |
| 137 if toSched.ETA.IsZero() { | 139 if toSched.ETA.IsZero() { |
| 138 toSched.ETA = clock.Now(c).Add(toSched.Delay) | 140 toSched.ETA = clock.Now(c).Add(toSched.Delay) |
| 139 } else if toSched.Delay != 0 { | 141 } else if toSched.Delay != 0 { |
| 140 panic("taskqueue: both Delay and ETA are set") | 142 panic("taskqueue: both Delay and ETA are set") |
| (...skipping 10 matching lines...) Expand all Loading... |
| 151 | 153 |
| 152 // Methods that can not have payloads. | 154 // Methods that can not have payloads. |
| 153 case "GET", "HEAD", "DELETE": | 155 case "GET", "HEAD", "DELETE": |
| 154 toSched.Payload = nil | 156 toSched.Payload = nil |
| 155 | 157 |
| 156 default: | 158 default: |
| 157 return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho
d) | 159 return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho
d) |
| 158 } | 160 } |
| 159 | 161 |
| 160 if _, ok := toSched.Header[currentNamespace]; !ok { | 162 if _, ok := toSched.Header[currentNamespace]; !ok { |
| 161 » » if ns != "" { | 163 » » if ns := info.GetNamespace(c); ns != "" { |
| 162 if toSched.Header == nil { | 164 if toSched.Header == nil { |
| 163 toSched.Header = http.Header{} | 165 toSched.Header = http.Header{} |
| 164 } | 166 } |
| 165 toSched.Header[currentNamespace] = []string{ns} | 167 toSched.Header[currentNamespace] = []string{ns} |
| 166 } | 168 } |
| 167 } | 169 } |
| 168 // TODO(riannucci): implement DefaultNamespace | 170 // TODO(riannucci): implement DefaultNamespace |
| 169 | 171 |
| 170 if toSched.Name == "" { | 172 if toSched.Name == "" { |
| 171 toSched.Name = mkName(c, "", t.named[queueName]) | 173 toSched.Name = mkName(c, "", t.named[queueName]) |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 250 return t.parent.GetTombstonedTasks() | 252 return t.parent.GetTombstonedTasks() |
| 251 } | 253 } |
| 252 | 254 |
| 253 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { | 255 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { |
| 254 return t.parent.GetScheduledTasks() | 256 return t.parent.GetScheduledTasks() |
| 255 } | 257 } |
| 256 | 258 |
| 257 func (t *txnTaskQueueData) CreateQueue(queueName string) { | 259 func (t *txnTaskQueueData) CreateQueue(queueName string) { |
| 258 t.parent.CreateQueue(queueName) | 260 t.parent.CreateQueue(queueName) |
| 259 } | 261 } |
| OLD | NEW |