OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/base/upload_data_stream.h" | 5 #include "net/base/upload_data_stream.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "net/base/io_buffer.h" | 8 #include "net/base/io_buffer.h" |
9 #include "net/base/net_errors.h" | 9 #include "net/base/net_errors.h" |
10 #include "net/base/upload_element_reader.h" | 10 #include "net/base/upload_element_reader.h" |
11 | 11 |
12 namespace net { | 12 namespace net { |
13 | 13 |
14 bool UploadDataStream::merge_chunks_ = true; | 14 bool UploadDataStream::merge_chunks_ = true; |
15 | 15 |
16 // static | 16 // static |
17 void UploadDataStream::ResetMergeChunks() { | 17 void UploadDataStream::ResetMergeChunks() { |
18 // WARNING: merge_chunks_ must match the above initializer. | 18 // WARNING: merge_chunks_ must match the above initializer. |
19 merge_chunks_ = true; | 19 merge_chunks_ = true; |
20 } | 20 } |
21 | 21 |
22 UploadDataStream::UploadDataStream(UploadData* upload_data) | 22 UploadDataStream::UploadDataStream(UploadData* upload_data) |
23 : upload_data_(upload_data), | 23 : upload_data_(upload_data), |
24 element_index_(0), | 24 element_index_(0), |
25 total_size_(0), | 25 total_size_(0), |
26 current_position_(0), | 26 current_position_(0), |
27 initialized_successfully_(false), | 27 initialized_successfully_(false), |
28 weak_ptr_factory_(this) { | 28 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { |
29 const std::vector<UploadElement>& elements = *upload_data_->elements(); | 29 const std::vector<UploadElement>& elements = *upload_data_->elements(); |
30 for (size_t i = 0; i < elements.size(); ++i) | 30 for (size_t i = 0; i < elements.size(); ++i) |
31 element_readers_.push_back(UploadElementReader::Create(elements[i])); | 31 element_readers_.push_back(UploadElementReader::Create(elements[i])); |
32 } | 32 } |
33 | 33 |
34 UploadDataStream::~UploadDataStream() { | 34 UploadDataStream::~UploadDataStream() { |
35 } | 35 } |
36 | 36 |
37 int UploadDataStream::Init(const CompletionCallback& callback) { | 37 int UploadDataStream::Init(const CompletionCallback& callback) { |
38 DCHECK(!initialized_successfully_); | 38 DCHECK(!initialized_successfully_); |
(...skipping 16 matching lines...) Expand all Loading... |
55 if (result != OK) { | 55 if (result != OK) { |
56 element_readers_.clear(); | 56 element_readers_.clear(); |
57 return result; | 57 return result; |
58 } | 58 } |
59 } | 59 } |
60 | 60 |
61 FinalizeInitialization(); | 61 FinalizeInitialization(); |
62 return OK; | 62 return OK; |
63 } | 63 } |
64 | 64 |
65 int UploadDataStream::Read(IOBuffer* buf, int buf_len) { | 65 int UploadDataStream::Read(IOBuffer* buf, |
| 66 int buf_len, |
| 67 const CompletionCallback& callback) { |
66 DCHECK(initialized_successfully_); | 68 DCHECK(initialized_successfully_); |
| 69 DCHECK(!callback.is_null()); |
| 70 DCHECK_GT(buf_len, 0); |
| 71 |
| 72 // Process chunked data with ReadSync() since it requires special logic, and |
| 73 // is always in memory. |
| 74 if (is_chunked()) |
| 75 return ReadSync(buf, buf_len); |
| 76 |
| 77 const bool invoked_asynchronously = false; |
| 78 return ReadInternal(new DrainableIOBuffer(buf, buf_len), |
| 79 invoked_asynchronously, callback, 0); |
| 80 } |
| 81 |
| 82 int UploadDataStream::ReadSync(IOBuffer* buf, int buf_len) { |
| 83 DCHECK(initialized_successfully_); |
| 84 DCHECK_GT(buf_len, 0); |
67 | 85 |
68 // Initialize readers for newly appended chunks. | 86 // Initialize readers for newly appended chunks. |
69 if (is_chunked()) { | 87 if (is_chunked()) { |
70 const std::vector<UploadElement>& elements = *upload_data_->elements(); | 88 const std::vector<UploadElement>& elements = *upload_data_->elements(); |
71 DCHECK_LE(element_readers_.size(), elements.size()); | 89 DCHECK_LE(element_readers_.size(), elements.size()); |
72 | 90 |
73 for (size_t i = element_readers_.size(); i < elements.size(); ++i) { | 91 for (size_t i = element_readers_.size(); i < elements.size(); ++i) { |
| 92 // We can initialize readers synchronously here because only bytes can be |
| 93 // appended for chunked data. We leave |total_size_| at zero, since for |
| 94 // chunked uploads, we may not know the total size. |
74 const UploadElement& element = elements[i]; | 95 const UploadElement& element = elements[i]; |
75 DCHECK_EQ(UploadElement::TYPE_BYTES, element.type()); | 96 DCHECK_EQ(UploadElement::TYPE_BYTES, element.type()); |
76 UploadElementReader* reader = UploadElementReader::Create(element); | 97 UploadElementReader* reader = UploadElementReader::Create(element); |
77 | 98 |
78 const int rv = reader->InitSync(); | 99 const int rv = reader->InitSync(); |
79 DCHECK_EQ(rv, OK); | 100 DCHECK_EQ(rv, OK); |
80 element_readers_.push_back(reader); | 101 element_readers_.push_back(reader); |
81 } | 102 } |
82 } | 103 } |
83 | 104 |
84 int bytes_copied = 0; | 105 int bytes_copied = 0; |
85 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) { | 106 while (bytes_copied < buf_len && element_index_ < element_readers_.size()) { |
86 UploadElementReader* reader = element_readers_[element_index_]; | 107 UploadElementReader* reader = element_readers_[element_index_]; |
87 bytes_copied += reader->ReadSync(buf->data() + bytes_copied, | 108 scoped_refptr<DrainableIOBuffer> sub_buffer = |
88 buf_len - bytes_copied); | 109 new DrainableIOBuffer(buf, buf_len); |
| 110 sub_buffer->SetOffset(bytes_copied); |
| 111 bytes_copied += reader->ReadSync(sub_buffer, sub_buffer->BytesRemaining()); |
89 if (reader->BytesRemaining() == 0) | 112 if (reader->BytesRemaining() == 0) |
90 ++element_index_; | 113 ++element_index_; |
91 | 114 |
92 if (is_chunked() && !merge_chunks_) | 115 if (is_chunked() && !merge_chunks_) |
93 break; | 116 break; |
94 } | 117 } |
95 | 118 |
96 current_position_ += bytes_copied; | 119 current_position_ += bytes_copied; |
97 if (is_chunked() && !IsEOF() && bytes_copied == 0) | 120 if (is_chunked() && !IsEOF() && bytes_copied == 0) |
98 return ERR_IO_PENDING; | 121 return ERR_IO_PENDING; |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
172 uint64 total_size = 0; | 195 uint64 total_size = 0; |
173 for (size_t i = 0; i < element_readers_.size(); ++i) { | 196 for (size_t i = 0; i < element_readers_.size(); ++i) { |
174 UploadElementReader* reader = element_readers_[i]; | 197 UploadElementReader* reader = element_readers_[i]; |
175 total_size += reader->GetContentLength(); | 198 total_size += reader->GetContentLength(); |
176 } | 199 } |
177 total_size_ = total_size; | 200 total_size_ = total_size; |
178 } | 201 } |
179 initialized_successfully_ = true; | 202 initialized_successfully_ = true; |
180 } | 203 } |
181 | 204 |
| 205 int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf, |
| 206 bool invoked_asynchronously, |
| 207 const CompletionCallback& callback, |
| 208 int previous_result) { |
| 209 DCHECK(initialized_successfully_); |
| 210 DCHECK_GE(previous_result, 0); |
| 211 |
| 212 // Add the last result. |
| 213 buf->DidConsume(previous_result); |
| 214 |
| 215 while (element_index_ < element_readers_.size()) { |
| 216 UploadElementReader* reader = element_readers_[element_index_]; |
| 217 |
| 218 if (reader->BytesRemaining() == 0) { |
| 219 ++element_index_; |
| 220 continue; |
| 221 } |
| 222 |
| 223 if (buf->BytesRemaining() == 0) |
| 224 break; |
| 225 |
| 226 const int result = reader->Read( |
| 227 buf, |
| 228 buf->BytesRemaining(), |
| 229 base::Bind(base::IgnoreResult(&UploadDataStream::ReadInternal), |
| 230 weak_ptr_factory_.GetWeakPtr(), |
| 231 buf, |
| 232 true, // invoked_asynchronously |
| 233 callback)); |
| 234 if (result == ERR_IO_PENDING) |
| 235 return ERR_IO_PENDING; |
| 236 DCHECK_GE(result, 0); |
| 237 buf->DidConsume(result); |
| 238 } |
| 239 |
| 240 const int bytes_copied = buf->BytesConsumed(); |
| 241 current_position_ += bytes_copied; |
| 242 |
| 243 // When invoked asynchronously, callback is the only way to return the result. |
| 244 if (invoked_asynchronously) |
| 245 callback.Run(bytes_copied); |
| 246 return bytes_copied; |
| 247 } |
| 248 |
182 } // namespace net | 249 } // namespace net |
OLD | NEW |