Index: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
index 81a2978fe504f0074c04023dc413da9bd3fe0552..2d23b1d803ee3959a480d823cbd86364a53be40a 100644 |
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
+++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
@@ -8,15 +8,15 @@ import ( |
"errors" |
"fmt" |
"infra/gae/libs/wrapper" |
- "math/rand" |
"net/http" |
"sync" |
"sync/atomic" |
- "time" |
"appengine/datastore" |
"appengine/taskqueue" |
pb "appengine_internal/taskqueue" |
+ "golang.org/x/net/context" |
+ "infra/libs/clock" |
) |
var ( |
@@ -50,11 +50,11 @@ func newTaskQueueData() memContextObj { |
func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } |
func (t *taskQueueData) endTxn() {} |
-func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) { |
+func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { |
txn := obj.(*txnTaskQueueData) |
for qn, tasks := range txn.anony { |
for _, tsk := range tasks { |
- tsk.Name = mkName(rnd, tsk.Name, t.named[qn]) |
+ tsk.Name = mkName(c, tsk.Name, t.named[qn]) |
t.named[qn][tsk.Name] = tsk |
} |
} |
@@ -121,7 +121,8 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) { |
return queueName, nil |
} |
-func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName string, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) { |
+func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) ( |
+ *taskqueue.Task, string, error) { |
queueName, err := t.getQueueName(queueName) |
if err != nil { |
return nil, "", err |
@@ -134,7 +135,7 @@ func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri |
} |
if toSched.ETA.IsZero() { |
- toSched.ETA = now.Add(toSched.Delay) |
+ toSched.ETA = clock.Now(c).Add(toSched.Delay) |
} else if toSched.Delay != 0 { |
panic("taskqueue: both Delay and ETA are set") |
} |
@@ -161,7 +162,7 @@ func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri |
// TODO(riannucci): implement DefaultNamespace |
if toSched.Name == "" { |
- toSched.Name = mkName(rnd, "", t.named[queueName]) |
+ toSched.Name = mkName(c, "", t.named[queueName]) |
} else { |
if !validTaskName.MatchString(toSched.Name) { |
return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME) |
@@ -191,7 +192,7 @@ var ( |
func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } |
-func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) { |
+func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { |
panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) |
} |