Index: net/base/prioritized_dispatcher.cc |
=================================================================== |
--- net/base/prioritized_dispatcher.cc (revision 219192) |
+++ net/base/prioritized_dispatcher.cc (working copy) |
@@ -18,17 +18,7 @@ |
: queue_(limits.reserved_slots.size()), |
max_running_jobs_(limits.reserved_slots.size()), |
num_running_jobs_(0) { |
- size_t total = 0; |
- for (size_t i = 0; i < limits.reserved_slots.size(); ++i) { |
- total += limits.reserved_slots[i]; |
- max_running_jobs_[i] = total; |
- } |
- // Unreserved slots are available for all priorities. |
- DCHECK_LE(total, limits.total_jobs) << "sum(reserved_slots) <= total_jobs"; |
- size_t spare = limits.total_jobs - total; |
- for (size_t i = limits.reserved_slots.size(); i > 0; --i) { |
- max_running_jobs_[i - 1] += spare; |
- } |
+ SetLimits(limits); |
} |
PrioritizedDispatcher::~PrioritizedDispatcher() {} |
@@ -45,6 +35,18 @@ |
return queue_.Insert(job, priority); |
} |
+PrioritizedDispatcher::Handle PrioritizedDispatcher::AddAtHead( |
+ Job* job, Priority priority) { |
+ DCHECK(job); |
+ DCHECK_LT(priority, num_priorities()); |
+ if (num_running_jobs_ < max_running_jobs_[priority]) { |
+ ++num_running_jobs_; |
+ job->Start(); |
+ return Handle(); |
+ } |
+ return queue_.InsertAtFront(job, priority); |
+} |
+ |
void PrioritizedDispatcher::Cancel(const Handle& handle) { |
queue_.Erase(handle); |
} |
@@ -78,14 +80,47 @@ |
void PrioritizedDispatcher::OnJobFinished() { |
DCHECK_GT(num_running_jobs_, 0u); |
--num_running_jobs_; |
- Handle handle = queue_.FirstMax(); |
- if (handle.is_null()) { |
- DCHECK_EQ(0u, queue_.size()); |
- return; |
+ MaybeDispatchNextJob(); |
+} |
+ |
+PrioritizedDispatcher::Limits PrioritizedDispatcher::GetLimits() const { |
+ size_t num_priorities = max_running_jobs_.size(); |
+ Limits limits(num_priorities, max_running_jobs_.back()); |
+ |
+ // Calculate the number of jobs reserved for each priority and higher. Leave |
+ // the number of jobs reserved for the lowest priority or higher as 0. |
+ for (size_t i = 1; i < num_priorities; ++i) { |
+ limits.reserved_slots[i] = max_running_jobs_[i] - max_running_jobs_[i - 1]; |
} |
- MaybeDispatchJob(handle, handle.priority()); |
+ |
+ return limits; |
} |
+void PrioritizedDispatcher::SetLimits(const Limits& limits) { |
+ DCHECK_EQ(queue_.num_priorities(), limits.reserved_slots.size()); |
+ size_t total = 0; |
+ for (size_t i = 0; i < limits.reserved_slots.size(); ++i) { |
+ total += limits.reserved_slots[i]; |
+ max_running_jobs_[i] = total; |
+ } |
+ // Unreserved slots are available for all priorities. |
+ DCHECK_LE(total, limits.total_jobs) << "sum(reserved_slots) <= total_jobs"; |
+ size_t spare = limits.total_jobs - total; |
+ for (size_t i = limits.reserved_slots.size(); i > 0; --i) { |
+ max_running_jobs_[i - 1] += spare; |
+ } |
+ |
+ // Start pending jobs, if limits permit. |
+ while (true) { |
+ if (!MaybeDispatchNextJob()) |
+ break; |
+ } |
+} |
+ |
+void PrioritizedDispatcher::SetLimitsToZero() { |
+ SetLimits(Limits(queue_.num_priorities(), 0)); |
+} |
+ |
bool PrioritizedDispatcher::MaybeDispatchJob(const Handle& handle, |
Priority job_priority) { |
DCHECK_LT(job_priority, num_priorities()); |
@@ -98,4 +133,13 @@ |
return true; |
} |
+bool PrioritizedDispatcher::MaybeDispatchNextJob() { |
+ Handle handle = queue_.FirstMax(); |
+ if (handle.is_null()) { |
+ DCHECK_EQ(0u, queue_.size()); |
+ return false; |
+ } |
+ return MaybeDispatchJob(handle, handle.priority()); |
+} |
+ |
} // namespace net |