Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Unified Diff: sync/internal_api/public/attachments/task_queue.h

Issue 2130453004: [Sync] Move //sync to //components/sync. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sync/internal_api/public/attachments/on_disk_attachment_store.h ('k') | sync/internal_api/public/base/DEPS » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sync/internal_api/public/attachments/task_queue.h
diff --git a/sync/internal_api/public/attachments/task_queue.h b/sync/internal_api/public/attachments/task_queue.h
deleted file mode 100644
index 8e668b1ec29271abcd454ffd5806066dbefe7c38..0000000000000000000000000000000000000000
--- a/sync/internal_api/public/attachments/task_queue.h
+++ /dev/null
@@ -1,286 +0,0 @@
-// Copyright 2014 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
-#define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
-
-#include <stddef.h>
-
-#include <deque>
-#include <memory>
-#include <set>
-#include <utility>
-
-#include "base/bind.h"
-#include "base/callback.h"
-#include "base/macros.h"
-#include "base/memory/weak_ptr.h"
-#include "base/threading/non_thread_safe.h"
-#include "base/threading/thread_task_runner_handle.h"
-#include "base/time/time.h"
-#include "base/timer/timer.h"
-#include "net/base/backoff_entry.h"
-
-namespace syncer {
-
-// A queue that dispatches tasks, ignores duplicates, and provides backoff
-// semantics.
-//
-// |T| is the task type.
-//
-// For each task added to the queue, the HandleTaskCallback will eventually be
-// invoked. For each invocation, the user of TaskQueue must call exactly one of
-// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
-//
-// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
-//
-// Example usage:
-//
-// void Handle(const Foo& foo);
-// ...
-// TaskQueue<Foo> queue(base::Bind(&Handle),
-// base::TimeDelta::FromSeconds(1),
-// base::TimeDelta::FromMinutes(1));
-// ...
-// {
-// Foo foo;
-// // Add foo to the queue. At some point, Handle will be invoked in this
-// // message loop.
-// queue.AddToQueue(foo);
-// }
-// ...
-// void Handle(const Foo& foo) {
-// DoSomethingWith(foo);
-// // We must call one of the three methods to tell the queue how we're
-// // dealing with foo. Of course, we are free to call in the the context of
-// // this HandleTaskCallback or outside the context if we so choose.
-// if (SuccessfullyHandled(foo)) {
-// queue.MarkAsSucceeded(foo);
-// } else if (Failed(foo)) {
-// queue.MarkAsFailed(foo);
-// if (ShouldRetry(foo)) {
-// queue.AddToQueue(foo);
-// }
-// } else {
-// Cancel(foo);
-// }
-// }
-//
-template <typename T>
-class TaskQueue : base::NonThreadSafe {
- public:
- // A callback provided by users of the TaskQueue to handle tasks.
- //
- // This callback is invoked by the queue with a task to be handled. The
- // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
- // or |Cancel| to signify completion of the task.
- typedef base::Callback<void(const T&)> HandleTaskCallback;
-
- // Construct a TaskQueue.
- //
- // |callback| the callback to be invoked for handling tasks.
- //
- // |initial_backoff_delay| the initial amount of time the queue will wait
- // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
- // zero. Subsequent failures will increase the delay up to
- // |max_backoff_delay|.
- //
- // |max_backoff_delay| the maximum amount of time the queue will wait before
- // dispatching tasks. May be zero. Must be greater than or equal to
- // |initial_backoff_delay|.
- TaskQueue(const HandleTaskCallback& callback,
- const base::TimeDelta& initial_backoff_delay,
- const base::TimeDelta& max_backoff_delay);
-
- // Add |task| to the end of the queue.
- //
- // If |task| is already present (as determined by operator==) it is not added.
- void AddToQueue(const T& task);
-
- // Mark |task| as completing successfully.
- //
- // Marking a task as completing successfully will reduce or eliminate any
- // backoff delay in effect.
- //
- // May only be called after the HandleTaskCallback has been invoked with
- // |task|.
- void MarkAsSucceeded(const T& task);
-
- // Mark |task| as failed.
- //
- // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
- // of subsequent tasks. Repeated failures will increase the delay.
- //
- // May only be called after the HandleTaskCallback has been invoked with
- // |task|.
- void MarkAsFailed(const T& task);
-
- // Cancel |task|.
- //
- // |task| is removed from the queue and will not be retried. Does not affect
- // the backoff delay.
- //
- // May only be called after the HandleTaskCallback has been invoked with
- // |task|.
- void Cancel(const T& task);
-
- // Reset any backoff delay and resume dispatching of tasks.
- //
- // Useful for when you know the cause of previous failures has been resolved
- // and you want don't want to wait for the accumulated backoff delay to
- // elapse.
- void ResetBackoff();
-
- // Use |timer| for scheduled events.
- //
- // Used in tests. See also MockTimer.
- void SetTimerForTest(std::unique_ptr<base::Timer> timer);
-
- private:
- void FinishTask(const T& task);
- void ScheduleDispatch();
- void Dispatch();
- // Return true if we should dispatch tasks.
- bool ShouldDispatch();
-
- const HandleTaskCallback process_callback_;
- net::BackoffEntry::Policy backoff_policy_;
- std::unique_ptr<net::BackoffEntry> backoff_entry_;
- // The number of tasks currently being handled.
- int num_in_progress_;
- std::deque<T> queue_;
- // The set of tasks in queue_ or currently being handled.
- std::set<T> tasks_;
- base::Closure dispatch_closure_;
- std::unique_ptr<base::Timer> backoff_timer_;
- base::TimeDelta delay_;
-
- // Must be last data member.
- base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
-
- DISALLOW_COPY_AND_ASSIGN(TaskQueue);
-};
-
-// The maximum number of tasks that may be concurrently executed. Think
-// carefully before changing this value. The desired behavior of backoff may
-// not be obvious when there is more than one concurrent task
-const int kMaxConcurrentTasks = 1;
-
-template <typename T>
-TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
- const base::TimeDelta& initial_backoff_delay,
- const base::TimeDelta& max_backoff_delay)
- : process_callback_(callback),
- backoff_policy_({}),
- num_in_progress_(0),
- weak_ptr_factory_(this) {
- DCHECK_LE(initial_backoff_delay.InMicroseconds(),
- max_backoff_delay.InMicroseconds());
- backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
- backoff_policy_.multiply_factor = 2.0;
- backoff_policy_.jitter_factor = 0.1;
- backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
- backoff_policy_.entry_lifetime_ms = -1;
- backoff_policy_.always_use_initial_delay = false;
- backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
- dispatch_closure_ =
- base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
- backoff_timer_.reset(new base::Timer(false, false));
-}
-
-template <typename T>
-void TaskQueue<T>::AddToQueue(const T& task) {
- DCHECK(CalledOnValidThread());
- // Ignore duplicates.
- if (tasks_.find(task) == tasks_.end()) {
- queue_.push_back(task);
- tasks_.insert(task);
- }
- ScheduleDispatch();
-}
-
-template <typename T>
-void TaskQueue<T>::MarkAsSucceeded(const T& task) {
- DCHECK(CalledOnValidThread());
- FinishTask(task);
- // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
- // reschedule a dispatch.
- backoff_timer_->Stop();
- backoff_entry_->Reset();
- ScheduleDispatch();
-}
-
-template <typename T>
-void TaskQueue<T>::MarkAsFailed(const T& task) {
- DCHECK(CalledOnValidThread());
- FinishTask(task);
- backoff_entry_->InformOfRequest(false);
- ScheduleDispatch();
-}
-
-template <typename T>
-void TaskQueue<T>::Cancel(const T& task) {
- DCHECK(CalledOnValidThread());
- FinishTask(task);
- ScheduleDispatch();
-}
-
-template <typename T>
-void TaskQueue<T>::ResetBackoff() {
- backoff_timer_->Stop();
- backoff_entry_->Reset();
- ScheduleDispatch();
-}
-
-template <typename T>
-void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) {
- DCHECK(CalledOnValidThread());
- DCHECK(timer.get());
- backoff_timer_ = std::move(timer);
-}
-
-template <typename T>
-void TaskQueue<T>::FinishTask(const T& task) {
- DCHECK(CalledOnValidThread());
- DCHECK_GE(num_in_progress_, 1);
- --num_in_progress_;
- const size_t num_erased = tasks_.erase(task);
- DCHECK_EQ(1U, num_erased);
-}
-
-template <typename T>
-void TaskQueue<T>::ScheduleDispatch() {
- DCHECK(CalledOnValidThread());
- if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
- return;
- }
-
- backoff_timer_->Start(
- FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
-}
-
-template <typename T>
-void TaskQueue<T>::Dispatch() {
- DCHECK(CalledOnValidThread());
- if (!ShouldDispatch()) {
- return;
- }
-
- DCHECK(!queue_.empty());
- const T& task = queue_.front();
- ++num_in_progress_;
- DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
- base::ThreadTaskRunnerHandle::Get()->PostTask(
- FROM_HERE, base::Bind(process_callback_, task));
- queue_.pop_front();
-}
-
-template <typename T>
-bool TaskQueue<T>::ShouldDispatch() {
- return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
-}
-
-} // namespace syncer
-
-#endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
« no previous file with comments | « sync/internal_api/public/attachments/on_disk_attachment_store.h ('k') | sync/internal_api/public/base/DEPS » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698