Index: net/websockets/websocket_throttle.cc |
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc |
index 281a687ac6f2129b406e05f50f2134e18dff57a6..8853cc68991c1413da65e54bfe9471d548b36409 100644 |
--- a/net/websockets/websocket_throttle.cc |
+++ b/net/websockets/websocket_throttle.cc |
@@ -7,6 +7,7 @@ |
#include <algorithm> |
#include <set> |
#include <string> |
+#include <utility> |
#include "base/memory/singleton.h" |
#include "base/message_loop.h" |
@@ -19,6 +20,12 @@ |
namespace net { |
+namespace { |
+ |
+const size_t kMaxWebSocketJobsThrottled = 1024; |
+ |
+} // namespace |
+ |
WebSocketThrottle::WebSocketThrottle() { |
} |
@@ -32,7 +39,10 @@ WebSocketThrottle* WebSocketThrottle::GetInstance() { |
return Singleton<WebSocketThrottle>::get(); |
} |
-void WebSocketThrottle::PutInQueue(WebSocketJob* job) { |
+bool WebSocketThrottle::PutInQueue(WebSocketJob* job) { |
+ if (queue_.size() >= kMaxWebSocketJobsThrottled) |
+ return false; |
+ |
queue_.push_back(job); |
const AddressList& address_list = job->address_list(); |
std::set<IPEndPoint> address_set; |
@@ -46,15 +56,18 @@ void WebSocketThrottle::PutInQueue(WebSocketJob* job) { |
ConnectingAddressMap::iterator iter = addr_map_.find(address); |
if (iter == addr_map_.end()) { |
- ConnectingQueue* queue = new ConnectingQueue(); |
- queue->push_back(job); |
- addr_map_[address] = queue; |
+ ConnectingAddressMap::iterator new_queue = |
+ addr_map_.insert(make_pair(address, ConnectingQueue())).first; |
+ new_queue->second.push_back(job); |
} else { |
- iter->second->push_back(job); |
+ DCHECK(!iter->second.empty()); |
+ iter->second.push_back(job); |
job->SetWaiting(); |
DVLOG(1) << "Waiting on " << address.ToString(); |
} |
} |
+ |
+ return true; |
} |
void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { |
@@ -63,10 +76,13 @@ void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { |
if (queue_iter == queue_.end()) |
return; |
queue_.erase(queue_iter); |
- const AddressList& address_list = job->address_list(); |
+ |
+ std::set<WebSocketJob*> wakeup_candidates; |
+ |
+ const AddressList& resolved_address_list = job->address_list(); |
std::set<IPEndPoint> address_set; |
- for (AddressList::const_iterator addr_iter = address_list.begin(); |
- addr_iter != address_list.end(); |
+ for (AddressList::const_iterator addr_iter = resolved_address_list.begin(); |
+ addr_iter != resolved_address_list.end(); |
++addr_iter) { |
const IPEndPoint& address = *addr_iter; |
// If |address| is already processed, don't do it again. |
@@ -76,37 +92,46 @@ void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { |
ConnectingAddressMap::iterator map_iter = addr_map_.find(address); |
DCHECK(map_iter != addr_map_.end()); |
- ConnectingQueue* queue = map_iter->second; |
- // Job may not be front of queue if the socket is closed while waiting. |
- ConnectingQueue::iterator address_queue_iter = |
- std::find(queue->begin(), queue->end(), job); |
- if (address_queue_iter != queue->end()) |
- queue->erase(address_queue_iter); |
- if (queue->empty()) { |
- delete queue; |
+ ConnectingQueue& per_address_queue = map_iter->second; |
+ DCHECK(!per_address_queue.empty()); |
+ // Job may not be front of the queue if the socket is closed while waiting. |
+ ConnectingQueue::iterator per_address_queue_iter = |
+ std::find(per_address_queue.begin(), per_address_queue.end(), job); |
+ bool was_front = false; |
+ if (per_address_queue_iter != per_address_queue.end()) { |
+ was_front = (per_address_queue_iter == per_address_queue.begin()); |
+ per_address_queue.erase(per_address_queue_iter); |
+ } |
+ if (per_address_queue.empty()) { |
addr_map_.erase(map_iter); |
+ } else if (was_front) { |
+ // The new front is a wake-up candidate. |
+ wakeup_candidates.insert(per_address_queue.front()); |
} |
} |
+ |
+ WakeupSocketIfNecessary(wakeup_candidates); |
} |
-void WebSocketThrottle::WakeupSocketIfNecessary() { |
- for (ConnectingQueue::iterator iter = queue_.begin(); |
- iter != queue_.end(); |
+void WebSocketThrottle::WakeupSocketIfNecessary( |
+ const std::set<WebSocketJob*>& wakeup_candidates) { |
+ for (std::set<WebSocketJob*>::const_iterator iter = wakeup_candidates.begin(); |
+ iter != wakeup_candidates.end(); |
++iter) { |
WebSocketJob* job = *iter; |
if (!job->IsWaiting()) |
continue; |
bool should_wakeup = true; |
- const AddressList& address_list = job->address_list(); |
- for (AddressList::const_iterator addr_iter = address_list.begin(); |
- addr_iter != address_list.end(); |
+ const AddressList& resolved_address_list = job->address_list(); |
+ for (AddressList::const_iterator addr_iter = resolved_address_list.begin(); |
+ addr_iter != resolved_address_list.end(); |
++addr_iter) { |
const IPEndPoint& address = *addr_iter; |
ConnectingAddressMap::iterator map_iter = addr_map_.find(address); |
DCHECK(map_iter != addr_map_.end()); |
- ConnectingQueue* queue = map_iter->second; |
- if (job != queue->front()) { |
+ const ConnectingQueue& per_address_queue = map_iter->second; |
+ if (job != per_address_queue.front()) { |
should_wakeup = false; |
break; |
} |