Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Side by Side Diff: net/base/upload_file_element_reader.cc

Issue 10910268: net: Make UploadDataStream::Read() asynchronous (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add comments about chunk upload behavior Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/base/upload_file_element_reader.h" 5 #include "net/base/upload_file_element_reader.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/file_util.h" 8 #include "base/file_util.h"
9 #include "base/location.h" 9 #include "base/location.h"
10 #include "base/threading/thread_restrictions.h" 10 #include "base/threading/thread_restrictions.h"
11 #include "base/threading/worker_pool.h" 11 #include "base/threading/worker_pool.h"
12 #include "net/base/file_stream.h" 12 #include "net/base/file_stream.h"
13 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h" 14 #include "net/base/net_errors.h"
14 15
15 namespace net { 16 namespace net {
16 17
17 namespace { 18 namespace {
18 19
19 // In tests, this value is used to override the return value of 20 // In tests, this value is used to override the return value of
20 // UploadFileElementReader::GetContentLength() when set to non-zero. 21 // UploadFileElementReader::GetContentLength() when set to non-zero.
21 uint64 overriding_content_length = 0; 22 uint64 overriding_content_length = 0;
22 23
23 // This method is used to implement Init(). 24 // This function is used to implement Init().
24 void InitInternal(const FilePath& path, 25 void InitInternal(const FilePath& path,
25 uint64 range_offset, 26 uint64 range_offset,
26 uint64 range_length, 27 uint64 range_length,
27 const base::Time& expected_modification_time, 28 const base::Time& expected_modification_time,
28 scoped_ptr<FileStream>* out_file_stream, 29 scoped_ptr<FileStream>* out_file_stream,
29 uint64* out_content_length, 30 uint64* out_content_length,
30 int* out_result) { 31 int* out_result) {
31 scoped_ptr<FileStream> file_stream(new FileStream(NULL)); 32 scoped_ptr<FileStream> file_stream(new FileStream(NULL));
32 int64 rv = file_stream->OpenSync( 33 int64 rv = file_stream->OpenSync(
33 path, base::PLATFORM_FILE_OPEN | base::PLATFORM_FILE_READ); 34 path, base::PLATFORM_FILE_OPEN | base::PLATFORM_FILE_READ);
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
65 if (file_util::GetFileInfo(path, &info) && 66 if (file_util::GetFileInfo(path, &info) &&
66 expected_modification_time.ToTimeT() != info.last_modified.ToTimeT()) { 67 expected_modification_time.ToTimeT() != info.last_modified.ToTimeT()) {
67 *out_result = ERR_UPLOAD_FILE_CHANGED; 68 *out_result = ERR_UPLOAD_FILE_CHANGED;
68 return; 69 return;
69 } 70 }
70 } 71 }
71 72
72 *out_result = OK; 73 *out_result = OK;
73 } 74 }
74 75
76 // This function is used to implement Read().
77 void ReadInternal(scoped_refptr<IOBuffer> buf,
78 int buf_length,
79 uint64 bytes_remaining,
80 FileStream* file_stream,
81 int* result) {
82 DCHECK_LT(0, buf_length);
83
84 const uint64 num_bytes_to_read =
85 std::min(bytes_remaining, static_cast<uint64>(buf_length));
86
87 if (num_bytes_to_read > 0) {
88 int num_bytes_consumed = 0;
89 // file_stream is NULL if the target file is
mmenke 2012/10/15 19:54:45 nit: |file_stream|.
hashimoto 2012/10/16 11:52:20 I don't think pipes are necessary here.
90 // missing or not readable.
91 if (file_stream) {
92 num_bytes_consumed = file_stream->ReadSync(buf->data(),
93 num_bytes_to_read);
94 }
95 if (num_bytes_consumed <= 0) {
96 // If there's less data to read than we initially observed, then
97 // pad with zero. Otherwise the server will hang waiting for the
98 // rest of the data.
99 memset(buf->data(), 0, num_bytes_to_read);
100 }
101 }
102 *result = num_bytes_to_read;
103 }
104
75 } // namespace 105 } // namespace
76 106
77 UploadFileElementReader::UploadFileElementReader( 107 UploadFileElementReader::UploadFileElementReader(
78 const FilePath& path, 108 const FilePath& path,
79 uint64 range_offset, 109 uint64 range_offset,
80 uint64 range_length, 110 uint64 range_length,
81 const base::Time& expected_modification_time) 111 const base::Time& expected_modification_time)
82 : path_(path), 112 : path_(path),
83 range_offset_(range_offset), 113 range_offset_(range_offset),
84 range_length_(range_length), 114 range_length_(range_length),
85 expected_modification_time_(expected_modification_time), 115 expected_modification_time_(expected_modification_time),
86 content_length_(0), 116 content_length_(0),
87 bytes_remaining_(0), 117 bytes_remaining_(0),
88 weak_ptr_factory_(this) { 118 weak_ptr_factory_(this) {
89 } 119 }
90 120
91 UploadFileElementReader::~UploadFileElementReader() { 121 UploadFileElementReader::~UploadFileElementReader() {
92 if (file_stream_.get()) { 122 if (file_stream_.get()) {
93 base::WorkerPool::PostTask(FROM_HERE, 123 base::WorkerPool::PostTask(FROM_HERE,
94 base::Bind(&base::DeletePointer<FileStream>, 124 base::Bind(&base::DeletePointer<FileStream>,
95 file_stream_.release()), 125 file_stream_.release()),
mmenke 2012/10/15 21:34:01 Are we leaking stuff here? It looks like FileStre
hashimoto 2012/10/16 11:52:20 UploadFileElementReader is using the constructor t
mmenke 2012/10/16 19:13:41 Ahhh...right. For some reason, I was thinking bot
96 true /* task_is_slow */); 126 true /* task_is_slow */);
97 } 127 }
98 } 128 }
99 129
100 int UploadFileElementReader::Init(const CompletionCallback& callback) { 130 int UploadFileElementReader::Init(const CompletionCallback& callback) {
101 scoped_ptr<FileStream>* file_stream = new scoped_ptr<FileStream>; 131 scoped_ptr<FileStream>* file_stream = new scoped_ptr<FileStream>;
102 uint64* content_length = new uint64; 132 uint64* content_length = new uint64;
103 int* result = new int; 133 int* result = new int;
104 const bool posted = base::WorkerPool::PostTaskAndReply( 134 const bool posted = base::WorkerPool::PostTaskAndReply(
105 FROM_HERE, 135 FROM_HERE,
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 uint64 UploadFileElementReader::GetContentLength() const { 168 uint64 UploadFileElementReader::GetContentLength() const {
139 if (overriding_content_length) 169 if (overriding_content_length)
140 return overriding_content_length; 170 return overriding_content_length;
141 return content_length_; 171 return content_length_;
142 } 172 }
143 173
144 uint64 UploadFileElementReader::BytesRemaining() const { 174 uint64 UploadFileElementReader::BytesRemaining() const {
145 return bytes_remaining_; 175 return bytes_remaining_;
146 } 176 }
147 177
148 int UploadFileElementReader::ReadSync(char* buf, int buf_length) { 178 int UploadFileElementReader::Read(IOBuffer* buf,
179 int buf_length,
180 const CompletionCallback& callback) {
mmenke 2012/10/15 19:54:45 DCHECK(!callback.is_null());
hashimoto 2012/10/16 11:52:20 Done.
181 if (BytesRemaining() == 0)
182 return 0;
183
184 // Save the value of file_stream_.get() before base::Passed() invalidates it.
185 FileStream* file_stream_ptr = file_stream_.get();
186 int* result = new int;
187 // Pass the ownership of file_stream_ to the worker pool to safely perform
188 // operation even when |this| is destructed before the read completes.
189 const bool posted = base::WorkerPool::PostTaskAndReply(
190 FROM_HERE,
191 base::Bind(&ReadInternal,
192 scoped_refptr<IOBuffer>(buf),
193 buf_length,
194 BytesRemaining(),
195 file_stream_ptr,
196 result),
197 base::Bind(&UploadFileElementReader::OnReadCompleted,
198 weak_ptr_factory_.GetWeakPtr(),
199 base::Passed(&file_stream_),
200 base::Owned(result),
201 callback),
202 true /* task_is_slow */);
203 DCHECK(posted);
204 return ERR_IO_PENDING;
205 }
206
207 int UploadFileElementReader::ReadSync(IOBuffer* buf, int buf_length) {
149 // Temporarily allow until fix: http://crbug.com/72001. 208 // Temporarily allow until fix: http://crbug.com/72001.
150 base::ThreadRestrictions::ScopedAllowIO allow_io; 209 base::ThreadRestrictions::ScopedAllowIO allow_io;
151 DCHECK_LT(0, buf_length); 210 int result = 0;
152 211 ReadInternal(buf, buf_length, BytesRemaining(), file_stream_.get(), &result);
153 const uint64 num_bytes_to_read = 212 OnReadCompleted(file_stream_.Pass(), &result, CompletionCallback());
154 static_cast<int>(std::min(BytesRemaining(), 213 return result;
155 static_cast<uint64>(buf_length)));
156 if (num_bytes_to_read > 0) {
157 int num_bytes_consumed = 0;
158 // file_stream_ is NULL if the target file is
159 // missing or not readable.
160 if (file_stream_.get()) {
161 num_bytes_consumed =
162 file_stream_->ReadSync(buf, num_bytes_to_read);
163 }
164 if (num_bytes_consumed <= 0) {
165 // If there's less data to read than we initially observed, then
166 // pad with zero. Otherwise the server will hang waiting for the
167 // rest of the data.
168 memset(buf, 0, num_bytes_to_read);
169 }
170 }
171 DCHECK_GE(bytes_remaining_, num_bytes_to_read);
172 bytes_remaining_ -= num_bytes_to_read;
173 return num_bytes_to_read;
174 } 214 }
175 215
176 void UploadFileElementReader::OnInitCompleted( 216 void UploadFileElementReader::OnInitCompleted(
177 scoped_ptr<FileStream>* file_stream, 217 scoped_ptr<FileStream>* file_stream,
178 uint64* content_length, 218 uint64* content_length,
179 int* result, 219 int* result,
180 const CompletionCallback& callback) { 220 const CompletionCallback& callback) {
181 file_stream_.swap(*file_stream); 221 file_stream_.swap(*file_stream);
182 content_length_ = *content_length; 222 content_length_ = *content_length;
183 bytes_remaining_ = GetContentLength(); 223 bytes_remaining_ = GetContentLength();
184 if (!callback.is_null()) 224 if (!callback.is_null())
185 callback.Run(*result); 225 callback.Run(*result);
186 } 226 }
187 227
228 void UploadFileElementReader::OnReadCompleted(
229 scoped_ptr<FileStream> file_stream,
230 int* result,
231 const CompletionCallback& callback) {
232 file_stream_.swap(file_stream);
233 DCHECK_GE(static_cast<int>(bytes_remaining_), *result);
234 bytes_remaining_ -= *result;
235 if (!callback.is_null())
236 callback.Run(*result);
237 }
238
188 UploadFileElementReader::ScopedOverridingContentLengthForTests:: 239 UploadFileElementReader::ScopedOverridingContentLengthForTests::
189 ScopedOverridingContentLengthForTests(uint64 value) { 240 ScopedOverridingContentLengthForTests(uint64 value) {
190 overriding_content_length = value; 241 overriding_content_length = value;
191 } 242 }
192 243
193 UploadFileElementReader::ScopedOverridingContentLengthForTests:: 244 UploadFileElementReader::ScopedOverridingContentLengthForTests::
194 ~ScopedOverridingContentLengthForTests() { 245 ~ScopedOverridingContentLengthForTests() {
195 overriding_content_length = 0; 246 overriding_content_length = 0;
196 } 247 }
197 248
198 } // namespace net 249 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698