| Index: go/src/infra/gae/libs/gae/memory/taskqueue_data.go
|
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/gae/memory/taskqueue_data.go
|
| similarity index 69%
|
| rename from go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
|
| rename to go/src/infra/gae/libs/gae/memory/taskqueue_data.go
|
| index 2d23b1d803ee3959a480d823cbd86364a53be40a..3e96e4fa7891a1162b9918edf8c8f85cdc3456d8 100644
|
| --- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
|
| +++ b/go/src/infra/gae/libs/gae/memory/taskqueue_data.go
|
| @@ -7,16 +7,14 @@ package memory
|
| import (
|
| "errors"
|
| "fmt"
|
| - "infra/gae/libs/wrapper"
|
| + "golang.org/x/net/context"
|
| "net/http"
|
| "sync"
|
| "sync/atomic"
|
|
|
| - "appengine/datastore"
|
| - "appengine/taskqueue"
|
| - pb "appengine_internal/taskqueue"
|
| - "golang.org/x/net/context"
|
| - "infra/libs/clock"
|
| + "infra/gae/libs/gae"
|
| +
|
| + "github.com/luci/luci-go/common/clock"
|
| )
|
|
|
| var (
|
| @@ -28,23 +26,23 @@ var (
|
|
|
| type taskQueueData struct {
|
| sync.Mutex
|
| - wrapper.BrokenFeatures
|
| + gae.BrokenFeatures
|
|
|
| - named wrapper.QueueData
|
| - archived wrapper.QueueData
|
| + named gae.QueueData
|
| + archived gae.QueueData
|
| }
|
|
|
| var (
|
| _ = memContextObj((*taskQueueData)(nil))
|
| - _ = wrapper.TQTestable((*taskQueueData)(nil))
|
| + _ = gae.TQTestable((*taskQueueData)(nil))
|
| )
|
|
|
| func newTaskQueueData() memContextObj {
|
| return &taskQueueData{
|
| - BrokenFeatures: wrapper.BrokenFeatures{
|
| - DefaultError: newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)},
|
| - named: wrapper.QueueData{"default": {}},
|
| - archived: wrapper.QueueData{"default": {}},
|
| + BrokenFeatures: gae.BrokenFeatures{
|
| + DefaultError: errors.New("TRANSIENT_ERROR")},
|
| + named: gae.QueueData{"default": {}},
|
| + archived: gae.QueueData{"default": {}},
|
| }
|
| }
|
|
|
| @@ -60,15 +58,15 @@ func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
|
| }
|
| txn.anony = nil
|
| }
|
| -func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
|
| +func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) {
|
| return &txnTaskQueueData{
|
| BrokenFeatures: &t.BrokenFeatures,
|
| parent: t,
|
| - anony: wrapper.AnonymousQueueData{},
|
| + anony: gae.AnonymousQueueData{},
|
| }, nil
|
| }
|
|
|
| -func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
|
| +func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
|
| return nil
|
| }
|
|
|
| @@ -79,18 +77,18 @@ func (t *taskQueueData) CreateQueue(queueName string) {
|
| if _, ok := t.named[queueName]; ok {
|
| panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
|
| }
|
| - t.named[queueName] = map[string]*taskqueue.Task{}
|
| - t.archived[queueName] = map[string]*taskqueue.Task{}
|
| + t.named[queueName] = map[string]*gae.TQTask{}
|
| + t.archived[queueName] = map[string]*gae.TQTask{}
|
| }
|
|
|
| -func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
|
| +func (t *taskQueueData) GetScheduledTasks() gae.QueueData {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| return dupQueue(t.named)
|
| }
|
|
|
| -func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
|
| +func (t *taskQueueData) GetTombstonedTasks() gae.QueueData {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| @@ -98,9 +96,9 @@ func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
|
| }
|
|
|
| func (t *taskQueueData) resetTasksWithLock() {
|
| - for queuename := range t.named {
|
| - t.named[queuename] = map[string]*taskqueue.Task{}
|
| - t.archived[queuename] = map[string]*taskqueue.Task{}
|
| + for queueName := range t.named {
|
| + t.named[queueName] = map[string]*gae.TQTask{}
|
| + t.archived[queueName] = map[string]*gae.TQTask{}
|
| }
|
| }
|
|
|
| @@ -116,13 +114,20 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) {
|
| queueName = "default"
|
| }
|
| if _, ok := t.named[queueName]; !ok {
|
| - return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
|
| + return "", errors.New("UNKNOWN_QUEUE")
|
| }
|
| return queueName, nil
|
| }
|
|
|
| -func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) (
|
| - *taskqueue.Task, string, error) {
|
| +var tqOkMethods = map[string]struct{}{
|
| + "GET": {},
|
| + "POST": {},
|
| + "HEAD": {},
|
| + "PUT": {},
|
| + "DELETE": {},
|
| +}
|
| +
|
| +func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) {
|
| queueName, err := t.getQueueName(queueName)
|
| if err != nil {
|
| return nil, "", err
|
| @@ -131,7 +136,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
|
| toSched := dupTask(task)
|
|
|
| if toSched.Path == "" {
|
| - return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
|
| + return nil, "", errors.New("INVALID_URL")
|
| }
|
|
|
| if toSched.ETA.IsZero() {
|
| @@ -144,7 +149,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
|
| if toSched.Method == "" {
|
| toSched.Method = "POST"
|
| }
|
| - if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
|
| + if _, ok := tqOkMethods[toSched.Method]; !ok {
|
| return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method)
|
| }
|
| if toSched.Method != "POST" && toSched.Method != "PUT" {
|
| @@ -165,7 +170,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
|
| toSched.Name = mkName(c, "", t.named[queueName])
|
| } else {
|
| if !validTaskName.MatchString(toSched.Name) {
|
| - return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME)
|
| + return nil, "", errors.New("INVALID_TASK_NAME")
|
| }
|
| }
|
|
|
| @@ -175,19 +180,19 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
|
| /////////////////////////////// txnTaskQueueData ///////////////////////////////
|
|
|
| type txnTaskQueueData struct {
|
| - *wrapper.BrokenFeatures
|
| + *gae.BrokenFeatures
|
|
|
| lock sync.Mutex
|
|
|
| // boolean 0 or 1, use atomic.*Int32 to access.
|
| closed int32
|
| - anony wrapper.AnonymousQueueData
|
| + anony gae.AnonymousQueueData
|
| parent *taskQueueData
|
| }
|
|
|
| var (
|
| _ = memContextObj((*txnTaskQueueData)(nil))
|
| - _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
|
| + _ = gae.TQTestable((*txnTaskQueueData)(nil))
|
| )
|
|
|
| func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
|
| @@ -196,7 +201,7 @@ func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
|
| panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
|
| }
|
|
|
| -func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
|
| +func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) {
|
| return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
|
| }
|
|
|
| @@ -207,13 +212,13 @@ func (t *txnTaskQueueData) endTxn() {
|
| atomic.StoreInt32(&t.closed, 1)
|
| }
|
|
|
| -func (t *txnTaskQueueData) IsBroken() error {
|
| +func (t *txnTaskQueueData) RunIfNotBroken(f func() error) error {
|
| // Slightly different from the SDK... datastore and taskqueue each implement
|
| // this here, where in the SDK only datastore.transaction.Call does.
|
| if atomic.LoadInt32(&t.closed) == 1 {
|
| return fmt.Errorf("taskqueue: transaction context has expired")
|
| }
|
| - return t.parent.IsBroken()
|
| + return t.parent.RunIfNotBroken(f)
|
| }
|
|
|
| func (t *txnTaskQueueData) ResetTasks() {
|
| @@ -235,13 +240,13 @@ func (t *txnTaskQueueData) Unlock() {
|
| t.lock.Unlock()
|
| }
|
|
|
| -func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
|
| +func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| - ret := make(wrapper.AnonymousQueueData, len(t.anony))
|
| + ret := make(gae.AnonymousQueueData, len(t.anony))
|
| for k, vs := range t.anony {
|
| - ret[k] = make([]*taskqueue.Task, len(vs))
|
| + ret[k] = make([]*gae.TQTask, len(vs))
|
| for i, v := range vs {
|
| tsk := dupTask(v)
|
| tsk.Name = ""
|
| @@ -252,11 +257,11 @@ func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
|
| return ret
|
| }
|
|
|
| -func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
|
| +func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData {
|
| return t.parent.GetTombstonedTasks()
|
| }
|
|
|
| -func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
|
| +func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData {
|
| return t.parent.GetScheduledTasks()
|
| }
|
|
|
|
|