Index: net/base/upload_data_stream.cc |
diff --git a/net/base/upload_data_stream.cc b/net/base/upload_data_stream.cc |
index a62528d9bf816adacef194387cd06d21178f2fab..5b57a26b1aaa9d3a40277c3152521d515aa2e14e 100644 |
--- a/net/base/upload_data_stream.cc |
+++ b/net/base/upload_data_stream.cc |
@@ -25,7 +25,7 @@ UploadDataStream::UploadDataStream(UploadData* upload_data) |
total_size_(0), |
current_position_(0), |
initialized_successfully_(false), |
- weak_ptr_factory_(this) { |
+ weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
const std::vector<UploadElement>& elements = *upload_data_->elements(); |
for (size_t i = 0; i < elements.size(); ++i) |
element_readers_.push_back(UploadElementReader::Create(elements[i])); |
@@ -62,8 +62,26 @@ int UploadDataStream::InitSync() { |
return OK; |
} |
-int UploadDataStream::Read(IOBuffer* buf, int buf_len) { |
+int UploadDataStream::Read(IOBuffer* buf, |
+ int buf_len, |
+ const CompletionCallback& callback) { |
DCHECK(initialized_successfully_); |
+ DCHECK(!callback.is_null()); |
+ DCHECK_GT(buf_len, 0); |
+ |
+ // Process chunked data with ReadSync() since it requires special logic, and |
+ // is always in memory. |
+ if (is_chunked()) |
+ return ReadSync(buf, buf_len); |
+ |
+ const bool invoked_asynchronously = false; |
+ return ReadInternal(new DrainableIOBuffer(buf, buf_len), |
+ invoked_asynchronously, callback, 0); |
+} |
+ |
+int UploadDataStream::ReadSync(IOBuffer* buf, int buf_len) { |
+ DCHECK(initialized_successfully_); |
+ DCHECK_GT(buf_len, 0); |
// Initialize readers for newly appended chunks. |
if (is_chunked()) { |
@@ -71,6 +89,9 @@ int UploadDataStream::Read(IOBuffer* buf, int buf_len) { |
DCHECK_LE(element_readers_.size(), elements.size()); |
for (size_t i = element_readers_.size(); i < elements.size(); ++i) { |
+ // We can initialize readers synchronously here because only bytes can be |
+ // appended for chunked data. We leave |total_size_| at zero, since for |
+ // chunked uploads, we may not know the total size. |
const UploadElement& element = elements[i]; |
DCHECK_EQ(UploadElement::TYPE_BYTES, element.type()); |
UploadElementReader* reader = UploadElementReader::Create(element); |
@@ -84,8 +105,10 @@ int UploadDataStream::Read(IOBuffer* buf, int buf_len) { |
int bytes_copied = 0; |
while (bytes_copied < buf_len && element_index_ < element_readers_.size()) { |
UploadElementReader* reader = element_readers_[element_index_]; |
- bytes_copied += reader->ReadSync(buf->data() + bytes_copied, |
- buf_len - bytes_copied); |
+ scoped_refptr<DrainableIOBuffer> sub_buffer = |
+ new DrainableIOBuffer(buf, buf_len); |
+ sub_buffer->SetOffset(bytes_copied); |
+ bytes_copied += reader->ReadSync(sub_buffer, sub_buffer->BytesRemaining()); |
if (reader->BytesRemaining() == 0) |
++element_index_; |
@@ -179,4 +202,48 @@ void UploadDataStream::FinalizeInitialization() { |
initialized_successfully_ = true; |
} |
+int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf, |
+ bool invoked_asynchronously, |
+ const CompletionCallback& callback, |
+ int previous_result) { |
+ DCHECK(initialized_successfully_); |
+ DCHECK_GE(previous_result, 0); |
+ |
+ // Add the last result. |
+ buf->DidConsume(previous_result); |
+ |
+ while (element_index_ < element_readers_.size()) { |
+ UploadElementReader* reader = element_readers_[element_index_]; |
+ |
+ if (reader->BytesRemaining() == 0) { |
+ ++element_index_; |
+ continue; |
+ } |
+ |
+ if (buf->BytesRemaining() == 0) |
+ break; |
+ |
+ const int result = reader->Read( |
+ buf, |
+ buf->BytesRemaining(), |
+ base::Bind(base::IgnoreResult(&UploadDataStream::ReadInternal), |
+ weak_ptr_factory_.GetWeakPtr(), |
+ buf, |
+ true, // invoked_asynchronously |
+ callback)); |
+ if (result == ERR_IO_PENDING) |
+ return ERR_IO_PENDING; |
+ DCHECK_GE(result, 0); |
+ buf->DidConsume(result); |
+ } |
+ |
+ const int bytes_copied = buf->BytesConsumed(); |
+ current_position_ += bytes_copied; |
+ |
+ // When invoked asynchronously, callback is the only way to return the result. |
+ if (invoked_asynchronously) |
+ callback.Run(bytes_copied); |
+ return bytes_copied; |
+} |
+ |
} // namespace net |