OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/proxy/multi_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/bind_helpers.h" | 8 #include "base/bind_helpers.h" |
9 #include "base/message_loop_proxy.h" | 9 #include "base/message_loop_proxy.h" |
10 #include "base/metrics/histogram.h" | 10 #include "base/metrics/histogram.h" |
(...skipping 307 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
318 // Start up the thread. | 318 // Start up the thread. |
319 // Note that it is safe to pass a temporary C-String to Thread(), as it will | 319 // Note that it is safe to pass a temporary C-String to Thread(), as it will |
320 // make a copy. | 320 // make a copy. |
321 std::string thread_name = | 321 std::string thread_name = |
322 base::StringPrintf("PAC thread #%d", thread_number); | 322 base::StringPrintf("PAC thread #%d", thread_number); |
323 thread_.reset(new base::Thread(thread_name.c_str())); | 323 thread_.reset(new base::Thread(thread_name.c_str())); |
324 CHECK(thread_->Start()); | 324 CHECK(thread_->Start()); |
325 } | 325 } |
326 | 326 |
327 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { | 327 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { |
328 DCHECK(!outstanding_job_); | 328 DCHECK(!outstanding_job_.get()); |
329 outstanding_job_ = job; | 329 outstanding_job_ = job; |
330 | 330 |
331 // Run the job. Once it has completed (regardless of whether it was | 331 // Run the job. Once it has completed (regardless of whether it was |
332 // cancelled), it will invoke OnJobCompleted() on this thread. | 332 // cancelled), it will invoke OnJobCompleted() on this thread. |
333 job->set_executor(this); | 333 job->set_executor(this); |
334 job->FinishedWaitingForThread(); | 334 job->FinishedWaitingForThread(); |
335 thread_->message_loop()->PostTask( | 335 thread_->message_loop()->PostTask( |
336 FROM_HERE, | 336 FROM_HERE, |
337 base::Bind(&Job::Run, job, base::MessageLoopProxy::current())); | 337 base::Bind(&Job::Run, job, base::MessageLoopProxy::current())); |
338 } | 338 } |
339 | 339 |
340 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { | 340 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { |
341 DCHECK_EQ(job, outstanding_job_.get()); | 341 DCHECK_EQ(job, outstanding_job_.get()); |
342 outstanding_job_ = NULL; | 342 outstanding_job_ = NULL; |
343 coordinator_->OnExecutorReady(this); | 343 coordinator_->OnExecutorReady(this); |
344 } | 344 } |
345 | 345 |
346 void MultiThreadedProxyResolver::Executor::Destroy() { | 346 void MultiThreadedProxyResolver::Executor::Destroy() { |
347 DCHECK(coordinator_); | 347 DCHECK(coordinator_); |
348 | 348 |
349 { | 349 { |
350 // See http://crbug.com/69710. | 350 // See http://crbug.com/69710. |
351 base::ThreadRestrictions::ScopedAllowIO allow_io; | 351 base::ThreadRestrictions::ScopedAllowIO allow_io; |
352 | 352 |
353 // Join the worker thread. | 353 // Join the worker thread. |
354 thread_.reset(); | 354 thread_.reset(); |
355 } | 355 } |
356 | 356 |
357 // Cancel any outstanding job. | 357 // Cancel any outstanding job. |
358 if (outstanding_job_) { | 358 if (outstanding_job_.get()) { |
359 outstanding_job_->Cancel(); | 359 outstanding_job_->Cancel(); |
360 // Orphan the job (since this executor may be deleted soon). | 360 // Orphan the job (since this executor may be deleted soon). |
361 outstanding_job_->set_executor(NULL); | 361 outstanding_job_->set_executor(NULL); |
362 } | 362 } |
363 | 363 |
364 // It is now safe to free the ProxyResolver, since all the tasks that | 364 // It is now safe to free the ProxyResolver, since all the tasks that |
365 // were using it on the resolver thread have completed. | 365 // were using it on the resolver thread have completed. |
366 resolver_.reset(); | 366 resolver_.reset(); |
367 | 367 |
368 // Null some stuff as a precaution. | 368 // Null some stuff as a precaution. |
369 coordinator_ = NULL; | 369 coordinator_ = NULL; |
370 outstanding_job_ = NULL; | 370 outstanding_job_ = NULL; |
371 } | 371 } |
372 | 372 |
373 void MultiThreadedProxyResolver::Executor::PurgeMemory() { | 373 void MultiThreadedProxyResolver::Executor::PurgeMemory() { |
374 thread_->message_loop()->PostTask( | 374 thread_->message_loop()->PostTask( |
375 FROM_HERE, | 375 FROM_HERE, |
376 base::Bind(&ProxyResolver::PurgeMemory, | 376 base::Bind(&ProxyResolver::PurgeMemory, |
377 base::Unretained(resolver_.get()))); | 377 base::Unretained(resolver_.get()))); |
378 } | 378 } |
379 | 379 |
380 MultiThreadedProxyResolver::Executor::~Executor() { | 380 MultiThreadedProxyResolver::Executor::~Executor() { |
381 // The important cleanup happens as part of Destroy(), which should always be | 381 // The important cleanup happens as part of Destroy(), which should always be |
382 // called first. | 382 // called first. |
383 DCHECK(!coordinator_) << "Destroy() was not called"; | 383 DCHECK(!coordinator_) << "Destroy() was not called"; |
384 DCHECK(!thread_.get()); | 384 DCHECK(!thread_.get()); |
385 DCHECK(!resolver_.get()); | 385 DCHECK(!resolver_.get()); |
386 DCHECK(!outstanding_job_); | 386 DCHECK(!outstanding_job_.get()); |
387 } | 387 } |
388 | 388 |
389 // MultiThreadedProxyResolver -------------------------------------------------- | 389 // MultiThreadedProxyResolver -------------------------------------------------- |
390 | 390 |
391 MultiThreadedProxyResolver::MultiThreadedProxyResolver( | 391 MultiThreadedProxyResolver::MultiThreadedProxyResolver( |
392 ProxyResolverFactory* resolver_factory, | 392 ProxyResolverFactory* resolver_factory, |
393 size_t max_num_threads) | 393 size_t max_num_threads) |
394 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), | 394 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), |
395 resolver_factory_(resolver_factory), | 395 resolver_factory_(resolver_factory), |
396 max_num_threads_(max_num_threads) { | 396 max_num_threads_(max_num_threads) { |
(...skipping 19 matching lines...) Expand all Loading... |
416 | 416 |
417 // Completion will be notified through |callback|, unless the caller cancels | 417 // Completion will be notified through |callback|, unless the caller cancels |
418 // the request using |request|. | 418 // the request using |request|. |
419 if (request) | 419 if (request) |
420 *request = reinterpret_cast<RequestHandle>(job.get()); | 420 *request = reinterpret_cast<RequestHandle>(job.get()); |
421 | 421 |
422 // If there is an executor that is ready to run this request, submit it! | 422 // If there is an executor that is ready to run this request, submit it! |
423 Executor* executor = FindIdleExecutor(); | 423 Executor* executor = FindIdleExecutor(); |
424 if (executor) { | 424 if (executor) { |
425 DCHECK_EQ(0u, pending_jobs_.size()); | 425 DCHECK_EQ(0u, pending_jobs_.size()); |
426 executor->StartJob(job); | 426 executor->StartJob(job.get()); |
427 return ERR_IO_PENDING; | 427 return ERR_IO_PENDING; |
428 } | 428 } |
429 | 429 |
430 // Otherwise queue this request. (We will schedule it to a thread once one | 430 // Otherwise queue this request. (We will schedule it to a thread once one |
431 // becomes available). | 431 // becomes available). |
432 job->WaitingForThread(); | 432 job->WaitingForThread(); |
433 pending_jobs_.push_back(job); | 433 pending_jobs_.push_back(job); |
434 | 434 |
435 // If we haven't already reached the thread limit, provision a new thread to | 435 // If we haven't already reached the thread limit, provision a new thread to |
436 // drain the requests more quickly. | 436 // drain the requests more quickly. |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
480 // anymore. | 480 // anymore. |
481 current_script_data_ = NULL; | 481 current_script_data_ = NULL; |
482 | 482 |
483 ReleaseAllExecutors(); | 483 ReleaseAllExecutors(); |
484 } | 484 } |
485 | 485 |
486 void MultiThreadedProxyResolver::PurgeMemory() { | 486 void MultiThreadedProxyResolver::PurgeMemory() { |
487 DCHECK(CalledOnValidThread()); | 487 DCHECK(CalledOnValidThread()); |
488 for (ExecutorList::iterator it = executors_.begin(); | 488 for (ExecutorList::iterator it = executors_.begin(); |
489 it != executors_.end(); ++it) { | 489 it != executors_.end(); ++it) { |
490 Executor* executor = *it; | 490 Executor* executor = it->get(); |
491 executor->PurgeMemory(); | 491 executor->PurgeMemory(); |
492 } | 492 } |
493 } | 493 } |
494 | 494 |
495 int MultiThreadedProxyResolver::SetPacScript( | 495 int MultiThreadedProxyResolver::SetPacScript( |
496 const scoped_refptr<ProxyResolverScriptData>& script_data, | 496 const scoped_refptr<ProxyResolverScriptData>& script_data, |
497 const CompletionCallback&callback) { | 497 const CompletionCallback&callback) { |
498 DCHECK(CalledOnValidThread()); | 498 DCHECK(CalledOnValidThread()); |
499 DCHECK(!callback.is_null()); | 499 DCHECK(!callback.is_null()); |
500 | 500 |
(...skipping 13 matching lines...) Expand all Loading... |
514 executor->StartJob(new SetPacScriptJob(script_data, callback)); | 514 executor->StartJob(new SetPacScriptJob(script_data, callback)); |
515 return ERR_IO_PENDING; | 515 return ERR_IO_PENDING; |
516 } | 516 } |
517 | 517 |
518 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { | 518 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { |
519 DCHECK(CalledOnValidThread()); | 519 DCHECK(CalledOnValidThread()); |
520 CHECK_EQ(0u, pending_jobs_.size()); | 520 CHECK_EQ(0u, pending_jobs_.size()); |
521 | 521 |
522 for (ExecutorList::const_iterator it = executors_.begin(); | 522 for (ExecutorList::const_iterator it = executors_.begin(); |
523 it != executors_.end(); ++it) { | 523 it != executors_.end(); ++it) { |
524 const Executor* executor = *it; | 524 const Executor* executor = it->get(); |
525 Job* job = executor->outstanding_job(); | 525 Job* job = executor->outstanding_job(); |
526 // The "has_user_callback()" is to exclude jobs for which the callback | 526 // The "has_user_callback()" is to exclude jobs for which the callback |
527 // has already been invoked, or was not user-initiated (as in the case of | 527 // has already been invoked, or was not user-initiated (as in the case of |
528 // lazy thread provisions). User-initiated jobs may !has_user_callback() | 528 // lazy thread provisions). User-initiated jobs may !has_user_callback() |
529 // when the callback has already been run. (Since we only clear the | 529 // when the callback has already been run. (Since we only clear the |
530 // outstanding job AFTER the callback has been invoked, it is possible | 530 // outstanding job AFTER the callback has been invoked, it is possible |
531 // for a new request to be started from within the callback). | 531 // for a new request to be started from within the callback). |
532 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); | 532 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); |
533 } | 533 } |
534 } | 534 } |
535 | 535 |
536 void MultiThreadedProxyResolver::ReleaseAllExecutors() { | 536 void MultiThreadedProxyResolver::ReleaseAllExecutors() { |
537 DCHECK(CalledOnValidThread()); | 537 DCHECK(CalledOnValidThread()); |
538 for (ExecutorList::iterator it = executors_.begin(); | 538 for (ExecutorList::iterator it = executors_.begin(); |
539 it != executors_.end(); ++it) { | 539 it != executors_.end(); ++it) { |
540 Executor* executor = *it; | 540 Executor* executor = it->get(); |
541 executor->Destroy(); | 541 executor->Destroy(); |
542 } | 542 } |
543 executors_.clear(); | 543 executors_.clear(); |
544 } | 544 } |
545 | 545 |
546 MultiThreadedProxyResolver::Executor* | 546 MultiThreadedProxyResolver::Executor* |
547 MultiThreadedProxyResolver::FindIdleExecutor() { | 547 MultiThreadedProxyResolver::FindIdleExecutor() { |
548 DCHECK(CalledOnValidThread()); | 548 DCHECK(CalledOnValidThread()); |
549 for (ExecutorList::iterator it = executors_.begin(); | 549 for (ExecutorList::iterator it = executors_.begin(); |
550 it != executors_.end(); ++it) { | 550 it != executors_.end(); ++it) { |
551 Executor* executor = *it; | 551 Executor* executor = it->get(); |
552 if (!executor->outstanding_job()) | 552 if (!executor->outstanding_job()) |
553 return executor; | 553 return executor; |
554 } | 554 } |
555 return NULL; | 555 return NULL; |
556 } | 556 } |
557 | 557 |
558 MultiThreadedProxyResolver::Executor* | 558 MultiThreadedProxyResolver::Executor* |
559 MultiThreadedProxyResolver::AddNewExecutor() { | 559 MultiThreadedProxyResolver::AddNewExecutor() { |
560 DCHECK(CalledOnValidThread()); | 560 DCHECK(CalledOnValidThread()); |
561 DCHECK_LT(executors_.size(), max_num_threads_); | 561 DCHECK_LT(executors_.size(), max_num_threads_); |
562 // The "thread number" is used to give the thread a unique name. | 562 // The "thread number" is used to give the thread a unique name. |
563 int thread_number = executors_.size(); | 563 int thread_number = executors_.size(); |
564 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); | 564 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); |
565 Executor* executor = new Executor( | 565 Executor* executor = new Executor( |
566 this, resolver, thread_number); | 566 this, resolver, thread_number); |
567 executors_.push_back(make_scoped_refptr(executor)); | 567 executors_.push_back(make_scoped_refptr(executor)); |
568 return executor; | 568 return executor; |
569 } | 569 } |
570 | 570 |
571 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | 571 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
572 DCHECK(CalledOnValidThread()); | 572 DCHECK(CalledOnValidThread()); |
573 if (pending_jobs_.empty()) | 573 if (pending_jobs_.empty()) |
574 return; | 574 return; |
575 | 575 |
576 // Get the next job to process (FIFO). Transfer it from the pending queue | 576 // Get the next job to process (FIFO). Transfer it from the pending queue |
577 // to the executor. | 577 // to the executor. |
578 scoped_refptr<Job> job = pending_jobs_.front(); | 578 scoped_refptr<Job> job = pending_jobs_.front(); |
579 pending_jobs_.pop_front(); | 579 pending_jobs_.pop_front(); |
580 executor->StartJob(job); | 580 executor->StartJob(job.get()); |
581 } | 581 } |
582 | 582 |
583 } // namespace net | 583 } // namespace net |
OLD | NEW |