| Index: impl/memory/taskqueue.go
 | 
| diff --git a/impl/memory/taskqueue.go b/impl/memory/taskqueue.go
 | 
| index 7b09af1a6602857a1836155a73c180395ae12ece..2af0f5a7117f4345322c298b1d7ddad19003c98d 100644
 | 
| --- a/impl/memory/taskqueue.go
 | 
| +++ b/impl/memory/taskqueue.go
 | 
| @@ -5,12 +5,11 @@
 | 
|  package memory
 | 
|  
 | 
|  import (
 | 
| -	"net/http"
 | 
|  	"regexp"
 | 
| +	"sync/atomic"
 | 
|  
 | 
|  	"golang.org/x/net/context"
 | 
|  
 | 
| -	"github.com/luci/gae/impl/dummy"
 | 
|  	tq "github.com/luci/gae/service/taskqueue"
 | 
|  	"github.com/luci/luci-go/common/errors"
 | 
|  	"github.com/luci/luci-go/common/mathrand"
 | 
| @@ -19,18 +18,16 @@ import (
 | 
|  /////////////////////////////// public functions ///////////////////////////////
 | 
|  
 | 
|  func useTQ(c context.Context) context.Context {
 | 
| -	return tq.SetFactory(c, func(ic context.Context) tq.Interface {
 | 
| +	return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface {
 | 
|  		tqd := cur(ic).Get(memContextTQIdx)
 | 
|  		if x, ok := tqd.(*taskQueueData); ok {
 | 
|  			return &taskqueueImpl{
 | 
| -				dummy.TaskQueue(),
 | 
|  				x,
 | 
|  				ic,
 | 
|  				curGID(ic).namespace,
 | 
|  			}
 | 
|  		}
 | 
|  		return &taskqueueTxnImpl{
 | 
| -			dummy.TaskQueue(),
 | 
|  			tqd.(*txnTaskQueueData),
 | 
|  			ic,
 | 
|  			curGID(ic).namespace,
 | 
| @@ -41,7 +38,6 @@ func useTQ(c context.Context) context.Context {
 | 
|  //////////////////////////////// taskqueueImpl /////////////////////////////////
 | 
|  
 | 
|  type taskqueueImpl struct {
 | 
| -	tq.Interface
 | 
|  	*taskQueueData
 | 
|  
 | 
|  	ctx context.Context
 | 
| @@ -49,12 +45,12 @@ type taskqueueImpl struct {
 | 
|  }
 | 
|  
 | 
|  var (
 | 
| -	_ = tq.Interface((*taskqueueImpl)(nil))
 | 
| +	_ = tq.RawInterface((*taskqueueImpl)(nil))
 | 
|  	_ = tq.Testable((*taskqueueImpl)(nil))
 | 
|  )
 | 
|  
 | 
|  func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
 | 
| -	toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
 | 
| +	toSched, err := t.prepTask(t.ctx, t.ns, task, queueName)
 | 
|  	if err != nil {
 | 
|  		return nil, err
 | 
|  	}
 | 
| @@ -68,21 +64,10 @@ func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
 | 
|  		t.named[queueName][toSched.Name] = toSched
 | 
|  	}
 | 
|  
 | 
| -	return dupTask(toSched), nil
 | 
| -}
 | 
| -
 | 
| -func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -	return t.addLocked(task, queueName)
 | 
| +	return toSched.Duplicate(), nil
 | 
|  }
 | 
|  
 | 
|  func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
 | 
| -	queueName, err := t.getQueueName(queueName)
 | 
| -	if err != nil {
 | 
| -		return err
 | 
| -	}
 | 
| -
 | 
|  	if _, ok := t.archived[queueName][task.Name]; ok {
 | 
|  		return errors.New("TOMBSTONED_TASK")
 | 
|  	}
 | 
| @@ -97,33 +82,72 @@ func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
 | 
|  	return nil
 | 
|  }
 | 
|  
 | 
| -func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error {
 | 
| +func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
 | 
|  	t.Lock()
 | 
|  	defer t.Unlock()
 | 
| -	return t.deleteLocked(task, queueName)
 | 
| +
 | 
| +	queueName, err := t.getQueueNameLocked(queueName)
 | 
| +	if err != nil {
 | 
| +		return err
 | 
| +	}
 | 
| +
 | 
| +	for _, task := range tasks {
 | 
| +		cb(t.addLocked(task, queueName))
 | 
| +	}
 | 
| +	return nil
 | 
|  }
 | 
|  
 | 
| -func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
 | 
| +func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
 | 
|  	t.Lock()
 | 
|  	defer t.Unlock()
 | 
| -	return multi(tasks, queueName, t.addLocked)
 | 
| +
 | 
| +	queueName, err := t.getQueueNameLocked(queueName)
 | 
| +	if err != nil {
 | 
| +		return err
 | 
| +	}
 | 
| +
 | 
| +	for _, task := range tasks {
 | 
| +		cb(t.deleteLocked(task, queueName))
 | 
| +	}
 | 
| +	return nil
 | 
|  }
 | 
|  
 | 
| -func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
 | 
| +func (t *taskqueueImpl) Purge(queueName string) error {
 | 
|  	t.Lock()
 | 
|  	defer t.Unlock()
 | 
|  
 | 
| -	_, err := multi(tasks, queueName,
 | 
| -		func(tsk *tq.Task, qn string) (*tq.Task, error) {
 | 
| -			return nil, t.deleteLocked(tsk, qn)
 | 
| -		})
 | 
| -	return err
 | 
| +	return t.purgeLocked(queueName)
 | 
| +}
 | 
| +
 | 
| +func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
 | 
| +	t.Lock()
 | 
| +	defer t.Unlock()
 | 
| +
 | 
| +	for _, qn := range queueNames {
 | 
| +		qn, err := t.getQueueNameLocked(qn)
 | 
| +		if err != nil {
 | 
| +			cb(nil, err)
 | 
| +		} else {
 | 
| +			s := tq.Statistics{
 | 
| +				Tasks: len(t.named[qn]),
 | 
| +			}
 | 
| +			for _, t := range t.named[qn] {
 | 
| +				if s.OldestETA.IsZero() {
 | 
| +					s.OldestETA = t.ETA
 | 
| +				} else if t.ETA.Before(s.OldestETA) {
 | 
| +					s.OldestETA = t.ETA
 | 
| +				}
 | 
| +			}
 | 
| +			cb(&s, nil)
 | 
| +		}
 | 
| +	}
 | 
| +
 | 
| +	return nil
 | 
|  }
 | 
|  
 | 
|  /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
 | 
|  
 | 
|  type taskqueueTxnImpl struct {
 | 
| -	tq.Interface
 | 
|  	*txnTaskQueueData
 | 
|  
 | 
|  	ctx context.Context
 | 
| @@ -131,12 +155,12 @@ type taskqueueTxnImpl struct {
 | 
|  }
 | 
|  
 | 
|  var _ interface {
 | 
| -	tq.Interface
 | 
| +	tq.RawInterface
 | 
|  	tq.Testable
 | 
|  } = (*taskqueueTxnImpl)(nil)
 | 
|  
 | 
|  func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
 | 
| -	toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
 | 
| +	toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
 | 
|  	if err != nil {
 | 
|  		return nil, err
 | 
|  	}
 | 
| @@ -161,30 +185,41 @@ func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
 | 
|  	//		We should verify that the .Name for a task added in a transaction is
 | 
|  	//		meaningless. Maybe names generated in a transaction are somehow
 | 
|  	//		guaranteed to be meaningful?
 | 
| -	toRet := dupTask(toSched)
 | 
| +	toRet := toSched.Duplicate()
 | 
|  	toRet.Name = ""
 | 
|  
 | 
|  	return toRet, nil
 | 
|  }
 | 
|  
 | 
| -func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) {
 | 
| -	err = t.run(func() (err error) {
 | 
| -		t.Lock()
 | 
| -		defer t.Unlock()
 | 
| -		retTask, err = t.addLocked(task, queueName)
 | 
| -		return
 | 
| -	})
 | 
| -	return
 | 
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
 | 
| +	if atomic.LoadInt32(&t.closed) == 1 {
 | 
| +		return errors.New("taskqueue: transaction context has expired")
 | 
| +	}
 | 
| +
 | 
| +	t.Lock()
 | 
| +	defer t.Unlock()
 | 
| +
 | 
| +	queueName, err := t.parent.getQueueNameLocked(queueName)
 | 
| +	if err != nil {
 | 
| +		return err
 | 
| +	}
 | 
| +
 | 
| +	for _, task := range tasks {
 | 
| +		cb(t.addLocked(task, queueName))
 | 
| +	}
 | 
| +	return nil
 | 
|  }
 | 
|  
 | 
| -func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) {
 | 
| -	err = t.run(func() (err error) {
 | 
| -		t.Lock()
 | 
| -		defer t.Unlock()
 | 
| -		retTasks, err = multi(tasks, queueName, t.addLocked)
 | 
| -		return
 | 
| -	})
 | 
| -	return
 | 
| +func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error {
 | 
| +	return errors.New("taskqueue: cannot DeleteMulti from a transaction")
 | 
| +}
 | 
| +
 | 
| +func (t *taskqueueTxnImpl) Purge(string) error {
 | 
| +	return errors.New("taskqueue: cannot Purge from a transaction")
 | 
| +}
 | 
| +
 | 
| +func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error {
 | 
| +	return errors.New("taskqueue: cannot Stats from a transaction")
 | 
|  }
 | 
|  
 | 
|  ////////////////////////////// private functions ///////////////////////////////
 | 
| @@ -206,49 +241,12 @@ func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
 | 
|  	return cur
 | 
|  }
 | 
|  
 | 
| -func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) {
 | 
| -	ret := []*tq.Task(nil)
 | 
| -	lme := errors.LazyMultiError{Size: len(tasks)}
 | 
| -	for i, task := range tasks {
 | 
| -		rt, err := f(task, queueName)
 | 
| -		ret = append(ret, rt)
 | 
| -		lme.Assign(i, err)
 | 
| -	}
 | 
| -	return ret, lme.Get()
 | 
| -}
 | 
