Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(475)

Side by Side Diff: net/base/upload_data_stream.cc

Issue 10910268: net: Make UploadDataStream::Read() asynchronous (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add comments about chunk upload behavior Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/base/upload_data_stream.h" 5 #include "net/base/upload_data_stream.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "base/message_loop.h"
8 #include "net/base/io_buffer.h" 9 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h" 10 #include "net/base/net_errors.h"
10 #include "net/base/upload_element_reader.h" 11 #include "net/base/upload_element_reader.h"
11 12
12 namespace net { 13 namespace net {
13 14
14 bool UploadDataStream::merge_chunks_ = true; 15 bool UploadDataStream::merge_chunks_ = true;
15 16
16 // static 17 // static
17 void UploadDataStream::ResetMergeChunks() { 18 void UploadDataStream::ResetMergeChunks() {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 if (result != OK) { 56 if (result != OK) {
56 element_readers_.clear(); 57 element_readers_.clear();
57 return result; 58 return result;
58 } 59 }
59 } 60 }
60 61
61 FinalizeInitialization(); 62 FinalizeInitialization();
62 return OK; 63 return OK;
63 } 64 }
64 65
65 int UploadDataStream::Read(IOBuffer* buf, int buf_len) { 66 int UploadDataStream::Read(IOBuffer* buf,
67 int buf_len,
68 const CompletionCallback& callback) {
69 DCHECK(initialized_successfully_);
mmenke 2012/10/15 19:54:45 DCHECK(!callback.is_null()); Probably also a good
hashimoto 2012/10/16 11:52:20 Done.
70
71 // Use fast path when all data is in memory, chunked data is also processed
72 // with ReadSync() since it requires special logic, and is always in memory.
73 if (IsInMemory() || is_chunked())
74 return ReadSync(buf, buf_len);
75
76 const bool invoked_asynchronously = false;
77 return ReadInternal(buf, buf_len, 0, invoked_asynchronously, callback, 0);
78 }
79
80 int UploadDataStream::ReadSync(IOBuffer* buf, int buf_len) {
66 DCHECK(initialized_successfully_); 81 DCHECK(initialized_successfully_);
67 82
68 // Initialize readers for newly appended chunks. 83 // Initialize readers for newly appended chunks.
69 if (is_chunked()) { 84 if (is_chunked()) {
70 const std::vector<UploadElement>& elements = *upload_data_->elements(); 85 const std::vector<UploadElement>& elements = *upload_data_->elements();
71 DCHECK_LE(element_readers_.size(), elements.size()); 86 DCHECK_LE(element_readers_.size(), elements.size());
72 87
73 for (size_t i = element_readers_.size(); i < elements.size(); ++i) { 88 for (size_t i = element_readers_.size(); i < elements.size(); ++i) {
89 // We can initialize readeres synchrnously here because only bytes can be
mmenke 2012/10/15 19:54:45 nit: readeres -> readers synchrnously -> synchron
hashimoto 2012/10/16 11:52:20 Oops, thanks. Done.
90 // appended for chunked data. We don't add each element's size to
91 // |toal_size_| because we treat the size of chunked data as zero.
mmenke 2012/10/15 19:54:45 nit: |toal_size_| -> |total_size_|
mmenke 2012/10/15 19:54:45 "because we treat the size of chunked data as zero
hashimoto 2012/10/16 11:52:20 Done.
74 const UploadElement& element = elements[i]; 92 const UploadElement& element = elements[i];
75 DCHECK_EQ(UploadElement::TYPE_BYTES, element.type()); 93 DCHECK_EQ(UploadElement::TYPE_BYTES, element.type());
76 UploadElementReader* reader = UploadElementReader::Create(element); 94 UploadElementReader* reader = UploadElementReader::Create(element);
77 95
78 const int rv = reader->InitSync(); 96 const int rv = reader->InitSync();
79 DCHECK_EQ(rv, OK); 97 DCHECK_EQ(rv, OK);
80 element_readers_.push_back(reader); 98 element_readers_.push_back(reader);
81 } 99 }
82 } 100 }
83 101
84 int bytes_copied = 0; 102 int bytes_copied = 0;
85 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) { 103 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) {
86 UploadElementReader* reader = element_readers_[element_index_]; 104 UploadElementReader* reader = element_readers_[element_index_];
87 bytes_copied += reader->ReadSync(buf->data() + bytes_copied, 105 scoped_refptr<DrainableIOBuffer> sub_buffer =
88 buf_len - bytes_copied); 106 new DrainableIOBuffer(buf, buf_len);
107 sub_buffer->SetOffset(bytes_copied);
108 bytes_copied += reader->ReadSync(sub_buffer, sub_buffer->BytesRemaining());
89 if (reader->BytesRemaining() == 0) 109 if (reader->BytesRemaining() == 0)
90 ++element_index_; 110 ++element_index_;
91 111
92 if (is_chunked() && !merge_chunks_) 112 if (is_chunked() && !merge_chunks_)
93 break; 113 break;
94 } 114 }
95 115
96 current_position_ += bytes_copied; 116 current_position_ += bytes_copied;
97 if (is_chunked() && !IsEOF() && bytes_copied == 0) 117 if (is_chunked() && !IsEOF() && bytes_copied == 0)
98 return ERR_IO_PENDING; 118 return ERR_IO_PENDING;
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
172 uint64 total_size = 0; 192 uint64 total_size = 0;
173 for (size_t i = 0; i < element_readers_.size(); ++i) { 193 for (size_t i = 0; i < element_readers_.size(); ++i) {
174 UploadElementReader* reader = element_readers_[i]; 194 UploadElementReader* reader = element_readers_[i];
175 total_size += reader->GetContentLength(); 195 total_size += reader->GetContentLength();
176 } 196 }
177 total_size_ = total_size; 197 total_size_ = total_size;
178 } 198 }
179 initialized_successfully_ = true; 199 initialized_successfully_ = true;
180 } 200 }
181 201
202 int UploadDataStream::ReadInternal(scoped_refptr<IOBuffer> buf,
203 int buf_len,
204 int bytes_copied,
mmenke 2012/10/15 19:54:45 I think this would be simpler if instead of passin
hashimoto 2012/10/16 11:52:20 Sounds good, done.
mmenke 2012/10/16 19:13:41 I don't have particularly strong feelings about th
hashimoto 2012/10/17 08:04:31 IMHO the destination buffer and the callback are t
205 bool invoked_asynchronously,
206 const CompletionCallback& callback,
207 int previous_result) {
208 DCHECK(initialized_successfully_);
209 DCHECK_GE(previous_result, 0);
210
211 // Add the last result.
212 bytes_copied += previous_result;
213
214 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) {
215 UploadElementReader* reader = element_readers_[element_index_];
216
217 if (reader->BytesRemaining() == 0) {
218 ++element_index_;
219 continue;
220 }
221 scoped_refptr<DrainableIOBuffer> sub_buffer =
222 new DrainableIOBuffer(buf, buf_len);
223 sub_buffer->SetOffset(bytes_copied);
224 const int result = reader->Read(
225 sub_buffer,
226 sub_buffer->BytesRemaining(),
227 base::Bind(base::IgnoreResult(&UploadDataStream::ReadInternal),
228 weak_ptr_factory_.GetWeakPtr(),
229 buf,
230 buf_len,
231 bytes_copied,
232 true, // invoked_asynchronously
233 callback));
234 if (result == ERR_IO_PENDING)
235 return ERR_IO_PENDING;
236
237 DCHECK_GE(bytes_copied, 0);
238 bytes_copied += result;
239 }
240 current_position_ += bytes_copied;
241
242 // When invoked asynchronously, callback is the only way to return the result.
243 if (invoked_asynchronously)
244 callback.Run(bytes_copied);
245 return bytes_copied;
246 }
247
182 } // namespace net 248 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698