OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/proxy/multi_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
6 | 6 |
7 #include <deque> | 7 #include <deque> |
8 #include <utility> | 8 #include <utility> |
9 #include <vector> | 9 #include <vector> |
10 | 10 |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
112 size_t max_num_threads, | 112 size_t max_num_threads, |
113 const scoped_refptr<ProxyResolverScriptData>& script_data, | 113 const scoped_refptr<ProxyResolverScriptData>& script_data, |
114 scoped_refptr<Executor> executor); | 114 scoped_refptr<Executor> executor); |
115 | 115 |
116 ~MultiThreadedProxyResolver() override; | 116 ~MultiThreadedProxyResolver() override; |
117 | 117 |
118 // ProxyResolver implementation: | 118 // ProxyResolver implementation: |
119 int GetProxyForURL(const GURL& url, | 119 int GetProxyForURL(const GURL& url, |
120 ProxyInfo* results, | 120 ProxyInfo* results, |
121 const CompletionCallback& callback, | 121 const CompletionCallback& callback, |
122 RequestHandle* request, | 122 std::unique_ptr<Request>* request, |
123 const NetLogWithSource& net_log) override; | 123 const NetLogWithSource& net_log) override; |
124 void CancelRequest(RequestHandle request) override; | |
125 LoadState GetLoadState(RequestHandle request) const override; | |
126 | 124 |
127 private: | 125 private: |
128 class GetProxyForURLJob; | 126 class GetProxyForURLJob; |
| 127 class RequestImpl; |
129 // FIFO queue of pending jobs waiting to be started. | 128 // FIFO queue of pending jobs waiting to be started. |
130 // TODO(eroman): Make this priority queue. | 129 // TODO(eroman): Make this priority queue. |
131 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; | 130 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; |
132 typedef std::vector<scoped_refptr<Executor>> ExecutorList; | 131 typedef std::vector<scoped_refptr<Executor>> ExecutorList; |
133 | 132 |
134 // Returns an idle worker thread which is ready to receive GetProxyForURL() | 133 // Returns an idle worker thread which is ready to receive GetProxyForURL() |
135 // requests. If all threads are occupied, returns NULL. | 134 // requests. If all threads are occupied, returns NULL. |
136 Executor* FindIdleExecutor(); | 135 Executor* FindIdleExecutor(); |
137 | 136 |
138 // Creates a new worker thread, and appends it to |executors_|. | 137 // Creates a new worker thread, and appends it to |executors_|. |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
226 | 225 |
227 virtual ~Job() {} | 226 virtual ~Job() {} |
228 | 227 |
229 private: | 228 private: |
230 const Type type_; | 229 const Type type_; |
231 CompletionCallback callback_; | 230 CompletionCallback callback_; |
232 Executor* executor_; | 231 Executor* executor_; |
233 bool was_cancelled_; | 232 bool was_cancelled_; |
234 }; | 233 }; |
235 | 234 |
| 235 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request { |
| 236 public: |
| 237 explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {} |
| 238 |
| 239 ~RequestImpl() override { job_->Cancel(); } |
| 240 |
| 241 LoadState GetLoadState() override { |
| 242 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; |
| 243 } |
| 244 |
| 245 private: |
| 246 scoped_refptr<Job> job_; |
| 247 }; |
| 248 |
236 // CreateResolverJob ----------------------------------------------------------- | 249 // CreateResolverJob ----------------------------------------------------------- |
237 | 250 |
238 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. | 251 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. |
239 class CreateResolverJob : public Job { | 252 class CreateResolverJob : public Job { |
240 public: | 253 public: |
241 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, | 254 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, |
242 ProxyResolverFactory* factory) | 255 ProxyResolverFactory* factory) |
243 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), | 256 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), |
244 script_data_(script_data), | 257 script_data_(script_data), |
245 factory_(factory) {} | 258 factory_(factory) {} |
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
440 | 453 |
441 for (auto& executor : executors_) { | 454 for (auto& executor : executors_) { |
442 executor->Destroy(); | 455 executor->Destroy(); |
443 } | 456 } |
444 } | 457 } |
445 | 458 |
446 int MultiThreadedProxyResolver::GetProxyForURL( | 459 int MultiThreadedProxyResolver::GetProxyForURL( |
447 const GURL& url, | 460 const GURL& url, |
448 ProxyInfo* results, | 461 ProxyInfo* results, |
449 const CompletionCallback& callback, | 462 const CompletionCallback& callback, |
450 RequestHandle* request, | 463 std::unique_ptr<Request>* request, |
451 const NetLogWithSource& net_log) { | 464 const NetLogWithSource& net_log) { |
452 DCHECK(CalledOnValidThread()); | 465 DCHECK(CalledOnValidThread()); |
453 DCHECK(!callback.is_null()); | 466 DCHECK(!callback.is_null()); |
454 | 467 |
455 scoped_refptr<GetProxyForURLJob> job( | 468 scoped_refptr<GetProxyForURLJob> job( |
456 new GetProxyForURLJob(url, results, callback, net_log)); | 469 new GetProxyForURLJob(url, results, callback, net_log)); |
457 | 470 |
458 // Completion will be notified through |callback|, unless the caller cancels | 471 // Completion will be notified through |callback|, unless the caller cancels |
459 // the request using |request|. | 472 // the request using |request|. |
460 if (request) | 473 if (request) |
461 *request = reinterpret_cast<RequestHandle>(job.get()); | 474 request->reset(new RequestImpl(job)); |
462 | 475 |
463 // If there is an executor that is ready to run this request, submit it! | 476 // If there is an executor that is ready to run this request, submit it! |
464 Executor* executor = FindIdleExecutor(); | 477 Executor* executor = FindIdleExecutor(); |
465 if (executor) { | 478 if (executor) { |
466 DCHECK_EQ(0u, pending_jobs_.size()); | 479 DCHECK_EQ(0u, pending_jobs_.size()); |
467 executor->StartJob(job.get()); | 480 executor->StartJob(job.get()); |
468 return ERR_IO_PENDING; | 481 return ERR_IO_PENDING; |
469 } | 482 } |
470 | 483 |
471 // Otherwise queue this request. (We will schedule it to a thread once one | 484 // Otherwise queue this request. (We will schedule it to a thread once one |
472 // becomes available). | 485 // becomes available). |
473 job->WaitingForThread(); | 486 job->WaitingForThread(); |
474 pending_jobs_.push_back(job); | 487 pending_jobs_.push_back(job); |
475 | 488 |
476 // If we haven't already reached the thread limit, provision a new thread to | 489 // If we haven't already reached the thread limit, provision a new thread to |
477 // drain the requests more quickly. | 490 // drain the requests more quickly. |
478 if (executors_.size() < max_num_threads_) | 491 if (executors_.size() < max_num_threads_) |
479 AddNewExecutor(); | 492 AddNewExecutor(); |
480 | 493 |
481 return ERR_IO_PENDING; | 494 return ERR_IO_PENDING; |
482 } | 495 } |
483 | 496 |
484 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { | |
485 DCHECK(CalledOnValidThread()); | |
486 DCHECK(req); | |
487 | |
488 Job* job = reinterpret_cast<Job*>(req); | |
489 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); | |
490 | |
491 if (job->executor()) { | |
492 // If the job was already submitted to the executor, just mark it | |
493 // as cancelled so the user callback isn't run on completion. | |
494 job->Cancel(); | |
495 } else { | |
496 // Otherwise the job is just sitting in a queue. | |
497 PendingJobsQueue::iterator it = | |
498 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); | |
499 DCHECK(it != pending_jobs_.end()); | |
500 pending_jobs_.erase(it); | |
501 } | |
502 } | |
503 | |
504 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { | |
505 DCHECK(CalledOnValidThread()); | |
506 DCHECK(req); | |
507 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
508 } | |
509 | |
510 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { | 497 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { |
511 DCHECK(CalledOnValidThread()); | 498 DCHECK(CalledOnValidThread()); |
512 for (ExecutorList::iterator it = executors_.begin(); | 499 for (ExecutorList::iterator it = executors_.begin(); |
513 it != executors_.end(); ++it) { | 500 it != executors_.end(); ++it) { |
514 Executor* executor = it->get(); | 501 Executor* executor = it->get(); |
515 if (!executor->outstanding_job()) | 502 if (!executor->outstanding_job()) |
516 return executor; | 503 return executor; |
517 } | 504 } |
518 return NULL; | 505 return NULL; |
519 } | 506 } |
520 | 507 |
521 void MultiThreadedProxyResolver::AddNewExecutor() { | 508 void MultiThreadedProxyResolver::AddNewExecutor() { |
522 DCHECK(CalledOnValidThread()); | 509 DCHECK(CalledOnValidThread()); |
523 DCHECK_LT(executors_.size(), max_num_threads_); | 510 DCHECK_LT(executors_.size(), max_num_threads_); |
524 // The "thread number" is used to give the thread a unique name. | 511 // The "thread number" is used to give the thread a unique name. |
525 int thread_number = executors_.size(); | 512 int thread_number = executors_.size(); |
526 Executor* executor = new Executor(this, thread_number); | 513 Executor* executor = new Executor(this, thread_number); |
527 executor->StartJob( | 514 executor->StartJob( |
528 new CreateResolverJob(script_data_, resolver_factory_.get())); | 515 new CreateResolverJob(script_data_, resolver_factory_.get())); |
529 executors_.push_back(make_scoped_refptr(executor)); | 516 executors_.push_back(make_scoped_refptr(executor)); |
530 } | 517 } |
531 | 518 |
532 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | 519 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
533 DCHECK(CalledOnValidThread()); | 520 DCHECK(CalledOnValidThread()); |
534 if (pending_jobs_.empty()) | 521 while (!pending_jobs_.empty()) { |
535 return; | 522 scoped_refptr<Job> job = pending_jobs_.front(); |
536 | 523 pending_jobs_.pop_front(); |
537 // Get the next job to process (FIFO). Transfer it from the pending queue | 524 if (!job->was_cancelled()) { |
538 // to the executor. | 525 executor->StartJob(job.get()); |
539 scoped_refptr<Job> job = pending_jobs_.front(); | 526 return; |
540 pending_jobs_.pop_front(); | 527 } |
541 executor->StartJob(job.get()); | 528 } |
542 } | 529 } |
543 | 530 |
544 } // namespace | 531 } // namespace |
545 | 532 |
546 class MultiThreadedProxyResolverFactory::Job | 533 class MultiThreadedProxyResolverFactory::Job |
547 : public ProxyResolverFactory::Request, | 534 : public ProxyResolverFactory::Request, |
548 public Executor::Coordinator { | 535 public Executor::Coordinator { |
549 public: | 536 public: |
550 Job(MultiThreadedProxyResolverFactory* factory, | 537 Job(MultiThreadedProxyResolverFactory* factory, |
551 const scoped_refptr<ProxyResolverScriptData>& script_data, | 538 const scoped_refptr<ProxyResolverScriptData>& script_data, |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
629 return ERR_IO_PENDING; | 616 return ERR_IO_PENDING; |
630 } | 617 } |
631 | 618 |
632 void MultiThreadedProxyResolverFactory::RemoveJob( | 619 void MultiThreadedProxyResolverFactory::RemoveJob( |
633 MultiThreadedProxyResolverFactory::Job* job) { | 620 MultiThreadedProxyResolverFactory::Job* job) { |
634 size_t erased = jobs_.erase(job); | 621 size_t erased = jobs_.erase(job); |
635 DCHECK_EQ(1u, erased); | 622 DCHECK_EQ(1u, erased); |
636 } | 623 } |
637 | 624 |
638 } // namespace net | 625 } // namespace net |
OLD | NEW |