Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(720)

Side by Side Diff: sync/internal_api/public/attachments/task_queue.h

Issue 2130453004: [Sync] Move //sync to //components/sync. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
7
8 #include <stddef.h>
9
10 #include <deque>
11 #include <memory>
12 #include <set>
13 #include <utility>
14
15 #include "base/bind.h"
16 #include "base/callback.h"
17 #include "base/macros.h"
18 #include "base/memory/weak_ptr.h"
19 #include "base/threading/non_thread_safe.h"
20 #include "base/threading/thread_task_runner_handle.h"
21 #include "base/time/time.h"
22 #include "base/timer/timer.h"
23 #include "net/base/backoff_entry.h"
24
25 namespace syncer {
26
27 // A queue that dispatches tasks, ignores duplicates, and provides backoff
28 // semantics.
29 //
30 // |T| is the task type.
31 //
32 // For each task added to the queue, the HandleTaskCallback will eventually be
33 // invoked. For each invocation, the user of TaskQueue must call exactly one of
34 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
35 //
36 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
37 //
38 // Example usage:
39 //
40 // void Handle(const Foo& foo);
41 // ...
42 // TaskQueue<Foo> queue(base::Bind(&Handle),
43 // base::TimeDelta::FromSeconds(1),
44 // base::TimeDelta::FromMinutes(1));
45 // ...
46 // {
47 // Foo foo;
48 // // Add foo to the queue. At some point, Handle will be invoked in this
49 // // message loop.
50 // queue.AddToQueue(foo);
51 // }
52 // ...
53 // void Handle(const Foo& foo) {
54 // DoSomethingWith(foo);
55 // // We must call one of the three methods to tell the queue how we're
56 // // dealing with foo. Of course, we are free to call in the the context of
57 // // this HandleTaskCallback or outside the context if we so choose.
58 // if (SuccessfullyHandled(foo)) {
59 // queue.MarkAsSucceeded(foo);
60 // } else if (Failed(foo)) {
61 // queue.MarkAsFailed(foo);
62 // if (ShouldRetry(foo)) {
63 // queue.AddToQueue(foo);
64 // }
65 // } else {
66 // Cancel(foo);
67 // }
68 // }
69 //
70 template <typename T>
71 class TaskQueue : base::NonThreadSafe {
72 public:
73 // A callback provided by users of the TaskQueue to handle tasks.
74 //
75 // This callback is invoked by the queue with a task to be handled. The
76 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
77 // or |Cancel| to signify completion of the task.
78 typedef base::Callback<void(const T&)> HandleTaskCallback;
79
80 // Construct a TaskQueue.
81 //
82 // |callback| the callback to be invoked for handling tasks.
83 //
84 // |initial_backoff_delay| the initial amount of time the queue will wait
85 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
86 // zero. Subsequent failures will increase the delay up to
87 // |max_backoff_delay|.
88 //
89 // |max_backoff_delay| the maximum amount of time the queue will wait before
90 // dispatching tasks. May be zero. Must be greater than or equal to
91 // |initial_backoff_delay|.
92 TaskQueue(const HandleTaskCallback& callback,
93 const base::TimeDelta& initial_backoff_delay,
94 const base::TimeDelta& max_backoff_delay);
95
96 // Add |task| to the end of the queue.
97 //
98 // If |task| is already present (as determined by operator==) it is not added.
99 void AddToQueue(const T& task);
100
101 // Mark |task| as completing successfully.
102 //
103 // Marking a task as completing successfully will reduce or eliminate any
104 // backoff delay in effect.
105 //
106 // May only be called after the HandleTaskCallback has been invoked with
107 // |task|.
108 void MarkAsSucceeded(const T& task);
109
110 // Mark |task| as failed.
111 //
112 // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
113 // of subsequent tasks. Repeated failures will increase the delay.
114 //
115 // May only be called after the HandleTaskCallback has been invoked with
116 // |task|.
117 void MarkAsFailed(const T& task);
118
119 // Cancel |task|.
120 //
121 // |task| is removed from the queue and will not be retried. Does not affect
122 // the backoff delay.
123 //
124 // May only be called after the HandleTaskCallback has been invoked with
125 // |task|.
126 void Cancel(const T& task);
127
128 // Reset any backoff delay and resume dispatching of tasks.
129 //
130 // Useful for when you know the cause of previous failures has been resolved
131 // and you want don't want to wait for the accumulated backoff delay to
132 // elapse.
133 void ResetBackoff();
134
135 // Use |timer| for scheduled events.
136 //
137 // Used in tests. See also MockTimer.
138 void SetTimerForTest(std::unique_ptr<base::Timer> timer);
139
140 private:
141 void FinishTask(const T& task);
142 void ScheduleDispatch();
143 void Dispatch();
144 // Return true if we should dispatch tasks.
145 bool ShouldDispatch();
146
147 const HandleTaskCallback process_callback_;
148 net::BackoffEntry::Policy backoff_policy_;
149 std::unique_ptr<net::BackoffEntry> backoff_entry_;
150 // The number of tasks currently being handled.
151 int num_in_progress_;
152 std::deque<T> queue_;
153 // The set of tasks in queue_ or currently being handled.
154 std::set<T> tasks_;
155 base::Closure dispatch_closure_;
156 std::unique_ptr<base::Timer> backoff_timer_;
157 base::TimeDelta delay_;
158
159 // Must be last data member.
160 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
161
162 DISALLOW_COPY_AND_ASSIGN(TaskQueue);
163 };
164
165 // The maximum number of tasks that may be concurrently executed. Think
166 // carefully before changing this value. The desired behavior of backoff may
167 // not be obvious when there is more than one concurrent task
168 const int kMaxConcurrentTasks = 1;
169
170 template <typename T>
171 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
172 const base::TimeDelta& initial_backoff_delay,
173 const base::TimeDelta& max_backoff_delay)
174 : process_callback_(callback),
175 backoff_policy_({}),
176 num_in_progress_(0),
177 weak_ptr_factory_(this) {
178 DCHECK_LE(initial_backoff_delay.InMicroseconds(),
179 max_backoff_delay.InMicroseconds());
180 backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
181 backoff_policy_.multiply_factor = 2.0;
182 backoff_policy_.jitter_factor = 0.1;
183 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
184 backoff_policy_.entry_lifetime_ms = -1;
185 backoff_policy_.always_use_initial_delay = false;
186 backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
187 dispatch_closure_ =
188 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
189 backoff_timer_.reset(new base::Timer(false, false));
190 }
191
192 template <typename T>
193 void TaskQueue<T>::AddToQueue(const T& task) {
194 DCHECK(CalledOnValidThread());
195 // Ignore duplicates.
196 if (tasks_.find(task) == tasks_.end()) {
197 queue_.push_back(task);
198 tasks_.insert(task);
199 }
200 ScheduleDispatch();
201 }
202
203 template <typename T>
204 void TaskQueue<T>::MarkAsSucceeded(const T& task) {
205 DCHECK(CalledOnValidThread());
206 FinishTask(task);
207 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
208 // reschedule a dispatch.
209 backoff_timer_->Stop();
210 backoff_entry_->Reset();
211 ScheduleDispatch();
212 }
213
214 template <typename T>
215 void TaskQueue<T>::MarkAsFailed(const T& task) {
216 DCHECK(CalledOnValidThread());
217 FinishTask(task);
218 backoff_entry_->InformOfRequest(false);
219 ScheduleDispatch();
220 }
221
222 template <typename T>
223 void TaskQueue<T>::Cancel(const T& task) {
224 DCHECK(CalledOnValidThread());
225 FinishTask(task);
226 ScheduleDispatch();
227 }
228
229 template <typename T>
230 void TaskQueue<T>::ResetBackoff() {
231 backoff_timer_->Stop();
232 backoff_entry_->Reset();
233 ScheduleDispatch();
234 }
235
236 template <typename T>
237 void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) {
238 DCHECK(CalledOnValidThread());
239 DCHECK(timer.get());
240 backoff_timer_ = std::move(timer);
241 }
242
243 template <typename T>
244 void TaskQueue<T>::FinishTask(const T& task) {
245 DCHECK(CalledOnValidThread());
246 DCHECK_GE(num_in_progress_, 1);
247 --num_in_progress_;
248 const size_t num_erased = tasks_.erase(task);
249 DCHECK_EQ(1U, num_erased);
250 }
251
252 template <typename T>
253 void TaskQueue<T>::ScheduleDispatch() {
254 DCHECK(CalledOnValidThread());
255 if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
256 return;
257 }
258
259 backoff_timer_->Start(
260 FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
261 }
262
263 template <typename T>
264 void TaskQueue<T>::Dispatch() {
265 DCHECK(CalledOnValidThread());
266 if (!ShouldDispatch()) {
267 return;
268 }
269
270 DCHECK(!queue_.empty());
271 const T& task = queue_.front();
272 ++num_in_progress_;
273 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
274 base::ThreadTaskRunnerHandle::Get()->PostTask(
275 FROM_HERE, base::Bind(process_callback_, task));
276 queue_.pop_front();
277 }
278
279 template <typename T>
280 bool TaskQueue<T>::ShouldDispatch() {
281 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
282 }
283
284 } // namespace syncer
285
286 #endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
OLDNEW
« no previous file with comments | « sync/internal_api/public/attachments/on_disk_attachment_store.h ('k') | sync/internal_api/public/base/DEPS » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698