| Index: impl/prod/taskqueue.go
 | 
| diff --git a/impl/prod/taskqueue.go b/impl/prod/taskqueue.go
 | 
| index 815db3a734145680ba995038c483442aa57f604a..ce0f443dfb528cef53fa566de815775f899f5b65 100644
 | 
| --- a/impl/prod/taskqueue.go
 | 
| +++ b/impl/prod/taskqueue.go
 | 
| @@ -9,15 +9,15 @@ import (
 | 
|  	"reflect"
 | 
|  
 | 
|  	tq "github.com/luci/gae/service/taskqueue"
 | 
| -	"github.com/luci/luci-go/common/errors"
 | 
|  	"golang.org/x/net/context"
 | 
| +	"google.golang.org/appengine"
 | 
|  	"google.golang.org/appengine/taskqueue"
 | 
|  )
 | 
|  
 | 
|  // useTQ adds a gae.TaskQueue implementation to context, accessible
 | 
|  // by gae.GetTQ(c)
 | 
|  func useTQ(c context.Context) context.Context {
 | 
| -	return tq.SetFactory(c, func(ci context.Context) tq.Interface {
 | 
| +	return tq.SetRawFactory(c, func(ci context.Context) tq.RawInterface {
 | 
|  		return tqImpl{ci}
 | 
|  	})
 | 
|  }
 | 
| @@ -43,11 +43,10 @@ func init() {
 | 
|  	}
 | 
|  }
 | 
|  
 | 
| -// tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a
 | 
| -// *tq.Task, and passes through an error.
 | 
| -func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
 | 
| -	if err != nil {
 | 
| -		return nil, err
 | 
| +// tqR2F (TQ real-to-fake) converts a *taskqueue.Task to a *tq.Task.
 | 
| +func tqR2F(o *taskqueue.Task) *tq.Task {
 | 
| +	if o == nil {
 | 
| +		return nil
 | 
|  	}
 | 
|  	n := tq.Task{}
 | 
|  	n.Path = o.Path
 | 
| @@ -59,7 +58,7 @@ func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
 | 
|  	n.ETA = o.ETA
 | 
|  	n.RetryCount = o.RetryCount
 | 
|  	n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions)
 | 
| -	return &n, nil
 | 
| +	return &n
 | 
|  }
 | 
|  
 | 
|  // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task.
 | 
| @@ -77,19 +76,6 @@ func tqF2R(n *tq.Task) *taskqueue.Task {
 | 
|  	return &o
 | 
|  }
 | 
|  
 | 
| -// tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of
 | 
| -// *taskqueue.Task to a slice of *tq.Task
 | 
| -func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) {
 | 
| -	if err != nil {
 | 
| -		return nil, errors.Fix(err)
 | 
| -	}
 | 
| -	ret := make([]*tq.Task, len(os))
 | 
| -	for i, t := range os {
 | 
| -		ret[i], _ = tqR2FErr(t, nil)
 | 
| -	}
 | 
| -	return ret, nil
 | 
| -}
 | 
| -
 | 
|  // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
 | 
|  func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
 | 
|  	ret := make([]*taskqueue.Task, len(ns))
 | 
| @@ -99,52 +85,49 @@ func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
 | 
|  	return ret
 | 
|  }
 | 
|  
 | 
| -//////// TQSingleReadWriter
 | 
| -
 | 
| -func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
 | 
| -	return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName))
 | 
| -}
 | 
| -func (t tqImpl) Delete(task *tq.Task, queueName string) error {
 | 
| -	return taskqueue.Delete(t.Context, tqF2R(task), queueName)
 | 
| -}
 | 
| -
 | 
| -//////// TQMultiReadWriter
 | 
| -
 | 
| -func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
 | 
| -	return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName))
 | 
| -}
 | 
| -func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
 | 
| -	return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName))
 | 
| +func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
 | 
| +	realTasks, err := taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
 | 
| +	if err != nil {
 | 
| +		if me, ok := err.(appengine.MultiError); ok {
 | 
| +			for i, err := range me {
 | 
| +				tsk := (*taskqueue.Task)(nil)
 | 
| +				if realTasks != nil {
 | 
| +					tsk = realTasks[i]
 | 
| +				}
 | 
| +				cb(tqR2F(tsk), err)
 | 
| +			}
 | 
| +			err = nil
 | 
| +		}
 | 
| +	} else {
 | 
| +		for _, tsk := range realTasks {
 | 
| +			cb(tqR2F(tsk), nil)
 | 
| +		}
 | 
| +	}
 | 
| +	return err
 | 
|  }
 | 
|  
 | 
| -//////// TQLeaser
 | 
| -
 | 
| -func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task, error) {
 | 
| -	return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTime))
 | 
| -}
 | 
| -func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag string) ([]*tq.Task, error) {
 | 
| -	return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, leaseTime, tag))
 | 
| -}
 | 
| -func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) error {
 | 
| -	return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTime)
 | 
| +func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
 | 
| +	err := taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName)
 | 
| +	if me, ok := err.(appengine.MultiError); ok {
 | 
| +		for _, err := range me {
 | 
| +			cb(err)
 | 
| +		}
 | 
| +		err = nil
 | 
| +	}
 | 
| +	return err
 | 
|  }
 | 
|  
 | 
| -//////// TQPurger
 | 
| -
 | 
|  func (t tqImpl) Purge(queueName string) error {
 | 
|  	return taskqueue.Purge(t.Context, queueName)
 | 
|  }
 | 
|  
 | 
| -//////// TQStatter
 | 
| -
 | 
| -func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) {
 | 
| +func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
 | 
|  	stats, err := taskqueue.QueueStats(t.Context, queueNames)
 | 
|  	if err != nil {
 | 
| -		return nil, err
 | 
| +		return err
 | 
|  	}
 | 
| -	ret := make([]tq.Statistics, len(stats))
 | 
| -	for i, s := range stats {
 | 
| -		ret[i] = tq.Statistics(s)
 | 
| +	for _, s := range stats {
 | 
| +		cb((*tq.Statistics)(&s), nil)
 | 
|  	}
 | 
| -	return ret, nil
 | 
| +	return nil
 | 
|  }
 | 
| 
 |