Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Side by Side Diff: content/browser/byte_stream.cc

Issue 18284005: Make ByteStream independent from DownloadInterruptReason (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: jam's comment Created 7 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « content/browser/byte_stream.h ('k') | content/browser/byte_stream_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "content/browser/byte_stream.h" 5 #include "content/browser/byte_stream.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/memory/ref_counted.h" 9 #include "base/memory/ref_counted.h"
10 #include "base/memory/weak_ptr.h" 10 #include "base/memory/weak_ptr.h"
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
49 49
50 // Must be called before any operations are performed. 50 // Must be called before any operations are performed.
51 void SetPeer(ByteStreamReaderImpl* peer, 51 void SetPeer(ByteStreamReaderImpl* peer,
52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
54 54
55 // Overridden from ByteStreamWriter. 55 // Overridden from ByteStreamWriter.
56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
57 size_t byte_count) OVERRIDE; 57 size_t byte_count) OVERRIDE;
58 virtual void Flush() OVERRIDE; 58 virtual void Flush() OVERRIDE;
59 virtual void Close(DownloadInterruptReason status) OVERRIDE; 59 virtual void Close(int status) OVERRIDE;
60 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; 60 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
61 61
62 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. 62 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
63 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, 63 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
64 ByteStreamWriterImpl* target, 64 ByteStreamWriterImpl* target,
65 size_t bytes_consumed); 65 size_t bytes_consumed);
66 66
67 private: 67 private:
68 // Called from UpdateWindow when object existence has been validated. 68 // Called from UpdateWindow when object existence has been validated.
69 void UpdateWindowInternal(size_t bytes_consumed); 69 void UpdateWindowInternal(size_t bytes_consumed);
70 70
71 void PostToPeer(bool complete, DownloadInterruptReason status); 71 void PostToPeer(bool complete, int status);
72 72
73 const size_t total_buffer_size_; 73 const size_t total_buffer_size_;
74 74
75 // All data objects in this class are only valid to access on 75 // All data objects in this class are only valid to access on
76 // this task runner except as otherwise noted. 76 // this task runner except as otherwise noted.
77 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 77 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
78 78
79 // True while this object is alive. 79 // True while this object is alive.
80 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 80 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
81 81
(...skipping 25 matching lines...) Expand all
107 virtual ~ByteStreamReaderImpl(); 107 virtual ~ByteStreamReaderImpl();
108 108
109 // Must be called before any operations are performed. 109 // Must be called before any operations are performed.
110 void SetPeer(ByteStreamWriterImpl* peer, 110 void SetPeer(ByteStreamWriterImpl* peer,
111 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, 111 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
112 scoped_refptr<LifetimeFlag> peer_lifetime_flag); 112 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
113 113
114 // Overridden from ByteStreamReader. 114 // Overridden from ByteStreamReader.
115 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, 115 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
116 size_t* length) OVERRIDE; 116 size_t* length) OVERRIDE;
117 virtual DownloadInterruptReason GetStatus() const OVERRIDE; 117 virtual int GetStatus() const OVERRIDE;
118 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; 118 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
119 119
120 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and 120 // PostTask target from |ByteStreamWriterImpl::Write| and
121 // |ByteStreamWriterImpl::Close|. 121 // |ByteStreamWriterImpl::Close|.
122 // Receive data from our peer. 122 // Receive data from our peer.
123 // static because it may be called after the object it is targeting 123 // static because it may be called after the object it is targeting
124 // has been destroyed. It may not access |*target| 124 // has been destroyed. It may not access |*target|
125 // if |*object_lifetime_flag| is false. 125 // if |*object_lifetime_flag| is false.
126 static void TransferData( 126 static void TransferData(
127 scoped_refptr<LifetimeFlag> object_lifetime_flag, 127 scoped_refptr<LifetimeFlag> object_lifetime_flag,
128 ByteStreamReaderImpl* target, 128 ByteStreamReaderImpl* target,
129 scoped_ptr<ContentVector> transfer_buffer, 129 scoped_ptr<ContentVector> transfer_buffer,
130 size_t transfer_buffer_bytes, 130 size_t transfer_buffer_bytes,
131 bool source_complete, 131 bool source_complete,
132 DownloadInterruptReason status); 132 int status);
133 133
134 private: 134 private:
135 // Called from TransferData once object existence has been validated. 135 // Called from TransferData once object existence has been validated.
136 void TransferDataInternal( 136 void TransferDataInternal(
137 scoped_ptr<ContentVector> transfer_buffer, 137 scoped_ptr<ContentVector> transfer_buffer,
138 size_t transfer_buffer_bytes, 138 size_t transfer_buffer_bytes,
139 bool source_complete, 139 bool source_complete,
140 DownloadInterruptReason status); 140 int status);
141 141
142 void MaybeUpdateInput(); 142 void MaybeUpdateInput();
143 143
144 const size_t total_buffer_size_; 144 const size_t total_buffer_size_;
145 145
146 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; 146 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
147 147
148 // True while this object is alive. 148 // True while this object is alive.
149 scoped_refptr<LifetimeFlag> my_lifetime_flag_; 149 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
150 150
151 ContentVector available_contents_; 151 ContentVector available_contents_;
152 152
153 bool received_status_; 153 bool received_status_;
154 DownloadInterruptReason status_; 154 int status_;
155 155
156 base::Closure data_available_callback_; 156 base::Closure data_available_callback_;
157 157
158 // Time of last point at which data in stream transitioned from full 158 // Time of last point at which data in stream transitioned from full
159 // to non-full. Nulled when a callback is sent. 159 // to non-full. Nulled when a callback is sent.
160 base::Time last_non_full_time_; 160 base::Time last_non_full_time_;
161 161
162 // ** Peer information 162 // ** Peer information
163 163
164 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; 164 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
204 204
205 bool ByteStreamWriterImpl::Write( 205 bool ByteStreamWriterImpl::Write(
206 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { 206 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
207 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 207 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
208 208
209 input_contents_.push_back(std::make_pair(buffer, byte_count)); 209 input_contents_.push_back(std::make_pair(buffer, byte_count));
210 input_contents_size_ += byte_count; 210 input_contents_size_ += byte_count;
211 211
212 // Arbitrarily, we buffer to a third of the total size before sending. 212 // Arbitrarily, we buffer to a third of the total size before sending.
213 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) 213 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
214 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); 214 PostToPeer(false, 0);
215 215
216 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); 216 return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
217 } 217 }
218 218
219 void ByteStreamWriterImpl::Flush() { 219 void ByteStreamWriterImpl::Flush() {
220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
221 if (input_contents_size_ > 0) 221 if (input_contents_size_ > 0)
222 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); 222 PostToPeer(false, 0);
223 } 223 }
224 224
225 void ByteStreamWriterImpl::Close( 225 void ByteStreamWriterImpl::Close(int status) {
226 DownloadInterruptReason status) {
227 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
228 PostToPeer(true, status); 227 PostToPeer(true, status);
229 } 228 }
230 229
231 void ByteStreamWriterImpl::RegisterCallback( 230 void ByteStreamWriterImpl::RegisterCallback(
232 const base::Closure& source_callback) { 231 const base::Closure& source_callback) {
233 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 232 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
234 space_available_callback_ = source_callback; 233 space_available_callback_ = source_callback;
235 } 234 }
236 235
(...skipping 15 matching lines...) Expand all
252 // Callback if we were above the limit and we're now <= to it. 251 // Callback if we were above the limit and we're now <= to it.
253 size_t total_known_size_used = 252 size_t total_known_size_used =
254 input_contents_size_ + output_size_used_; 253 input_contents_size_ + output_size_used_;
255 254
256 if (total_known_size_used <= total_buffer_size_ && 255 if (total_known_size_used <= total_buffer_size_ &&
257 (total_known_size_used + bytes_consumed > total_buffer_size_) && 256 (total_known_size_used + bytes_consumed > total_buffer_size_) &&
258 !space_available_callback_.is_null()) 257 !space_available_callback_.is_null())
259 space_available_callback_.Run(); 258 space_available_callback_.Run();
260 } 259 }
261 260
262 void ByteStreamWriterImpl::PostToPeer( 261 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
263 bool complete, DownloadInterruptReason status) {
264 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 262 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
265 // Valid contexts in which to call. 263 // Valid contexts in which to call.
266 DCHECK(complete || 0 != input_contents_size_); 264 DCHECK(complete || 0 != input_contents_size_);
267 265
268 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); 266 scoped_ptr<ContentVector> transfer_buffer(new ContentVector);
269 size_t buffer_size = 0; 267 size_t buffer_size = 0;
270 if (0 != input_contents_size_) { 268 if (0 != input_contents_size_) {
271 transfer_buffer.reset(new ContentVector); 269 transfer_buffer.reset(new ContentVector);
272 transfer_buffer->swap(input_contents_); 270 transfer_buffer->swap(input_contents_);
273 buffer_size = input_contents_size_; 271 buffer_size = input_contents_size_;
(...skipping 12 matching lines...) Expand all
286 } 284 }
287 285
288 ByteStreamReaderImpl::ByteStreamReaderImpl( 286 ByteStreamReaderImpl::ByteStreamReaderImpl(
289 scoped_refptr<base::SequencedTaskRunner> task_runner, 287 scoped_refptr<base::SequencedTaskRunner> task_runner,
290 scoped_refptr<LifetimeFlag> lifetime_flag, 288 scoped_refptr<LifetimeFlag> lifetime_flag,
291 size_t buffer_size) 289 size_t buffer_size)
292 : total_buffer_size_(buffer_size), 290 : total_buffer_size_(buffer_size),
293 my_task_runner_(task_runner), 291 my_task_runner_(task_runner),
294 my_lifetime_flag_(lifetime_flag), 292 my_lifetime_flag_(lifetime_flag),
295 received_status_(false), 293 received_status_(false),
296 status_(DOWNLOAD_INTERRUPT_REASON_NONE), 294 status_(0),
297 unreported_consumed_bytes_(0), 295 unreported_consumed_bytes_(0),
298 peer_(NULL) { 296 peer_(NULL) {
299 DCHECK(my_lifetime_flag_.get()); 297 DCHECK(my_lifetime_flag_.get());
300 my_lifetime_flag_->is_alive = true; 298 my_lifetime_flag_->is_alive = true;
301 } 299 }
302 300
303 ByteStreamReaderImpl::~ByteStreamReaderImpl() { 301 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
304 my_lifetime_flag_->is_alive = false; 302 my_lifetime_flag_->is_alive = false;
305 } 303 }
306 304
(...skipping 19 matching lines...) Expand all
326 324
327 MaybeUpdateInput(); 325 MaybeUpdateInput();
328 return STREAM_HAS_DATA; 326 return STREAM_HAS_DATA;
329 } 327 }
330 if (received_status_) { 328 if (received_status_) {
331 return STREAM_COMPLETE; 329 return STREAM_COMPLETE;
332 } 330 }
333 return STREAM_EMPTY; 331 return STREAM_EMPTY;
334 } 332 }
335 333
336 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { 334 int ByteStreamReaderImpl::GetStatus() const {
337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 335 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
338 DCHECK(received_status_); 336 DCHECK(received_status_);
339 return status_; 337 return status_;
340 } 338 }
341 339
342 void ByteStreamReaderImpl::RegisterCallback( 340 void ByteStreamReaderImpl::RegisterCallback(
343 const base::Closure& sink_callback) { 341 const base::Closure& sink_callback) {
344 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 342 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
345 343
346 data_available_callback_ = sink_callback; 344 data_available_callback_ = sink_callback;
347 } 345 }
348 346
349 // static 347 // static
350 void ByteStreamReaderImpl::TransferData( 348 void ByteStreamReaderImpl::TransferData(
351 scoped_refptr<LifetimeFlag> object_lifetime_flag, 349 scoped_refptr<LifetimeFlag> object_lifetime_flag,
352 ByteStreamReaderImpl* target, 350 ByteStreamReaderImpl* target,
353 scoped_ptr<ContentVector> transfer_buffer, 351 scoped_ptr<ContentVector> transfer_buffer,
354 size_t buffer_size, 352 size_t buffer_size,
355 bool source_complete, 353 bool source_complete,
356 DownloadInterruptReason status) { 354 int status) {
357 // If our target is no longer alive, do nothing. 355 // If our target is no longer alive, do nothing.
358 if (!object_lifetime_flag->is_alive) return; 356 if (!object_lifetime_flag->is_alive) return;
359 357
360 target->TransferDataInternal( 358 target->TransferDataInternal(
361 transfer_buffer.Pass(), buffer_size, source_complete, status); 359 transfer_buffer.Pass(), buffer_size, source_complete, status);
362 } 360 }
363 361
364 void ByteStreamReaderImpl::TransferDataInternal( 362 void ByteStreamReaderImpl::TransferDataInternal(
365 scoped_ptr<ContentVector> transfer_buffer, 363 scoped_ptr<ContentVector> transfer_buffer,
366 size_t buffer_size, 364 size_t buffer_size,
367 bool source_complete, 365 bool source_complete,
368 DownloadInterruptReason status) { 366 int status) {
369 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); 367 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
370 368
371 bool was_empty = available_contents_.empty(); 369 bool was_empty = available_contents_.empty();
372 370
373 if (transfer_buffer) { 371 if (transfer_buffer) {
374 available_contents_.insert(available_contents_.end(), 372 available_contents_.insert(available_contents_.end(),
375 transfer_buffer->begin(), 373 transfer_buffer->begin(),
376 transfer_buffer->end()); 374 transfer_buffer->end());
377 } 375 }
378 376
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
432 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( 430 ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
433 output_task_runner, output_flag, buffer_size); 431 output_task_runner, output_flag, buffer_size);
434 432
435 in->SetPeer(out, output_task_runner, output_flag); 433 in->SetPeer(out, output_task_runner, output_flag);
436 out->SetPeer(in, input_task_runner, input_flag); 434 out->SetPeer(in, input_task_runner, input_flag);
437 input->reset(in); 435 input->reset(in);
438 output->reset(out); 436 output->reset(out);
439 } 437 }
440 438
441 } // namespace content 439 } // namespace content
OLDNEW
« no previous file with comments | « content/browser/byte_stream.h ('k') | content/browser/byte_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698