OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/browser/byte_stream.h" | 5 #include "content/browser/byte_stream.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/location.h" | 8 #include "base/location.h" |
9 #include "base/memory/ref_counted.h" | 9 #include "base/memory/ref_counted.h" |
10 #include "base/memory/weak_ptr.h" | 10 #include "base/memory/weak_ptr.h" |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
49 | 49 |
50 // Must be called before any operations are performed. | 50 // Must be called before any operations are performed. |
51 void SetPeer(ByteStreamReaderImpl* peer, | 51 void SetPeer(ByteStreamReaderImpl* peer, |
52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
54 | 54 |
55 // Overridden from ByteStreamWriter. | 55 // Overridden from ByteStreamWriter. |
56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
57 size_t byte_count) OVERRIDE; | 57 size_t byte_count) OVERRIDE; |
58 virtual void Flush() OVERRIDE; | 58 virtual void Flush() OVERRIDE; |
59 virtual void Close(DownloadInterruptReason status) OVERRIDE; | 59 virtual void Close(int status) OVERRIDE; |
60 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | 60 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; |
61 | 61 |
62 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | 62 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
63 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | 63 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, |
64 ByteStreamWriterImpl* target, | 64 ByteStreamWriterImpl* target, |
65 size_t bytes_consumed); | 65 size_t bytes_consumed); |
66 | 66 |
67 private: | 67 private: |
68 // Called from UpdateWindow when object existence has been validated. | 68 // Called from UpdateWindow when object existence has been validated. |
69 void UpdateWindowInternal(size_t bytes_consumed); | 69 void UpdateWindowInternal(size_t bytes_consumed); |
70 | 70 |
71 void PostToPeer(bool complete, DownloadInterruptReason status); | 71 void PostToPeer(bool complete, int status); |
72 | 72 |
73 const size_t total_buffer_size_; | 73 const size_t total_buffer_size_; |
74 | 74 |
75 // All data objects in this class are only valid to access on | 75 // All data objects in this class are only valid to access on |
76 // this task runner except as otherwise noted. | 76 // this task runner except as otherwise noted. |
77 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | 77 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
78 | 78 |
79 // True while this object is alive. | 79 // True while this object is alive. |
80 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | 80 scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
81 | 81 |
(...skipping 25 matching lines...) Expand all Loading... |
107 virtual ~ByteStreamReaderImpl(); | 107 virtual ~ByteStreamReaderImpl(); |
108 | 108 |
109 // Must be called before any operations are performed. | 109 // Must be called before any operations are performed. |
110 void SetPeer(ByteStreamWriterImpl* peer, | 110 void SetPeer(ByteStreamWriterImpl* peer, |
111 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 111 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
112 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 112 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
113 | 113 |
114 // Overridden from ByteStreamReader. | 114 // Overridden from ByteStreamReader. |
115 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | 115 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, |
116 size_t* length) OVERRIDE; | 116 size_t* length) OVERRIDE; |
117 virtual DownloadInterruptReason GetStatus() const OVERRIDE; | 117 virtual int GetStatus() const OVERRIDE; |
118 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | 118 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; |
119 | 119 |
120 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and | 120 // PostTask target from |ByteStreamWriterImpl::Write| and |
121 // |ByteStreamWriterImpl::Close|. | 121 // |ByteStreamWriterImpl::Close|. |
122 // Receive data from our peer. | 122 // Receive data from our peer. |
123 // static because it may be called after the object it is targeting | 123 // static because it may be called after the object it is targeting |
124 // has been destroyed. It may not access |*target| | 124 // has been destroyed. It may not access |*target| |
125 // if |*object_lifetime_flag| is false. | 125 // if |*object_lifetime_flag| is false. |
126 static void TransferData( | 126 static void TransferData( |
127 scoped_refptr<LifetimeFlag> object_lifetime_flag, | 127 scoped_refptr<LifetimeFlag> object_lifetime_flag, |
128 ByteStreamReaderImpl* target, | 128 ByteStreamReaderImpl* target, |
129 scoped_ptr<ContentVector> transfer_buffer, | 129 scoped_ptr<ContentVector> transfer_buffer, |
130 size_t transfer_buffer_bytes, | 130 size_t transfer_buffer_bytes, |
131 bool source_complete, | 131 bool source_complete, |
132 DownloadInterruptReason status); | 132 int status); |
133 | 133 |
134 private: | 134 private: |
135 // Called from TransferData once object existence has been validated. | 135 // Called from TransferData once object existence has been validated. |
136 void TransferDataInternal( | 136 void TransferDataInternal( |
137 scoped_ptr<ContentVector> transfer_buffer, | 137 scoped_ptr<ContentVector> transfer_buffer, |
138 size_t transfer_buffer_bytes, | 138 size_t transfer_buffer_bytes, |
139 bool source_complete, | 139 bool source_complete, |
140 DownloadInterruptReason status); | 140 int status); |
141 | 141 |
142 void MaybeUpdateInput(); | 142 void MaybeUpdateInput(); |
143 | 143 |
144 const size_t total_buffer_size_; | 144 const size_t total_buffer_size_; |
145 | 145 |
146 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | 146 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
147 | 147 |
148 // True while this object is alive. | 148 // True while this object is alive. |
149 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | 149 scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
150 | 150 |
151 ContentVector available_contents_; | 151 ContentVector available_contents_; |
152 | 152 |
153 bool received_status_; | 153 bool received_status_; |
154 DownloadInterruptReason status_; | 154 int status_; |
155 | 155 |
156 base::Closure data_available_callback_; | 156 base::Closure data_available_callback_; |
157 | 157 |
158 // Time of last point at which data in stream transitioned from full | 158 // Time of last point at which data in stream transitioned from full |
159 // to non-full. Nulled when a callback is sent. | 159 // to non-full. Nulled when a callback is sent. |
160 base::Time last_non_full_time_; | 160 base::Time last_non_full_time_; |
161 | 161 |
162 // ** Peer information | 162 // ** Peer information |
163 | 163 |
164 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | 164 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
204 | 204 |
205 bool ByteStreamWriterImpl::Write( | 205 bool ByteStreamWriterImpl::Write( |
206 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | 206 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
207 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 207 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
208 | 208 |
209 input_contents_.push_back(std::make_pair(buffer, byte_count)); | 209 input_contents_.push_back(std::make_pair(buffer, byte_count)); |
210 input_contents_size_ += byte_count; | 210 input_contents_size_ += byte_count; |
211 | 211 |
212 // Arbitrarily, we buffer to a third of the total size before sending. | 212 // Arbitrarily, we buffer to a third of the total size before sending. |
213 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | 213 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) |
214 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | 214 PostToPeer(false, 0); |
215 | 215 |
216 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | 216 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
217 } | 217 } |
218 | 218 |
219 void ByteStreamWriterImpl::Flush() { | 219 void ByteStreamWriterImpl::Flush() { |
220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
221 if (input_contents_size_ > 0) | 221 if (input_contents_size_ > 0) |
222 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | 222 PostToPeer(false, 0); |
223 } | 223 } |
224 | 224 |
225 void ByteStreamWriterImpl::Close( | 225 void ByteStreamWriterImpl::Close(int status) { |
226 DownloadInterruptReason status) { | |
227 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
228 PostToPeer(true, status); | 227 PostToPeer(true, status); |
229 } | 228 } |
230 | 229 |
231 void ByteStreamWriterImpl::RegisterCallback( | 230 void ByteStreamWriterImpl::RegisterCallback( |
232 const base::Closure& source_callback) { | 231 const base::Closure& source_callback) { |
233 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 232 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
234 space_available_callback_ = source_callback; | 233 space_available_callback_ = source_callback; |
235 } | 234 } |
236 | 235 |
(...skipping 15 matching lines...) Expand all Loading... |
252 // Callback if we were above the limit and we're now <= to it. | 251 // Callback if we were above the limit and we're now <= to it. |
253 size_t total_known_size_used = | 252 size_t total_known_size_used = |
254 input_contents_size_ + output_size_used_; | 253 input_contents_size_ + output_size_used_; |
255 | 254 |
256 if (total_known_size_used <= total_buffer_size_ && | 255 if (total_known_size_used <= total_buffer_size_ && |
257 (total_known_size_used + bytes_consumed > total_buffer_size_) && | 256 (total_known_size_used + bytes_consumed > total_buffer_size_) && |
258 !space_available_callback_.is_null()) | 257 !space_available_callback_.is_null()) |
259 space_available_callback_.Run(); | 258 space_available_callback_.Run(); |
260 } | 259 } |
261 | 260 |
262 void ByteStreamWriterImpl::PostToPeer( | 261 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { |
263 bool complete, DownloadInterruptReason status) { | |
264 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 262 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
265 // Valid contexts in which to call. | 263 // Valid contexts in which to call. |
266 DCHECK(complete || 0 != input_contents_size_); | 264 DCHECK(complete || 0 != input_contents_size_); |
267 | 265 |
268 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); | 266 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); |
269 size_t buffer_size = 0; | 267 size_t buffer_size = 0; |
270 if (0 != input_contents_size_) { | 268 if (0 != input_contents_size_) { |
271 transfer_buffer.reset(new ContentVector); | 269 transfer_buffer.reset(new ContentVector); |
272 transfer_buffer->swap(input_contents_); | 270 transfer_buffer->swap(input_contents_); |
273 buffer_size = input_contents_size_; | 271 buffer_size = input_contents_size_; |
(...skipping 12 matching lines...) Expand all Loading... |
286 } | 284 } |
287 | 285 |
288 ByteStreamReaderImpl::ByteStreamReaderImpl( | 286 ByteStreamReaderImpl::ByteStreamReaderImpl( |
289 scoped_refptr<base::SequencedTaskRunner> task_runner, | 287 scoped_refptr<base::SequencedTaskRunner> task_runner, |
290 scoped_refptr<LifetimeFlag> lifetime_flag, | 288 scoped_refptr<LifetimeFlag> lifetime_flag, |
291 size_t buffer_size) | 289 size_t buffer_size) |
292 : total_buffer_size_(buffer_size), | 290 : total_buffer_size_(buffer_size), |
293 my_task_runner_(task_runner), | 291 my_task_runner_(task_runner), |
294 my_lifetime_flag_(lifetime_flag), | 292 my_lifetime_flag_(lifetime_flag), |
295 received_status_(false), | 293 received_status_(false), |
296 status_(DOWNLOAD_INTERRUPT_REASON_NONE), | 294 status_(0), |
297 unreported_consumed_bytes_(0), | 295 unreported_consumed_bytes_(0), |
298 peer_(NULL) { | 296 peer_(NULL) { |
299 DCHECK(my_lifetime_flag_.get()); | 297 DCHECK(my_lifetime_flag_.get()); |
300 my_lifetime_flag_->is_alive = true; | 298 my_lifetime_flag_->is_alive = true; |
301 } | 299 } |
302 | 300 |
303 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | 301 ByteStreamReaderImpl::~ByteStreamReaderImpl() { |
304 my_lifetime_flag_->is_alive = false; | 302 my_lifetime_flag_->is_alive = false; |
305 } | 303 } |
306 | 304 |
(...skipping 19 matching lines...) Expand all Loading... |
326 | 324 |
327 MaybeUpdateInput(); | 325 MaybeUpdateInput(); |
328 return STREAM_HAS_DATA; | 326 return STREAM_HAS_DATA; |
329 } | 327 } |
330 if (received_status_) { | 328 if (received_status_) { |
331 return STREAM_COMPLETE; | 329 return STREAM_COMPLETE; |
332 } | 330 } |
333 return STREAM_EMPTY; | 331 return STREAM_EMPTY; |
334 } | 332 } |
335 | 333 |
336 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { | 334 int ByteStreamReaderImpl::GetStatus() const { |
337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 335 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
338 DCHECK(received_status_); | 336 DCHECK(received_status_); |
339 return status_; | 337 return status_; |
340 } | 338 } |
341 | 339 |
342 void ByteStreamReaderImpl::RegisterCallback( | 340 void ByteStreamReaderImpl::RegisterCallback( |
343 const base::Closure& sink_callback) { | 341 const base::Closure& sink_callback) { |
344 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 342 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
345 | 343 |
346 data_available_callback_ = sink_callback; | 344 data_available_callback_ = sink_callback; |
347 } | 345 } |
348 | 346 |
349 // static | 347 // static |
350 void ByteStreamReaderImpl::TransferData( | 348 void ByteStreamReaderImpl::TransferData( |
351 scoped_refptr<LifetimeFlag> object_lifetime_flag, | 349 scoped_refptr<LifetimeFlag> object_lifetime_flag, |
352 ByteStreamReaderImpl* target, | 350 ByteStreamReaderImpl* target, |
353 scoped_ptr<ContentVector> transfer_buffer, | 351 scoped_ptr<ContentVector> transfer_buffer, |
354 size_t buffer_size, | 352 size_t buffer_size, |
355 bool source_complete, | 353 bool source_complete, |
356 DownloadInterruptReason status) { | 354 int status) { |
357 // If our target is no longer alive, do nothing. | 355 // If our target is no longer alive, do nothing. |
358 if (!object_lifetime_flag->is_alive) return; | 356 if (!object_lifetime_flag->is_alive) return; |
359 | 357 |
360 target->TransferDataInternal( | 358 target->TransferDataInternal( |
361 transfer_buffer.Pass(), buffer_size, source_complete, status); | 359 transfer_buffer.Pass(), buffer_size, source_complete, status); |
362 } | 360 } |
363 | 361 |
364 void ByteStreamReaderImpl::TransferDataInternal( | 362 void ByteStreamReaderImpl::TransferDataInternal( |
365 scoped_ptr<ContentVector> transfer_buffer, | 363 scoped_ptr<ContentVector> transfer_buffer, |
366 size_t buffer_size, | 364 size_t buffer_size, |
367 bool source_complete, | 365 bool source_complete, |
368 DownloadInterruptReason status) { | 366 int status) { |
369 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 367 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
370 | 368 |
371 bool was_empty = available_contents_.empty(); | 369 bool was_empty = available_contents_.empty(); |
372 | 370 |
373 if (transfer_buffer) { | 371 if (transfer_buffer) { |
374 available_contents_.insert(available_contents_.end(), | 372 available_contents_.insert(available_contents_.end(), |
375 transfer_buffer->begin(), | 373 transfer_buffer->begin(), |
376 transfer_buffer->end()); | 374 transfer_buffer->end()); |
377 } | 375 } |
378 | 376 |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
432 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | 430 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( |
433 output_task_runner, output_flag, buffer_size); | 431 output_task_runner, output_flag, buffer_size); |
434 | 432 |
435 in->SetPeer(out, output_task_runner, output_flag); | 433 in->SetPeer(out, output_task_runner, output_flag); |
436 out->SetPeer(in, input_task_runner, input_flag); | 434 out->SetPeer(in, input_task_runner, input_flag); |
437 input->reset(in); | 435 input->reset(in); |
438 output->reset(out); | 436 output->reset(out); |
439 } | 437 } |
440 | 438 |
441 } // namespace content | 439 } // namespace content |
OLD | NEW |