Index: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
deleted file mode 100644 |
index 2d23b1d803ee3959a480d823cbd86364a53be40a..0000000000000000000000000000000000000000 |
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
+++ /dev/null |
@@ -1,265 +0,0 @@ |
-// Copyright 2015 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-package memory |
- |
-import ( |
- "errors" |
- "fmt" |
- "infra/gae/libs/wrapper" |
- "net/http" |
- "sync" |
- "sync/atomic" |
- |
- "appengine/datastore" |
- "appengine/taskqueue" |
- pb "appengine_internal/taskqueue" |
- "golang.org/x/net/context" |
- "infra/libs/clock" |
-) |
- |
-var ( |
- currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") |
- defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace") |
-) |
- |
-//////////////////////////////// taskQueueData ///////////////////////////////// |
- |
-type taskQueueData struct { |
- sync.Mutex |
- wrapper.BrokenFeatures |
- |
- named wrapper.QueueData |
- archived wrapper.QueueData |
-} |
- |
-var ( |
- _ = memContextObj((*taskQueueData)(nil)) |
- _ = wrapper.TQTestable((*taskQueueData)(nil)) |
-) |
- |
-func newTaskQueueData() memContextObj { |
- return &taskQueueData{ |
- BrokenFeatures: wrapper.BrokenFeatures{ |
- DefaultError: newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)}, |
- named: wrapper.QueueData{"default": {}}, |
- archived: wrapper.QueueData{"default": {}}, |
- } |
-} |
- |
-func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } |
-func (t *taskQueueData) endTxn() {} |
-func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { |
- txn := obj.(*txnTaskQueueData) |
- for qn, tasks := range txn.anony { |
- for _, tsk := range tasks { |
- tsk.Name = mkName(c, tsk.Name, t.named[qn]) |
- t.named[qn][tsk.Name] = tsk |
- } |
- } |
- txn.anony = nil |
-} |
-func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
- return &txnTaskQueueData{ |
- BrokenFeatures: &t.BrokenFeatures, |
- parent: t, |
- anony: wrapper.AnonymousQueueData{}, |
- }, nil |
-} |
- |
-func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
- return nil |
-} |
- |
-func (t *taskQueueData) CreateQueue(queueName string) { |
- t.Lock() |
- defer t.Unlock() |
- |
- if _, ok := t.named[queueName]; ok { |
- panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName)) |
- } |
- t.named[queueName] = map[string]*taskqueue.Task{} |
- t.archived[queueName] = map[string]*taskqueue.Task{} |
-} |
- |
-func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData { |
- t.Lock() |
- defer t.Unlock() |
- |
- return dupQueue(t.named) |
-} |
- |
-func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { |
- t.Lock() |
- defer t.Unlock() |
- |
- return dupQueue(t.archived) |
-} |
- |
-func (t *taskQueueData) resetTasksWithLock() { |
- for queuename := range t.named { |
- t.named[queuename] = map[string]*taskqueue.Task{} |
- t.archived[queuename] = map[string]*taskqueue.Task{} |
- } |
-} |
- |
-func (t *taskQueueData) ResetTasks() { |
- t.Lock() |
- defer t.Unlock() |
- |
- t.resetTasksWithLock() |
-} |
- |
-func (t *taskQueueData) getQueueName(queueName string) (string, error) { |
- if queueName == "" { |
- queueName = "default" |
- } |
- if _, ok := t.named[queueName]; !ok { |
- return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) |
- } |
- return queueName, nil |
-} |
- |
-func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) ( |
- *taskqueue.Task, string, error) { |
- queueName, err := t.getQueueName(queueName) |
- if err != nil { |
- return nil, "", err |
- } |
- |
- toSched := dupTask(task) |
- |
- if toSched.Path == "" { |
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) |
- } |
- |
- if toSched.ETA.IsZero() { |
- toSched.ETA = clock.Now(c).Add(toSched.Delay) |
- } else if toSched.Delay != 0 { |
- panic("taskqueue: both Delay and ETA are set") |
- } |
- toSched.Delay = 0 |
- |
- if toSched.Method == "" { |
- toSched.Method = "POST" |
- } |
- if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok { |
- return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method) |
- } |
- if toSched.Method != "POST" && toSched.Method != "PUT" { |
- toSched.Payload = nil |
- } |
- |
- if _, ok := toSched.Header[currentNamespace]; !ok { |
- if ns != "" { |
- if toSched.Header == nil { |
- toSched.Header = http.Header{} |
- } |
- toSched.Header[currentNamespace] = []string{ns} |
- } |
- } |
- // TODO(riannucci): implement DefaultNamespace |
- |
- if toSched.Name == "" { |
- toSched.Name = mkName(c, "", t.named[queueName]) |
- } else { |
- if !validTaskName.MatchString(toSched.Name) { |
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME) |
- } |
- } |
- |
- return toSched, queueName, nil |
-} |
- |
-/////////////////////////////// txnTaskQueueData /////////////////////////////// |
- |
-type txnTaskQueueData struct { |
- *wrapper.BrokenFeatures |
- |
- lock sync.Mutex |
- |
- // boolean 0 or 1, use atomic.*Int32 to access. |
- closed int32 |
- anony wrapper.AnonymousQueueData |
- parent *taskQueueData |
-} |
- |
-var ( |
- _ = memContextObj((*txnTaskQueueData)(nil)) |
- _ = wrapper.TQTestable((*txnTaskQueueData)(nil)) |
-) |
- |
-func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } |
- |
-func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { |
- panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) |
-} |
- |
-func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
- return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") |
-} |
- |
-func (t *txnTaskQueueData) endTxn() { |
- if atomic.LoadInt32(&t.closed) == 1 { |
- panic("cannot end transaction twice") |
- } |
- atomic.StoreInt32(&t.closed, 1) |
-} |
- |
-func (t *txnTaskQueueData) IsBroken() error { |
- // Slightly different from the SDK... datastore and taskqueue each implement |
- // this here, where in the SDK only datastore.transaction.Call does. |
- if atomic.LoadInt32(&t.closed) == 1 { |
- return fmt.Errorf("taskqueue: transaction context has expired") |
- } |
- return t.parent.IsBroken() |
-} |
- |
-func (t *txnTaskQueueData) ResetTasks() { |
- t.Lock() |
- defer t.Unlock() |
- |
- for queuename := range t.anony { |
- t.anony[queuename] = nil |
- } |
- t.parent.resetTasksWithLock() |
-} |
- |
-func (t *txnTaskQueueData) Lock() { |
- t.lock.Lock() |
- t.parent.Lock() |
-} |
-func (t *txnTaskQueueData) Unlock() { |
- t.parent.Unlock() |
- t.lock.Unlock() |
-} |
- |
-func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
- t.Lock() |
- defer t.Unlock() |
- |
- ret := make(wrapper.AnonymousQueueData, len(t.anony)) |
- for k, vs := range t.anony { |
- ret[k] = make([]*taskqueue.Task, len(vs)) |
- for i, v := range vs { |
- tsk := dupTask(v) |
- tsk.Name = "" |
- ret[k][i] = tsk |
- } |
- } |
- |
- return ret |
-} |
- |
-func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData { |
- return t.parent.GetTombstonedTasks() |
-} |
- |
-func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { |
- return t.parent.GetScheduledTasks() |
-} |
- |
-func (t *txnTaskQueueData) CreateQueue(queueName string) { |
- t.parent.CreateQueue(queueName) |
-} |