Index: go/src/infra/gae/libs/gae/memory/taskqueue.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue.go b/go/src/infra/gae/libs/gae/memory/taskqueue.go |
similarity index 53% |
rename from go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
rename to go/src/infra/gae/libs/gae/memory/taskqueue.go |
index 4d05752a7235fd0e8ae12166ee712628c91e646e..b0e04b201d5b73a179d3df15f8cdf7e7538ee835 100644 |
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
+++ b/go/src/infra/gae/libs/gae/memory/taskqueue.go |
@@ -5,33 +5,29 @@ |
package memory |
import ( |
+ "errors" |
"fmt" |
- "infra/gae/libs/wrapper" |
"net/http" |
"regexp" |
"golang.org/x/net/context" |
- "appengine" |
- "appengine/taskqueue" |
- "appengine_internal" |
- dbpb "appengine_internal/datastore" |
- pb "appengine_internal/taskqueue" |
+ "infra/gae/libs/gae" |
) |
/////////////////////////////// public functions /////////////////////////////// |
func useTQ(c context.Context) context.Context { |
- return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueue { |
+ return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { |
tqd := cur(ic).Get(memContextTQIdx) |
var ret interface { |
- wrapper.TQTestable |
- wrapper.TaskQueue |
+ gae.TQTestable |
+ gae.TaskQueue |
} |
switch x := tqd.(type) { |
case *taskQueueData: |
ret = &taskqueueImpl{ |
- wrapper.DummyTQ(), |
+ gae.DummyTQ(), |
x, |
ic, |
curGID(ic).namespace, |
@@ -39,7 +35,7 @@ func useTQ(c context.Context) context.Context { |
case *txnTaskQueueData: |
ret = &taskqueueTxnImpl{ |
- wrapper.DummyTQ(), |
+ gae.DummyTQ(), |
x, |
ic, |
curGID(ic).namespace, |
@@ -55,7 +51,7 @@ func useTQ(c context.Context) context.Context { |
//////////////////////////////// taskqueueImpl ///////////////////////////////// |
type taskqueueImpl struct { |
- wrapper.TaskQueue |
+ gae.TaskQueue |
*taskQueueData |
ctx context.Context |
@@ -63,11 +59,11 @@ type taskqueueImpl struct { |
} |
var ( |
- _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
- _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
+ _ = gae.TaskQueue((*taskqueueImpl)(nil)) |
+ _ = gae.TQTestable((*taskqueueImpl)(nil)) |
) |
-func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
+func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
@@ -75,9 +71,9 @@ func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task |
if _, ok := t.archived[queueName][toSched.Name]; ok { |
// SDK converts TOMBSTONE -> already added too |
- return nil, taskqueue.ErrTaskAlreadyAdded |
+ return nil, gae.ErrTQTaskAlreadyAdded |
} else if _, ok := t.named[queueName][toSched.Name]; ok { |
- return nil, taskqueue.ErrTaskAlreadyAdded |
+ return nil, gae.ErrTQTaskAlreadyAdded |
} else { |
t.named[queueName][toSched.Name] = toSched |
} |
@@ -85,29 +81,28 @@ func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task |
return dupTask(toSched), nil |
} |
-func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
- if err := t.IsBroken(); err != nil { |
- return nil, err |
- } |
- |
- t.Lock() |
- defer t.Unlock() |
- |
- return t.addLocked(task, queueName) |
+func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) { |
+ err = t.RunIfNotBroken(func() (err error) { |
+ t.Lock() |
+ defer t.Unlock() |
+ retTask, err = t.addLocked(task, queueName) |
+ return |
+ }) |
+ return |
} |
-func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) error { |
+func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { |
queueName, err := t.getQueueName(queueName) |
if err != nil { |
return err |
} |
if _, ok := t.archived[queueName][task.Name]; ok { |
- return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) |
+ return errors.New("TOMBSTONED_TASK") |
} |
if _, ok := t.named[queueName][task.Name]; !ok { |
- return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) |
+ return errors.New("UNKNOWN_TASK") |
} |
t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
@@ -116,47 +111,41 @@ func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err |
return nil |
} |
-func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { |
- if err := t.IsBroken(); err != nil { |
- return err |
- } |
- |
- t.Lock() |
- defer t.Unlock() |
- |
- return t.deleteLocked(task, queueName) |
+func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { |
+ return t.RunIfNotBroken(func() error { |
+ t.Lock() |
+ defer t.Unlock() |
+ return t.deleteLocked(task, queueName) |
+ }) |
} |
-func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) { |
- if err := t.IsBroken(); err != nil { |
- return nil, err |
- } |
- |
- t.Lock() |
- defer t.Unlock() |
- |
- return multi(tasks, queueName, t.addLocked) |
+func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) { |
+ err = t.RunIfNotBroken(func() (err error) { |
+ t.Lock() |
+ defer t.Unlock() |
+ retTasks, err = multi(tasks, queueName, t.addLocked) |
+ return |
+ }) |
+ return |
} |
-func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) error { |
- if err := t.IsBroken(); err != nil { |
- return err |
- } |
- |
- t.Lock() |
- defer t.Unlock() |
+func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { |
+ return t.RunIfNotBroken(func() error { |
+ t.Lock() |
+ defer t.Unlock() |
- _, err := multi(tasks, queueName, |
- func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { |
- return nil, t.deleteLocked(tsk, qn) |
- }) |
- return err |
+ _, err := multi(tasks, queueName, |
+ func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { |
+ return nil, t.deleteLocked(tsk, qn) |
+ }) |
+ return err |
+ }) |
} |
/////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
type taskqueueTxnImpl struct { |
- wrapper.TaskQueue |
+ gae.TaskQueue |
*txnTaskQueueData |
ctx context.Context |
@@ -164,11 +153,11 @@ type taskqueueTxnImpl struct { |
} |
var ( |
- _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
- _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
+ _ = gae.TaskQueue((*taskqueueTxnImpl)(nil)) |
+ _ = gae.TQTestable((*taskqueueTxnImpl)(nil)) |
) |
-func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
+func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
@@ -183,7 +172,7 @@ func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t |
// ride on the datastore. The current datastore implementation only allows |
// a maximum of 5 Actions per transaction, and more than that result in a |
// BAD_REQUEST. |
- return nil, newDSError(dbpb.Error_BAD_REQUEST) |
+ return nil, errors.New("BAD_REQUEST") |
} |
t.anony[queueName] = append(t.anony[queueName], toSched) |
@@ -200,26 +189,24 @@ func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t |
return toRet, nil |
} |
-func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
- if err := t.IsBroken(); err != nil { |
- return nil, err |
- } |
- |
- t.Lock() |
- defer t.Unlock() |
- |
- return t.addLocked(task, queueName) |
+func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) { |
+ err = t.RunIfNotBroken(func() (err error) { |
+ t.Lock() |
+ defer t.Unlock() |
+ retTask, err = t.addLocked(task, queueName) |
+ return |
+ }) |
+ return |
} |
-func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) { |
- if err := t.IsBroken(); err != nil { |
- return nil, err |
- } |
- |
- t.Lock() |
- defer t.Unlock() |
- |
- return multi(tasks, queueName, t.addLocked) |
+func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) { |
+ err = t.RunIfNotBroken(func() (err error) { |
+ t.Lock() |
+ defer t.Unlock() |
+ retTasks, err = multi(tasks, queueName, t.addLocked) |
+ return |
+ }) |
+ return |
} |
////////////////////////////// private functions /////////////////////////////// |
@@ -228,12 +215,12 @@ var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_" |
-func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) string { |
+func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string { |
_, ok := queue[cur] |
for !ok && cur == "" { |
name := [500]byte{} |
for i := 0; i < 500; i++ { |
- name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len(validTaskChars))] |
+ name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(validTaskChars))] |
} |
cur = string(name[:]) |
_, ok = queue[cur] |
@@ -241,13 +228,9 @@ func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str |
return cur |
} |
-func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.APIError { |
- return &appengine_internal.APIError{Service: "taskqueue", Code: int32(code)} |
-} |
- |
-func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, string) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { |
- ret := []*taskqueue.Task(nil) |
- me := appengine.MultiError(nil) |
+func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) { |
+ ret := []*gae.TQTask(nil) |
+ me := gae.MultiError(nil) |
foundErr := false |
for _, task := range tasks { |
rt, err := f(task, queueName) |
@@ -263,8 +246,8 @@ func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st |
return ret, me |
} |
-func dupTask(t *taskqueue.Task) *taskqueue.Task { |
- ret := &taskqueue.Task{} |
+func dupTask(t *gae.TQTask) *gae.TQTask { |
+ ret := &gae.TQTask{} |
*ret = *t |
if t.Header != nil { |
@@ -282,17 +265,17 @@ func dupTask(t *taskqueue.Task) *taskqueue.Task { |
} |
if t.RetryOptions != nil { |
- ret.RetryOptions = &taskqueue.RetryOptions{} |
+ ret.RetryOptions = &gae.TQRetryOptions{} |
*ret.RetryOptions = *t.RetryOptions |
} |
return ret |
} |
-func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
- r := make(wrapper.QueueData, len(q)) |
+func dupQueue(q gae.QueueData) gae.QueueData { |
+ r := make(gae.QueueData, len(q)) |
for k, q := range q { |
- r[k] = make(map[string]*taskqueue.Task, len(q)) |
+ r[k] = make(map[string]*gae.TQTask, len(q)) |
for tn, t := range q { |
r[k][tn] = dupTask(t) |
} |