Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(200)

Side by Side Diff: net/base/upload_data_stream.cc

Issue 10910268: net: Make UploadDataStream::Read() asynchronous (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/base/upload_data_stream.h ('k') | net/base/upload_data_stream_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/base/upload_data_stream.h" 5 #include "net/base/upload_data_stream.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "net/base/io_buffer.h" 8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h" 9 #include "net/base/net_errors.h"
10 #include "net/base/upload_element_reader.h" 10 #include "net/base/upload_element_reader.h"
11 11
12 namespace net { 12 namespace net {
13 13
14 bool UploadDataStream::merge_chunks_ = true; 14 bool UploadDataStream::merge_chunks_ = true;
15 15
16 // static 16 // static
17 void UploadDataStream::ResetMergeChunks() { 17 void UploadDataStream::ResetMergeChunks() {
18 // WARNING: merge_chunks_ must match the above initializer. 18 // WARNING: merge_chunks_ must match the above initializer.
19 merge_chunks_ = true; 19 merge_chunks_ = true;
20 } 20 }
21 21
22 UploadDataStream::UploadDataStream(UploadData* upload_data) 22 UploadDataStream::UploadDataStream(UploadData* upload_data)
23 : upload_data_(upload_data), 23 : upload_data_(upload_data),
24 element_index_(0), 24 element_index_(0),
25 total_size_(0), 25 total_size_(0),
26 current_position_(0), 26 current_position_(0),
27 initialized_successfully_(false), 27 initialized_successfully_(false),
28 weak_ptr_factory_(this) { 28 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
29 const std::vector<UploadElement>& elements = *upload_data_->elements(); 29 const std::vector<UploadElement>& elements = *upload_data_->elements();
30 for (size_t i = 0; i < elements.size(); ++i) 30 for (size_t i = 0; i < elements.size(); ++i)
31 element_readers_.push_back(UploadElementReader::Create(elements[i])); 31 element_readers_.push_back(UploadElementReader::Create(elements[i]));
32 } 32 }
33 33
34 UploadDataStream::~UploadDataStream() { 34 UploadDataStream::~UploadDataStream() {
35 } 35 }
36 36
37 int UploadDataStream::Init(const CompletionCallback& callback) { 37 int UploadDataStream::Init(const CompletionCallback& callback) {
38 DCHECK(!initialized_successfully_); 38 DCHECK(!initialized_successfully_);
(...skipping 16 matching lines...) Expand all
55 if (result != OK) { 55 if (result != OK) {
56 element_readers_.clear(); 56 element_readers_.clear();
57 return result; 57 return result;
58 } 58 }
59 } 59 }
60 60
61 FinalizeInitialization(); 61 FinalizeInitialization();
62 return OK; 62 return OK;
63 } 63 }
64 64
65 int UploadDataStream::Read(IOBuffer* buf, int buf_len) { 65 int UploadDataStream::Read(IOBuffer* buf,
66 int buf_len,
67 const CompletionCallback& callback) {
66 DCHECK(initialized_successfully_); 68 DCHECK(initialized_successfully_);
69 DCHECK(!callback.is_null());
70 DCHECK_GT(buf_len, 0);
71
72 // Process chunked data with ReadSync() since it requires special logic, and
73 // is always in memory.
74 if (is_chunked())
75 return ReadSync(buf, buf_len);
76
77 const bool invoked_asynchronously = false;
78 return ReadInternal(new DrainableIOBuffer(buf, buf_len),
79 invoked_asynchronously, callback, 0);
80 }
81
82 int UploadDataStream::ReadSync(IOBuffer* buf, int buf_len) {
83 DCHECK(initialized_successfully_);
84 DCHECK_GT(buf_len, 0);
67 85
68 // Initialize readers for newly appended chunks. 86 // Initialize readers for newly appended chunks.
69 if (is_chunked()) { 87 if (is_chunked()) {
70 const std::vector<UploadElement>& elements = *upload_data_->elements(); 88 const std::vector<UploadElement>& elements = *upload_data_->elements();
71 DCHECK_LE(element_readers_.size(), elements.size()); 89 DCHECK_LE(element_readers_.size(), elements.size());
72 90
73 for (size_t i = element_readers_.size(); i < elements.size(); ++i) { 91 for (size_t i = element_readers_.size(); i < elements.size(); ++i) {
92 // We can initialize readers synchronously here because only bytes can be
93 // appended for chunked data. We leave |total_size_| at zero, since for
94 // chunked uploads, we may not know the total size.
74 const UploadElement& element = elements[i]; 95 const UploadElement& element = elements[i];
75 DCHECK_EQ(UploadElement::TYPE_BYTES, element.type()); 96 DCHECK_EQ(UploadElement::TYPE_BYTES, element.type());
76 UploadElementReader* reader = UploadElementReader::Create(element); 97 UploadElementReader* reader = UploadElementReader::Create(element);
77 98
78 const int rv = reader->InitSync(); 99 const int rv = reader->InitSync();
79 DCHECK_EQ(rv, OK); 100 DCHECK_EQ(rv, OK);
80 element_readers_.push_back(reader); 101 element_readers_.push_back(reader);
81 } 102 }
82 } 103 }
83 104
84 int bytes_copied = 0; 105 int bytes_copied = 0;
85 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) { 106 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) {
86 UploadElementReader* reader = element_readers_[element_index_]; 107 UploadElementReader* reader = element_readers_[element_index_];
87 bytes_copied += reader->ReadSync(buf->data() + bytes_copied, 108 scoped_refptr<DrainableIOBuffer> sub_buffer =
88 buf_len - bytes_copied); 109 new DrainableIOBuffer(buf, buf_len);
110 sub_buffer->SetOffset(bytes_copied);
111 bytes_copied += reader->ReadSync(sub_buffer, sub_buffer->BytesRemaining());
89 if (reader->BytesRemaining() == 0) 112 if (reader->BytesRemaining() == 0)
90 ++element_index_; 113 ++element_index_;
91 114
92 if (is_chunked() && !merge_chunks_) 115 if (is_chunked() && !merge_chunks_)
93 break; 116 break;
94 } 117 }
95 118
96 current_position_ += bytes_copied; 119 current_position_ += bytes_copied;
97 if (is_chunked() && !IsEOF() && bytes_copied == 0) 120 if (is_chunked() && !IsEOF() && bytes_copied == 0)
98 return ERR_IO_PENDING; 121 return ERR_IO_PENDING;
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
172 uint64 total_size = 0; 195 uint64 total_size = 0;
173 for (size_t i = 0; i < element_readers_.size(); ++i) { 196 for (size_t i = 0; i < element_readers_.size(); ++i) {
174 UploadElementReader* reader = element_readers_[i]; 197 UploadElementReader* reader = element_readers_[i];
175 total_size += reader->GetContentLength(); 198 total_size += reader->GetContentLength();
176 } 199 }
177 total_size_ = total_size; 200 total_size_ = total_size;
178 } 201 }
179 initialized_successfully_ = true; 202 initialized_successfully_ = true;
180 } 203 }
181 204
205 int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf,
206 bool invoked_asynchronously,
207 const CompletionCallback& callback,
208 int previous_result) {
209 DCHECK(initialized_successfully_);
210 DCHECK_GE(previous_result, 0);
211
212 // Add the last result.
213 buf->DidConsume(previous_result);
214
215 while (element_index_ < element_readers_.size()) {
216 UploadElementReader* reader = element_readers_[element_index_];
217
218 if (reader->BytesRemaining() == 0) {
219 ++element_index_;
220 continue;
221 }
222
223 if (buf->BytesRemaining() == 0)
224 break;
225
226 const int result = reader->Read(
227 buf,
228 buf->BytesRemaining(),
229 base::Bind(base::IgnoreResult(&UploadDataStream::ReadInternal),
230 weak_ptr_factory_.GetWeakPtr(),
231 buf,
232 true, // invoked_asynchronously
233 callback));
234 if (result == ERR_IO_PENDING)
235 return ERR_IO_PENDING;
236 DCHECK_GE(result, 0);
237 buf->DidConsume(result);
238 }
239
240 const int bytes_copied = buf->BytesConsumed();
241 current_position_ += bytes_copied;
242
243 // When invoked asynchronously, callback is the only way to return the result.
244 if (invoked_asynchronously)
245 callback.Run(bytes_copied);
246 return bytes_copied;
247 }
248
182 } // namespace net 249 } // namespace net
OLDNEW
« no previous file with comments | « net/base/upload_data_stream.h ('k') | net/base/upload_data_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698