OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package taskqueue | 5 package taskqueue |
6 | 6 |
7 // Interface is the full interface to the Task Queue service. | 7 import ( |
8 type Interface interface { | 8 » "github.com/luci/luci-go/common/errors" |
dnj
2016/09/01 15:25:41
Interface now gone, *Multi collapsed into vararg.
| |
9 » Add(task *Task, queueName string) error | |
10 » Delete(task *Task, queueName string) error | |
11 | 9 |
12 » AddMulti(tasks []*Task, queueName string) error | 10 » "golang.org/x/net/context" |
13 » DeleteMulti(tasks []*Task, queueName string) error | 11 ) |
14 | 12 |
15 » // NOTE(riannucci): No support for pull taskqueues. We're not planning o n | 13 // Add adds the specified task(s) to the specified task queue. |
16 » // making pull-queue clients which RUN in appengine (e.g. they'd all be | 14 // |
17 » // external REST consumers). If someone needs this, it will need to be a dded | 15 // If only one task is provided its error will be returned directly. If more |
18 » // here and in RawInterface. The theory is that a good lease API might l ook | 16 // than one task is provided, an errors.MultiError will be returned in the |
19 » // like: | 17 // event of an error, with a given error index corresponding to the error |
20 » // | 18 // encountered when processing the task at that index. |
21 » // func Lease(queueName, tag string, batchSize int, duration time.Time , cb func(*Task, error<-)) | 19 func Add(c context.Context, queueName string, tasks ...*Task) error { |
22 » // | 20 » lme := errors.NewLazyMultiError(len(tasks)) |
23 » // Which blocks and calls cb for each task obtained. Lease would then do all | 21 » i := 0 |
24 » // necessary backoff negotiation with the backend. The callback could ex ecute | 22 » err := Raw(c).AddMulti(tasks, queueName, func(t *Task, err error) { |
25 » // synchronously (stuffing an error into the chan or panicing if it fail s), or | 23 » » if !lme.Assign(i, err) { |
26 » // asynchronously (dispatching a goroutine which will then populate the error | 24 » » » *tasks[i] = *t |
27 » // channel if needed). If it operates asynchronously, it has the option of | 25 » » } |
28 » // processing multiple work items at a time. | 26 » » i++ |
29 » // | 27 » }) |
30 » // Lease would also take care of calling ModifyLease as necessary to ens ure | 28 » if err == nil { |
31 » // that each call to cb would have 'duration' amount of time to work on the | 29 » » err = lme.Get() |
32 » // task, as well as releasing as many leased tasks as it can on a failur e. | 30 » » if len(tasks) == 1 { |
31 » » » err = errors.SingleError(err) | |
32 » » } | |
33 » } | |
34 » return err | |
35 } | |
33 | 36 |
34 » Purge(queueName string) error | 37 // Delete deletes a task from the task queue. |
38 // | |
39 // If only one task is provided its error will be returned directly. If more | |
40 // than one task is provided, an errors.MultiError will be returned in the | |
41 // event of an error, with a given error index corresponding to the error | |
42 // encountered when processing the task at that index. | |
43 func Delete(c context.Context, queueName string, tasks ...*Task) error { | |
44 » lme := errors.NewLazyMultiError(len(tasks)) | |
45 » i := 0 | |
46 » err := Raw(c).DeleteMulti(tasks, queueName, func(err error) { | |
47 » » lme.Assign(i, err) | |
48 » » i++ | |
49 » }) | |
50 » if err == nil { | |
51 » » err = lme.Get() | |
52 » » if len(tasks) == 1 { | |
53 » » » err = errors.SingleError(err) | |
54 » » } | |
55 » } | |
56 » return err | |
57 } | |
35 | 58 |
36 » Stats(queueNames ...string) ([]Statistics, error) | 59 // NOTE(riannucci): No support for pull taskqueues. We're not planning on |
60 // making pull-queue clients which RUN in appengine (e.g. they'd all be | |
61 // external REST consumers). If someone needs this, it will need to be added | |
62 // here and in RawInterface. The theory is that a good lease API might look | |
63 // like: | |
64 // | |
65 // func Lease(queueName, tag string, batchSize int, duration time.Time, cb fun c(*Task, error<-)) | |
66 // | |
67 // Which blocks and calls cb for each task obtained. Lease would then do all | |
68 // necessary backoff negotiation with the backend. The callback could execute | |
69 // synchronously (stuffing an error into the chan or panicing if it fails), or | |
70 // asynchronously (dispatching a goroutine which will then populate the error | |
71 // channel if needed). If it operates asynchronously, it has the option of | |
72 // processing multiple work items at a time. | |
73 // | |
74 // Lease would also take care of calling ModifyLease as necessary to ensure | |
75 // that each call to cb would have 'duration' amount of time to work on the | |
76 // task, as well as releasing as many leased tasks as it can on a failure. | |
37 | 77 |
38 » Testable() Testable | 78 // Purge purges all tasks form the named queue. |
79 func Purge(c context.Context, queueName string) error { | |
80 » return Raw(c).Purge(queueName) | |
81 } | |
39 | 82 |
40 » Raw() RawInterface | 83 // Stats returns Statistics instances for each of the named task queues. |
84 // | |
85 // If only one task is provided its error will be returned directly. If more | |
86 // than one task is provided, an errors.MultiError will be returned in the | |
87 // event of an error, with a given error index corresponding to the error | |
88 // encountered when processing the task at that index. | |
89 func Stats(c context.Context, queueNames ...string) ([]Statistics, error) { | |
90 » ret := make([]Statistics, len(queueNames)) | |
91 » lme := errors.NewLazyMultiError(len(queueNames)) | |
92 » i := 0 | |
93 » err := Raw(c).Stats(queueNames, func(s *Statistics, err error) { | |
94 » » if !lme.Assign(i, err) { | |
95 » » » ret[i] = *s | |
96 » » } | |
97 » » i++ | |
98 » }) | |
99 » if err == nil { | |
100 » » err = lme.Get() | |
101 » » if len(queueNames) == 1 { | |
102 » » » err = errors.SingleError(err) | |
103 » » } | |
104 » } | |
105 » return ret, err | |
41 } | 106 } |
107 | |
108 // GetTestable returns a Testable for the current task queue service in c, or | |
109 // nil if it does not offer one. | |
110 func GetTestable(c context.Context) Testable { | |
111 return Raw(c).GetTestable() | |
112 } | |
OLD | NEW |