OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "content/browser/download/byte_stream.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/location.h" | |
9 #include "base/memory/weak_ptr.h" | |
10 #include "base/memory/ref_counted.h" | |
11 #include "base/sequenced_task_runner.h" | |
12 | |
13 namespace content { | |
14 namespace { | |
15 | |
16 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> > | |
17 ContentVector; | |
18 | |
19 class ByteStreamReaderImpl; | |
20 | |
21 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be | |
22 // cleared in an object destructor and accessed to check for object | |
23 // existence. We can't use weak pointers because they're tightly tied to | |
24 // threads rather than task runners. | |
25 // TODO(rdsmith): A better solution would be extending weak pointers | |
26 // to support SequencedTaskRunners. | |
27 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { | |
28 public: | |
29 LifetimeFlag() : is_alive(true) { } | |
30 bool is_alive; | |
31 | |
32 protected: | |
33 friend class base::RefCountedThreadSafe<LifetimeFlag>; | |
34 virtual ~LifetimeFlag() { } | |
35 | |
36 private: | |
37 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag); | |
38 }; | |
39 | |
40 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and | |
41 // SetPeer may happen anywhere; all other operations on each class must | |
42 // happen in the context of their SequencedTaskRunner. | |
43 class ByteStreamWriterImpl : public ByteStreamWriter { | |
44 public: | |
45 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
46 scoped_refptr<LifetimeFlag> lifetime_flag, | |
47 size_t buffer_size); | |
48 virtual ~ByteStreamWriterImpl(); | |
49 | |
50 // Must be called before any operations are performed. | |
51 void SetPeer(ByteStreamReaderImpl* peer, | |
52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
54 | |
55 // Overridden from ByteStreamWriter. | |
56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | |
57 size_t byte_count) OVERRIDE; | |
58 virtual void Close(DownloadInterruptReason status) OVERRIDE; | |
59 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | |
60 | |
61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | |
62 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | |
63 ByteStreamWriterImpl* target, | |
64 size_t bytes_consumed); | |
65 | |
66 private: | |
67 // Called from UpdateWindow when object existence has been validated. | |
68 void UpdateWindowInternal(size_t bytes_consumed); | |
69 | |
70 void PostToPeer(bool complete, DownloadInterruptReason status); | |
71 | |
72 const size_t total_buffer_size_; | |
73 | |
74 // All data objects in this class are only valid to access on | |
75 // this task runner except as otherwise noted. | |
76 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
77 | |
78 // True while this object is alive. | |
79 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
80 | |
81 base::Closure space_available_callback_; | |
82 ContentVector input_contents_; | |
83 size_t input_contents_size_; | |
84 | |
85 // ** Peer information. | |
86 | |
87 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
88 | |
89 // How much we've sent to the output that for flow control purposes we | |
90 // must assume hasn't been read yet. | |
91 size_t output_size_used_; | |
92 | |
93 // Only valid to access on peer_task_runner_. | |
94 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
95 | |
96 // Only valid to access on peer_task_runner_ if | |
97 // |*peer_lifetime_flag_ == true| | |
98 ByteStreamReaderImpl* peer_; | |
99 }; | |
100 | |
101 class ByteStreamReaderImpl : public ByteStreamReader { | |
102 public: | |
103 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, | |
104 scoped_refptr<LifetimeFlag> lifetime_flag, | |
105 size_t buffer_size); | |
106 virtual ~ByteStreamReaderImpl(); | |
107 | |
108 // Must be called before any operations are performed. | |
109 void SetPeer(ByteStreamWriterImpl* peer, | |
110 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
111 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | |
112 | |
113 // Overridden from ByteStreamReader. | |
114 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | |
115 size_t* length) OVERRIDE; | |
116 virtual DownloadInterruptReason GetStatus() const OVERRIDE; | |
117 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | |
118 | |
119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and | |
120 // |ByteStreamWriterImpl::Close|. | |
121 // Receive data from our peer. | |
122 // static because it may be called after the object it is targeting | |
123 // has been destroyed. It may not access |*target| | |
124 // if |*object_lifetime_flag| is false. | |
125 static void TransferData( | |
126 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
127 ByteStreamReaderImpl* target, | |
128 scoped_ptr<ContentVector> transfer_buffer, | |
129 size_t transfer_buffer_bytes, | |
130 bool source_complete, | |
131 DownloadInterruptReason status); | |
132 | |
133 private: | |
134 // Called from TransferData once object existence has been validated. | |
135 void TransferDataInternal( | |
136 scoped_ptr<ContentVector> transfer_buffer, | |
137 size_t transfer_buffer_bytes, | |
138 bool source_complete, | |
139 DownloadInterruptReason status); | |
140 | |
141 void MaybeUpdateInput(); | |
142 | |
143 const size_t total_buffer_size_; | |
144 | |
145 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | |
146 | |
147 // True while this object is alive. | |
148 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | |
149 | |
150 ContentVector available_contents_; | |
151 | |
152 bool received_status_; | |
153 DownloadInterruptReason status_; | |
154 | |
155 base::Closure data_available_callback_; | |
156 | |
157 // Time of last point at which data in stream transitioned from full | |
158 // to non-full. Nulled when a callback is sent. | |
159 base::Time last_non_full_time_; | |
160 | |
161 // ** Peer information | |
162 | |
163 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | |
164 | |
165 // How much has been removed from this class that we haven't told | |
166 // the input about yet. | |
167 size_t unreported_consumed_bytes_; | |
168 | |
169 // Only valid to access on peer_task_runner_. | |
170 scoped_refptr<LifetimeFlag> peer_lifetime_flag_; | |
171 | |
172 // Only valid to access on peer_task_runner_ if | |
173 // |*peer_lifetime_flag_ == true| | |
174 ByteStreamWriterImpl* peer_; | |
175 }; | |
176 | |
177 ByteStreamWriterImpl::ByteStreamWriterImpl( | |
178 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
179 scoped_refptr<LifetimeFlag> lifetime_flag, | |
180 size_t buffer_size) | |
181 : total_buffer_size_(buffer_size), | |
182 my_task_runner_(task_runner), | |
183 my_lifetime_flag_(lifetime_flag), | |
184 input_contents_size_(0), | |
185 output_size_used_(0), | |
186 peer_(NULL) { | |
187 DCHECK(my_lifetime_flag_.get()); | |
188 my_lifetime_flag_->is_alive = true; | |
189 } | |
190 | |
191 ByteStreamWriterImpl::~ByteStreamWriterImpl() { | |
192 my_lifetime_flag_->is_alive = false; | |
193 } | |
194 | |
195 void ByteStreamWriterImpl::SetPeer( | |
196 ByteStreamReaderImpl* peer, | |
197 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
198 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
199 peer_ = peer; | |
200 peer_task_runner_ = peer_task_runner; | |
201 peer_lifetime_flag_ = peer_lifetime_flag; | |
202 } | |
203 | |
204 bool ByteStreamWriterImpl::Write( | |
205 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | |
206 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
207 | |
208 input_contents_.push_back(std::make_pair(buffer, byte_count)); | |
209 input_contents_size_ += byte_count; | |
210 | |
211 // Arbitrarily, we buffer to a third of the total size before sending. | |
212 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | |
213 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | |
214 | |
215 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | |
216 } | |
217 | |
218 void ByteStreamWriterImpl::Close( | |
219 DownloadInterruptReason status) { | |
220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
221 PostToPeer(true, status); | |
222 } | |
223 | |
224 void ByteStreamWriterImpl::RegisterCallback( | |
225 const base::Closure& source_callback) { | |
226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
227 space_available_callback_ = source_callback; | |
228 } | |
229 | |
230 // static | |
231 void ByteStreamWriterImpl::UpdateWindow( | |
232 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, | |
233 size_t bytes_consumed) { | |
234 // If the target object isn't alive anymore, we do nothing. | |
235 if (!lifetime_flag->is_alive) return; | |
236 | |
237 target->UpdateWindowInternal(bytes_consumed); | |
238 } | |
239 | |
240 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { | |
241 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
242 DCHECK_GE(output_size_used_, bytes_consumed); | |
243 output_size_used_ -= bytes_consumed; | |
244 | |
245 // Callback if we were above the limit and we're now <= to it. | |
246 size_t total_known_size_used = | |
247 input_contents_size_ + output_size_used_; | |
248 | |
249 if (total_known_size_used <= total_buffer_size_ && | |
250 (total_known_size_used + bytes_consumed > total_buffer_size_) && | |
251 !space_available_callback_.is_null()) | |
252 space_available_callback_.Run(); | |
253 } | |
254 | |
255 void ByteStreamWriterImpl::PostToPeer( | |
256 bool complete, DownloadInterruptReason status) { | |
257 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
258 // Valid contexts in which to call. | |
259 DCHECK(complete || 0 != input_contents_size_); | |
260 | |
261 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); | |
262 size_t buffer_size = 0; | |
263 if (0 != input_contents_size_) { | |
264 transfer_buffer.reset(new ContentVector); | |
265 transfer_buffer->swap(input_contents_); | |
266 buffer_size = input_contents_size_; | |
267 output_size_used_ += input_contents_size_; | |
268 input_contents_size_ = 0; | |
269 } | |
270 peer_task_runner_->PostTask( | |
271 FROM_HERE, base::Bind( | |
272 &ByteStreamReaderImpl::TransferData, | |
273 peer_lifetime_flag_, | |
274 peer_, | |
275 base::Passed(&transfer_buffer), | |
276 buffer_size, | |
277 complete, | |
278 status)); | |
279 } | |
280 | |
281 ByteStreamReaderImpl::ByteStreamReaderImpl( | |
282 scoped_refptr<base::SequencedTaskRunner> task_runner, | |
283 scoped_refptr<LifetimeFlag> lifetime_flag, | |
284 size_t buffer_size) | |
285 : total_buffer_size_(buffer_size), | |
286 my_task_runner_(task_runner), | |
287 my_lifetime_flag_(lifetime_flag), | |
288 received_status_(false), | |
289 status_(DOWNLOAD_INTERRUPT_REASON_NONE), | |
290 unreported_consumed_bytes_(0), | |
291 peer_(NULL) { | |
292 DCHECK(my_lifetime_flag_.get()); | |
293 my_lifetime_flag_->is_alive = true; | |
294 } | |
295 | |
296 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | |
297 my_lifetime_flag_->is_alive = false; | |
298 } | |
299 | |
300 void ByteStreamReaderImpl::SetPeer( | |
301 ByteStreamWriterImpl* peer, | |
302 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | |
303 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | |
304 peer_ = peer; | |
305 peer_task_runner_ = peer_task_runner; | |
306 peer_lifetime_flag_ = peer_lifetime_flag; | |
307 } | |
308 | |
309 ByteStreamReaderImpl::StreamState | |
310 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data, | |
311 size_t* length) { | |
312 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
313 | |
314 if (available_contents_.size()) { | |
315 *data = available_contents_.front().first; | |
316 *length = available_contents_.front().second; | |
317 available_contents_.pop_front(); | |
318 unreported_consumed_bytes_ += *length; | |
319 | |
320 MaybeUpdateInput(); | |
321 return STREAM_HAS_DATA; | |
322 } | |
323 if (received_status_) { | |
324 return STREAM_COMPLETE; | |
325 } | |
326 return STREAM_EMPTY; | |
327 } | |
328 | |
329 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { | |
330 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
331 DCHECK(received_status_); | |
332 return status_; | |
333 } | |
334 | |
335 void ByteStreamReaderImpl::RegisterCallback( | |
336 const base::Closure& sink_callback) { | |
337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
338 | |
339 data_available_callback_ = sink_callback; | |
340 } | |
341 | |
342 // static | |
343 void ByteStreamReaderImpl::TransferData( | |
344 scoped_refptr<LifetimeFlag> object_lifetime_flag, | |
345 ByteStreamReaderImpl* target, | |
346 scoped_ptr<ContentVector> transfer_buffer, | |
347 size_t buffer_size, | |
348 bool source_complete, | |
349 DownloadInterruptReason status) { | |
350 // If our target is no longer alive, do nothing. | |
351 if (!object_lifetime_flag->is_alive) return; | |
352 | |
353 target->TransferDataInternal( | |
354 transfer_buffer.Pass(), buffer_size, source_complete, status); | |
355 } | |
356 | |
357 void ByteStreamReaderImpl::TransferDataInternal( | |
358 scoped_ptr<ContentVector> transfer_buffer, | |
359 size_t buffer_size, | |
360 bool source_complete, | |
361 DownloadInterruptReason status) { | |
362 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
363 | |
364 bool was_empty = available_contents_.empty(); | |
365 | |
366 if (transfer_buffer.get()) { | |
367 available_contents_.insert(available_contents_.end(), | |
368 transfer_buffer->begin(), | |
369 transfer_buffer->end()); | |
370 } | |
371 | |
372 if (source_complete) { | |
373 received_status_ = true; | |
374 status_ = status; | |
375 } | |
376 | |
377 // Callback on transition from empty to non-empty, or | |
378 // source complete. | |
379 if (((was_empty && !available_contents_.empty()) || | |
380 source_complete) && | |
381 !data_available_callback_.is_null()) | |
382 data_available_callback_.Run(); | |
383 } | |
384 | |
385 // Decide whether or not to send the input a window update. | |
386 // Currently we do that whenever we've got unreported consumption | |
387 // greater than 1/3 of total size. | |
388 void ByteStreamReaderImpl::MaybeUpdateInput() { | |
389 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
390 | |
391 if (unreported_consumed_bytes_ <= | |
392 total_buffer_size_ / kFractionReadBeforeWindowUpdate) | |
393 return; | |
394 | |
395 peer_task_runner_->PostTask( | |
396 FROM_HERE, base::Bind( | |
397 &ByteStreamWriterImpl::UpdateWindow, | |
398 peer_lifetime_flag_, | |
399 peer_, | |
400 unreported_consumed_bytes_)); | |
401 unreported_consumed_bytes_ = 0; | |
402 } | |
403 | |
404 } // namespace | |
405 | |
406 | |
407 const int ByteStreamWriter::kFractionBufferBeforeSending = 3; | |
408 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3; | |
409 | |
410 ByteStreamReader::~ByteStreamReader() { } | |
411 | |
412 ByteStreamWriter::~ByteStreamWriter() { } | |
413 | |
414 void CreateByteStream( | |
415 scoped_refptr<base::SequencedTaskRunner> input_task_runner, | |
416 scoped_refptr<base::SequencedTaskRunner> output_task_runner, | |
417 size_t buffer_size, | |
418 scoped_ptr<ByteStreamWriter>* input, | |
419 scoped_ptr<ByteStreamReader>* output) { | |
420 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); | |
421 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); | |
422 | |
423 ByteStreamWriterImpl* in = new ByteStreamWriterImpl( | |
424 input_task_runner, input_flag, buffer_size); | |
425 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | |
426 output_task_runner, output_flag, buffer_size); | |
427 | |
428 in->SetPeer(out, output_task_runner, output_flag); | |
429 out->SetPeer(in, input_task_runner, input_flag); | |
430 input->reset(in); | |
431 output->reset(out); | |
432 } | |
433 | |
434 } // namespace content | |
OLD | NEW |