Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package taskqueue | 5 package taskqueue |
| 6 | 6 |
| 7 // Interface is the full interface to the Task Queue service. | 7 import ( |
| 8 type Interface interface { | 8 » "github.com/luci/luci-go/common/errors" |
|
dnj
2016/09/01 15:25:41
Interface now gone, *Multi collapsed into vararg.
| |
| 9 » Add(task *Task, queueName string) error | |
| 10 » Delete(task *Task, queueName string) error | |
| 11 | 9 |
| 12 » AddMulti(tasks []*Task, queueName string) error | 10 » "golang.org/x/net/context" |
| 13 » DeleteMulti(tasks []*Task, queueName string) error | 11 ) |
| 14 | 12 |
| 15 » // NOTE(riannucci): No support for pull taskqueues. We're not planning o n | 13 // Add adds the specified task(s) to the specified task queue. |
| 16 » // making pull-queue clients which RUN in appengine (e.g. they'd all be | 14 // |
| 17 » // external REST consumers). If someone needs this, it will need to be a dded | 15 // If only one task is provided its error will be returned directly. If more |
| 18 » // here and in RawInterface. The theory is that a good lease API might l ook | 16 // than one task is provided, an errors.MultiError will be returned in the |
| 19 » // like: | 17 // event of an error, with a given error index corresponding to the error |
| 20 » // | 18 // encountered when processing the task at that index. |
| 21 » // func Lease(queueName, tag string, batchSize int, duration time.Time , cb func(*Task, error<-)) | 19 func Add(c context.Context, queueName string, tasks ...*Task) error { |
| 22 » // | 20 » lme := errors.NewLazyMultiError(len(tasks)) |
| 23 » // Which blocks and calls cb for each task obtained. Lease would then do all | 21 » i := 0 |
| 24 » // necessary backoff negotiation with the backend. The callback could ex ecute | 22 » err := Raw(c).AddMulti(tasks, queueName, func(t *Task, err error) { |
| 25 » // synchronously (stuffing an error into the chan or panicing if it fail s), or | 23 » » if !lme.Assign(i, err) { |
| 26 » // asynchronously (dispatching a goroutine which will then populate the error | 24 » » » *tasks[i] = *t |
| 27 » // channel if needed). If it operates asynchronously, it has the option of | 25 » » } |
| 28 » // processing multiple work items at a time. | 26 » » i++ |
| 29 » // | 27 » }) |
| 30 » // Lease would also take care of calling ModifyLease as necessary to ens ure | 28 » if err == nil { |
| 31 » // that each call to cb would have 'duration' amount of time to work on the | 29 » » err = lme.Get() |
| 32 » // task, as well as releasing as many leased tasks as it can on a failur e. | 30 » » if len(tasks) == 1 { |
| 31 » » » err = errors.SingleError(err) | |
| 32 » » } | |
| 33 » } | |
| 34 » return err | |
| 35 } | |
| 33 | 36 |
| 34 » Purge(queueName string) error | 37 // Delete deletes a task from the task queue. |
| 38 // | |
| 39 // If only one task is provided its error will be returned directly. If more | |
| 40 // than one task is provided, an errors.MultiError will be returned in the | |
| 41 // event of an error, with a given error index corresponding to the error | |
| 42 // encountered when processing the task at that index. | |
| 43 func Delete(c context.Context, queueName string, tasks ...*Task) error { | |
| 44 » lme := errors.NewLazyMultiError(len(tasks)) | |
| 45 » i := 0 | |
| 46 » err := Raw(c).DeleteMulti(tasks, queueName, func(err error) { | |
| 47 » » lme.Assign(i, err) | |
| 48 » » i++ | |
| 49 » }) | |
| 50 » if err == nil { | |
| 51 » » err = lme.Get() | |
| 52 » » if len(tasks) == 1 { | |
| 53 » » » err = errors.SingleError(err) | |
| 54 » » } | |
| 55 » } | |
| 56 » return err | |
| 57 } | |
| 35 | 58 |
| 36 » Stats(queueNames ...string) ([]Statistics, error) | 59 // NOTE(riannucci): No support for pull taskqueues. We're not planning on |
| 60 // making pull-queue clients which RUN in appengine (e.g. they'd all be | |
| 61 // external REST consumers). If someone needs this, it will need to be added | |
| 62 // here and in RawInterface. The theory is that a good lease API might look | |
| 63 // like: | |
| 64 // | |
| 65 // func Lease(queueName, tag string, batchSize int, duration time.Time, cb fun c(*Task, error<-)) | |
| 66 // | |
| 67 // Which blocks and calls cb for each task obtained. Lease would then do all | |
| 68 // necessary backoff negotiation with the backend. The callback could execute | |
| 69 // synchronously (stuffing an error into the chan or panicing if it fails), or | |
| 70 // asynchronously (dispatching a goroutine which will then populate the error | |
| 71 // channel if needed). If it operates asynchronously, it has the option of | |
| 72 // processing multiple work items at a time. | |
| 73 // | |
| 74 // Lease would also take care of calling ModifyLease as necessary to ensure | |
| 75 // that each call to cb would have 'duration' amount of time to work on the | |
| 76 // task, as well as releasing as many leased tasks as it can on a failure. | |
| 37 | 77 |
| 38 » Testable() Testable | 78 // Purge purges all tasks form the named queue. |
| 79 func Purge(c context.Context, queueName string) error { | |
| 80 » return Raw(c).Purge(queueName) | |
| 81 } | |
| 39 | 82 |
| 40 » Raw() RawInterface | 83 // Stats returns Statistics instances for each of the named task queues. |
| 84 // | |
| 85 // If only one task is provided its error will be returned directly. If more | |
| 86 // than one task is provided, an errors.MultiError will be returned in the | |
| 87 // event of an error, with a given error index corresponding to the error | |
| 88 // encountered when processing the task at that index. | |
| 89 func Stats(c context.Context, queueNames ...string) ([]Statistics, error) { | |
| 90 » ret := make([]Statistics, len(queueNames)) | |
| 91 » lme := errors.NewLazyMultiError(len(queueNames)) | |
| 92 » i := 0 | |
| 93 » err := Raw(c).Stats(queueNames, func(s *Statistics, err error) { | |
| 94 » » if !lme.Assign(i, err) { | |
| 95 » » » ret[i] = *s | |
| 96 » » } | |
| 97 » » i++ | |
| 98 » }) | |
| 99 » if err == nil { | |
| 100 » » err = lme.Get() | |
| 101 » » if len(queueNames) == 1 { | |
| 102 » » » err = errors.SingleError(err) | |
| 103 » » } | |
| 104 » } | |
| 105 » return ret, err | |
| 41 } | 106 } |
| 107 | |
| 108 // GetTestable returns a Testable for the current task queue service in c, or | |
| 109 // nil if it does not offer one. | |
| 110 func GetTestable(c context.Context) Testable { | |
| 111 return Raw(c).GetTestable() | |
| 112 } | |
| OLD | NEW |