Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1207)

Unified Diff: content/browser/download/byte_stream.cc

Issue 12440036: Move ByteStream to content/browser (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Rebase Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/browser/download/byte_stream.h ('k') | content/browser/download/byte_stream_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/download/byte_stream.cc
diff --git a/content/browser/download/byte_stream.cc b/content/browser/download/byte_stream.cc
deleted file mode 100644
index 26224ad6ff668d622274a652487cb3571af9f451..0000000000000000000000000000000000000000
--- a/content/browser/download/byte_stream.cc
+++ /dev/null
@@ -1,434 +0,0 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "content/browser/download/byte_stream.h"
-
-#include "base/bind.h"
-#include "base/location.h"
-#include "base/memory/weak_ptr.h"
-#include "base/memory/ref_counted.h"
-#include "base/sequenced_task_runner.h"
-
-namespace content {
-namespace {
-
-typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
-ContentVector;
-
-class ByteStreamReaderImpl;
-
-// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
-// cleared in an object destructor and accessed to check for object
-// existence. We can't use weak pointers because they're tightly tied to
-// threads rather than task runners.
-// TODO(rdsmith): A better solution would be extending weak pointers
-// to support SequencedTaskRunners.
-struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
- public:
- LifetimeFlag() : is_alive(true) { }
- bool is_alive;
-
- protected:
- friend class base::RefCountedThreadSafe<LifetimeFlag>;
- virtual ~LifetimeFlag() { }
-
- private:
- DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
-};
-
-// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
-// SetPeer may happen anywhere; all other operations on each class must
-// happen in the context of their SequencedTaskRunner.
-class ByteStreamWriterImpl : public ByteStreamWriter {
- public:
- ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
- scoped_refptr<LifetimeFlag> lifetime_flag,
- size_t buffer_size);
- virtual ~ByteStreamWriterImpl();
-
- // Must be called before any operations are performed.
- void SetPeer(ByteStreamReaderImpl* peer,
- scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
- scoped_refptr<LifetimeFlag> peer_lifetime_flag);
-
- // Overridden from ByteStreamWriter.
- virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
- size_t byte_count) OVERRIDE;
- virtual void Close(DownloadInterruptReason status) OVERRIDE;
- virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
-
- // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
- static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
- ByteStreamWriterImpl* target,
- size_t bytes_consumed);
-
- private:
- // Called from UpdateWindow when object existence has been validated.
- void UpdateWindowInternal(size_t bytes_consumed);
-
- void PostToPeer(bool complete, DownloadInterruptReason status);
-
- const size_t total_buffer_size_;
-
- // All data objects in this class are only valid to access on
- // this task runner except as otherwise noted.
- scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
-
- // True while this object is alive.
- scoped_refptr<LifetimeFlag> my_lifetime_flag_;
-
- base::Closure space_available_callback_;
- ContentVector input_contents_;
- size_t input_contents_size_;
-
- // ** Peer information.
-
- scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
-
- // How much we've sent to the output that for flow control purposes we
- // must assume hasn't been read yet.
- size_t output_size_used_;
-
- // Only valid to access on peer_task_runner_.
- scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
-
- // Only valid to access on peer_task_runner_ if
- // |*peer_lifetime_flag_ == true|
- ByteStreamReaderImpl* peer_;
-};
-
-class ByteStreamReaderImpl : public ByteStreamReader {
- public:
- ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
- scoped_refptr<LifetimeFlag> lifetime_flag,
- size_t buffer_size);
- virtual ~ByteStreamReaderImpl();
-
- // Must be called before any operations are performed.
- void SetPeer(ByteStreamWriterImpl* peer,
- scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
- scoped_refptr<LifetimeFlag> peer_lifetime_flag);
-
- // Overridden from ByteStreamReader.
- virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
- size_t* length) OVERRIDE;
- virtual DownloadInterruptReason GetStatus() const OVERRIDE;
- virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
-
- // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and
- // |ByteStreamWriterImpl::Close|.
- // Receive data from our peer.
- // static because it may be called after the object it is targeting
- // has been destroyed. It may not access |*target|
- // if |*object_lifetime_flag| is false.
- static void TransferData(
- scoped_refptr<LifetimeFlag> object_lifetime_flag,
- ByteStreamReaderImpl* target,
- scoped_ptr<ContentVector> transfer_buffer,
- size_t transfer_buffer_bytes,
- bool source_complete,
- DownloadInterruptReason status);
-
- private:
- // Called from TransferData once object existence has been validated.
- void TransferDataInternal(
- scoped_ptr<ContentVector> transfer_buffer,
- size_t transfer_buffer_bytes,
- bool source_complete,
- DownloadInterruptReason status);
-
- void MaybeUpdateInput();
-
- const size_t total_buffer_size_;
-
- scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
-
- // True while this object is alive.
- scoped_refptr<LifetimeFlag> my_lifetime_flag_;
-
- ContentVector available_contents_;
-
- bool received_status_;
- DownloadInterruptReason status_;
-
- base::Closure data_available_callback_;
-
- // Time of last point at which data in stream transitioned from full
- // to non-full. Nulled when a callback is sent.
- base::Time last_non_full_time_;
-
- // ** Peer information
-
- scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
-
- // How much has been removed from this class that we haven't told
- // the input about yet.
- size_t unreported_consumed_bytes_;
-
- // Only valid to access on peer_task_runner_.
- scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
-
- // Only valid to access on peer_task_runner_ if
- // |*peer_lifetime_flag_ == true|
- ByteStreamWriterImpl* peer_;
-};
-
-ByteStreamWriterImpl::ByteStreamWriterImpl(
- scoped_refptr<base::SequencedTaskRunner> task_runner,
- scoped_refptr<LifetimeFlag> lifetime_flag,
- size_t buffer_size)
- : total_buffer_size_(buffer_size),
- my_task_runner_(task_runner),
- my_lifetime_flag_(lifetime_flag),
- input_contents_size_(0),
- output_size_used_(0),
- peer_(NULL) {
- DCHECK(my_lifetime_flag_.get());
- my_lifetime_flag_->is_alive = true;
-}
-
-ByteStreamWriterImpl::~ByteStreamWriterImpl() {
- my_lifetime_flag_->is_alive = false;
-}
-
-void ByteStreamWriterImpl::SetPeer(
- ByteStreamReaderImpl* peer,
- scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
- scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
- peer_ = peer;
- peer_task_runner_ = peer_task_runner;
- peer_lifetime_flag_ = peer_lifetime_flag;
-}
-
-bool ByteStreamWriterImpl::Write(
- scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
-
- input_contents_.push_back(std::make_pair(buffer, byte_count));
- input_contents_size_ += byte_count;
-
- // Arbitrarily, we buffer to a third of the total size before sending.
- if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
- PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE);
-
- return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
-}
-
-void ByteStreamWriterImpl::Close(
- DownloadInterruptReason status) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
- PostToPeer(true, status);
-}
-
-void ByteStreamWriterImpl::RegisterCallback(
- const base::Closure& source_callback) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
- space_available_callback_ = source_callback;
-}
-
-// static
-void ByteStreamWriterImpl::UpdateWindow(
- scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
- size_t bytes_consumed) {
- // If the target object isn't alive anymore, we do nothing.
- if (!lifetime_flag->is_alive) return;
-
- target->UpdateWindowInternal(bytes_consumed);
-}
-
-void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
- DCHECK_GE(output_size_used_, bytes_consumed);
- output_size_used_ -= bytes_consumed;
-
- // Callback if we were above the limit and we're now <= to it.
- size_t total_known_size_used =
- input_contents_size_ + output_size_used_;
-
- if (total_known_size_used <= total_buffer_size_ &&
- (total_known_size_used + bytes_consumed > total_buffer_size_) &&
- !space_available_callback_.is_null())
- space_available_callback_.Run();
-}
-
-void ByteStreamWriterImpl::PostToPeer(
- bool complete, DownloadInterruptReason status) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
- // Valid contexts in which to call.
- DCHECK(complete || 0 != input_contents_size_);
-
- scoped_ptr<ContentVector> transfer_buffer(new ContentVector);
- size_t buffer_size = 0;
- if (0 != input_contents_size_) {
- transfer_buffer.reset(new ContentVector);
- transfer_buffer->swap(input_contents_);
- buffer_size = input_contents_size_;
- output_size_used_ += input_contents_size_;
- input_contents_size_ = 0;
- }
- peer_task_runner_->PostTask(
- FROM_HERE, base::Bind(
- &ByteStreamReaderImpl::TransferData,
- peer_lifetime_flag_,
- peer_,
- base::Passed(&transfer_buffer),
- buffer_size,
- complete,
- status));
-}
-
-ByteStreamReaderImpl::ByteStreamReaderImpl(
- scoped_refptr<base::SequencedTaskRunner> task_runner,
- scoped_refptr<LifetimeFlag> lifetime_flag,
- size_t buffer_size)
- : total_buffer_size_(buffer_size),
- my_task_runner_(task_runner),
- my_lifetime_flag_(lifetime_flag),
- received_status_(false),
- status_(DOWNLOAD_INTERRUPT_REASON_NONE),
- unreported_consumed_bytes_(0),
- peer_(NULL) {
- DCHECK(my_lifetime_flag_.get());
- my_lifetime_flag_->is_alive = true;
-}
-
-ByteStreamReaderImpl::~ByteStreamReaderImpl() {
- my_lifetime_flag_->is_alive = false;
-}
-
-void ByteStreamReaderImpl::SetPeer(
- ByteStreamWriterImpl* peer,
- scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
- scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
- peer_ = peer;
- peer_task_runner_ = peer_task_runner;
- peer_lifetime_flag_ = peer_lifetime_flag;
-}
-
-ByteStreamReaderImpl::StreamState
-ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
- size_t* length) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
-
- if (available_contents_.size()) {
- *data = available_contents_.front().first;
- *length = available_contents_.front().second;
- available_contents_.pop_front();
- unreported_consumed_bytes_ += *length;
-
- MaybeUpdateInput();
- return STREAM_HAS_DATA;
- }
- if (received_status_) {
- return STREAM_COMPLETE;
- }
- return STREAM_EMPTY;
-}
-
-DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
- DCHECK(received_status_);
- return status_;
-}
-
-void ByteStreamReaderImpl::RegisterCallback(
- const base::Closure& sink_callback) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
-
- data_available_callback_ = sink_callback;
-}
-
-// static
-void ByteStreamReaderImpl::TransferData(
- scoped_refptr<LifetimeFlag> object_lifetime_flag,
- ByteStreamReaderImpl* target,
- scoped_ptr<ContentVector> transfer_buffer,
- size_t buffer_size,
- bool source_complete,
- DownloadInterruptReason status) {
- // If our target is no longer alive, do nothing.
- if (!object_lifetime_flag->is_alive) return;
-
- target->TransferDataInternal(
- transfer_buffer.Pass(), buffer_size, source_complete, status);
-}
-
-void ByteStreamReaderImpl::TransferDataInternal(
- scoped_ptr<ContentVector> transfer_buffer,
- size_t buffer_size,
- bool source_complete,
- DownloadInterruptReason status) {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
-
- bool was_empty = available_contents_.empty();
-
- if (transfer_buffer.get()) {
- available_contents_.insert(available_contents_.end(),
- transfer_buffer->begin(),
- transfer_buffer->end());
- }
-
- if (source_complete) {
- received_status_ = true;
- status_ = status;
- }
-
- // Callback on transition from empty to non-empty, or
- // source complete.
- if (((was_empty && !available_contents_.empty()) ||
- source_complete) &&
- !data_available_callback_.is_null())
- data_available_callback_.Run();
-}
-
-// Decide whether or not to send the input a window update.
-// Currently we do that whenever we've got unreported consumption
-// greater than 1/3 of total size.
-void ByteStreamReaderImpl::MaybeUpdateInput() {
- DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
-
- if (unreported_consumed_bytes_ <=
- total_buffer_size_ / kFractionReadBeforeWindowUpdate)
- return;
-
- peer_task_runner_->PostTask(
- FROM_HERE, base::Bind(
- &ByteStreamWriterImpl::UpdateWindow,
- peer_lifetime_flag_,
- peer_,
- unreported_consumed_bytes_));
- unreported_consumed_bytes_ = 0;
-}
-
-} // namespace
-
-
-const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
-const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
-
-ByteStreamReader::~ByteStreamReader() { }
-
-ByteStreamWriter::~ByteStreamWriter() { }
-
-void CreateByteStream(
- scoped_refptr<base::SequencedTaskRunner> input_task_runner,
- scoped_refptr<base::SequencedTaskRunner> output_task_runner,
- size_t buffer_size,
- scoped_ptr<ByteStreamWriter>* input,
- scoped_ptr<ByteStreamReader>* output) {
- scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
- scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
-
- ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
- input_task_runner, input_flag, buffer_size);
- ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
- output_task_runner, output_flag, buffer_size);
-
- in->SetPeer(out, output_task_runner, output_flag);
- out->SetPeer(in, input_task_runner, input_flag);
- input->reset(in);
- output->reset(out);
-}
-
-} // namespace content
« no previous file with comments | « content/browser/download/byte_stream.h ('k') | content/browser/download/byte_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698