| -
 | 
| -func dupTask(t *tq.Task) *tq.Task {
 | 
| -	ret := &tq.Task{}
 | 
| -	*ret = *t
 | 
| -
 | 
| -	if t.Header != nil {
 | 
| -		ret.Header = make(http.Header, len(t.Header))
 | 
| -		for k, vs := range t.Header {
 | 
| -			newVs := make([]string, len(vs))
 | 
| -			copy(newVs, vs)
 | 
| -			ret.Header[k] = newVs
 | 
| -		}
 | 
| -	}
 | 
| -
 | 
| -	if t.Payload != nil {
 | 
| -		ret.Payload = make([]byte, len(t.Payload))
 | 
| -		copy(ret.Payload, t.Payload)
 | 
| -	}
 | 
| -
 | 
| -	if t.RetryOptions != nil {
 | 
| -		ret.RetryOptions = &tq.RetryOptions{}
 | 
| -		*ret.RetryOptions = *t.RetryOptions
 | 
| -	}
 | 
| -
 | 
| -	return ret
 | 
| -}
 | 
| -
 | 
|  func dupQueue(q tq.QueueData) tq.QueueData {
 | 
|  	r := make(tq.QueueData, len(q))
 | 
|  	for k, q := range q {
 | 
|  		r[k] = make(map[string]*tq.Task, len(q))
 | 
|  		for tn, t := range q {
 | 
| -			r[k][tn] = dupTask(t)
 | 
| +			r[k][tn] = t.Duplicate()
 | 
|  		}
 | 
|  	}
 | 
|  	return r
 | 
| 
 |