Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(761)

Side by Side Diff: media/filters/chunk_demuxer.cc

Issue 9295020: Fix ChunkDemuxer seek deadlock (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix copyright year. Created 8 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « media/filters/audio_renderer_base_unittest.cc ('k') | media/filters/ffmpeg_audio_decoder.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "media/filters/chunk_demuxer.h" 5 #include "media/filters/chunk_demuxer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop.h" 9 #include "base/message_loop.h"
10 #include "media/base/audio_decoder_config.h" 10 #include "media/base/audio_decoder_config.h"
11 #include "media/base/data_buffer.h" 11 #include "media/base/data_buffer.h"
12 #include "media/base/video_decoder_config.h" 12 #include "media/base/video_decoder_config.h"
13 #include "media/filters/chunk_demuxer_client.h" 13 #include "media/filters/chunk_demuxer_client.h"
14 #include "media/webm/webm_stream_parser.h" 14 #include "media/webm/webm_stream_parser.h"
15 15
16 namespace media { 16 namespace media {
17 17
18 // Create an "end of stream" buffer. 18 // Create an "end of stream" buffer.
19 static Buffer* CreateEOSBuffer() { 19 static Buffer* CreateEOSBuffer() {
20 return new DataBuffer(0); 20 return new DataBuffer(0);
21 } 21 }
22 22
23 class ChunkDemuxerStream : public DemuxerStream { 23 class ChunkDemuxerStream : public DemuxerStream {
24 public: 24 public:
25 typedef std::deque<scoped_refptr<Buffer> > BufferQueue; 25 typedef std::deque<scoped_refptr<Buffer> > BufferQueue;
26 typedef std::deque<ReadCallback> ReadCBQueue; 26 typedef std::deque<ReadCallback> ReadCBQueue;
27 typedef std::deque<base::Closure> ClosureQueue;
27 28
28 explicit ChunkDemuxerStream(const AudioDecoderConfig& audio_config); 29 explicit ChunkDemuxerStream(const AudioDecoderConfig& audio_config);
29 explicit ChunkDemuxerStream(const VideoDecoderConfig& video_config); 30 explicit ChunkDemuxerStream(const VideoDecoderConfig& video_config);
30 virtual ~ChunkDemuxerStream(); 31 virtual ~ChunkDemuxerStream();
31 32
32 void Flush(); 33 void Flush();
34 void Seek(base::TimeDelta time);
33 35
34 // Checks if it is ok to add the |buffers| to the stream. 36 // Checks if it is ok to add the |buffers| to the stream.
35 bool CanAddBuffers(const BufferQueue& buffers) const; 37 bool CanAddBuffers(const BufferQueue& buffers) const;
36 38
37 void AddBuffers(const BufferQueue& buffers); 39 void AddBuffers(const BufferQueue& buffers);
38 void Shutdown(); 40 void Shutdown();
39 41
40 bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const; 42 bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const;
41 43
42 // DemuxerStream methods. 44 // DemuxerStream methods.
43 virtual void Read(const ReadCallback& read_callback); 45 virtual void Read(const ReadCallback& read_callback) OVERRIDE;
44 virtual Type type(); 46 virtual Type type() OVERRIDE;
45 virtual void EnableBitstreamConverter(); 47 virtual void EnableBitstreamConverter() OVERRIDE;
46 virtual const AudioDecoderConfig& audio_decoder_config(); 48 virtual const AudioDecoderConfig& audio_decoder_config() OVERRIDE;
47 virtual const VideoDecoderConfig& video_decoder_config(); 49 virtual const VideoDecoderConfig& video_decoder_config() OVERRIDE;
48 50
49 private: 51 private:
52 enum State {
53 RETURNING_DATA_FOR_READS,
54 WAITING_FOR_SEEK,
55 RECEIVED_EOS_WHILE_WAITING_FOR_SEEK, // EOS = End of stream.
56 RECEIVED_EOS,
57 RETURNING_EOS_FOR_READS,
58 SHUTDOWN,
59 };
60
61 // Assigns |state_| to |state|
62 void ChangeState_Locked(State state);
63
64 // Adds the callback to |read_cbs_| so it can be called later when we
65 // have data.
66 void DeferRead_Locked(const ReadCallback& read_cb);
67
68 // Creates closures that bind ReadCallbacks in |read_cbs_| to data in
69 // |buffers_| and pops the callbacks & buffers from the respecive queues.
70 void CreateReadDoneClosures_Locked(ClosureQueue* closures);
71
50 Type type_; 72 Type type_;
51 AudioDecoderConfig audio_config_; 73 AudioDecoderConfig audio_config_;
52 VideoDecoderConfig video_config_; 74 VideoDecoderConfig video_config_;
53 75
54 mutable base::Lock lock_; 76 mutable base::Lock lock_;
77 State state_;
55 ReadCBQueue read_cbs_; 78 ReadCBQueue read_cbs_;
56 BufferQueue buffers_; 79 BufferQueue buffers_;
57 bool shutdown_called_;
58 bool received_end_of_stream_;
59 80
60 // Keeps track of the timestamp of the last buffer we have 81 // Keeps track of the timestamp of the last buffer we have
61 // added to |buffers_|. This is used to enforce buffers with strictly 82 // added to |buffers_|. This is used to enforce buffers with strictly
62 // monotonically increasing timestamps. 83 // monotonically increasing timestamps.
63 base::TimeDelta last_buffer_timestamp_; 84 base::TimeDelta last_buffer_timestamp_;
64 85
65 DISALLOW_IMPLICIT_CONSTRUCTORS(ChunkDemuxerStream); 86 DISALLOW_IMPLICIT_CONSTRUCTORS(ChunkDemuxerStream);
66 }; 87 };
67 88
68 ChunkDemuxerStream::ChunkDemuxerStream(const AudioDecoderConfig& audio_config) 89 ChunkDemuxerStream::ChunkDemuxerStream(const AudioDecoderConfig& audio_config)
69 : type_(AUDIO), 90 : type_(AUDIO),
70 shutdown_called_(false), 91 state_(RETURNING_DATA_FOR_READS),
71 received_end_of_stream_(false),
72 last_buffer_timestamp_(kNoTimestamp()) { 92 last_buffer_timestamp_(kNoTimestamp()) {
73 audio_config_.CopyFrom(audio_config); 93 audio_config_.CopyFrom(audio_config);
74 } 94 }
75 95
76 96
77 ChunkDemuxerStream::ChunkDemuxerStream(const VideoDecoderConfig& video_config) 97 ChunkDemuxerStream::ChunkDemuxerStream(const VideoDecoderConfig& video_config)
78 : type_(VIDEO), 98 : type_(VIDEO),
79 shutdown_called_(false), 99 state_(RETURNING_DATA_FOR_READS),
80 received_end_of_stream_(false),
81 last_buffer_timestamp_(kNoTimestamp()) { 100 last_buffer_timestamp_(kNoTimestamp()) {
82 video_config_.CopyFrom(video_config); 101 video_config_.CopyFrom(video_config);
83 } 102 }
84 103
85 ChunkDemuxerStream::~ChunkDemuxerStream() {} 104 ChunkDemuxerStream::~ChunkDemuxerStream() {}
86 105
87 void ChunkDemuxerStream::Flush() { 106 void ChunkDemuxerStream::Flush() {
88 DVLOG(1) << "Flush()"; 107 DVLOG(1) << "Flush()";
108 ReadCBQueue read_cbs;
109 {
110 base::AutoLock auto_lock(lock_);
111 buffers_.clear();
112 ChangeState_Locked(WAITING_FOR_SEEK);
113 last_buffer_timestamp_ = kNoTimestamp();
114
115 std::swap(read_cbs_, read_cbs);
116 }
117
118 for (ReadCBQueue::iterator it = read_cbs.begin(); it != read_cbs.end(); ++it)
119 it->Run(scoped_refptr<Buffer>());
120 }
121
122 void ChunkDemuxerStream::Seek(base::TimeDelta time) {
89 base::AutoLock auto_lock(lock_); 123 base::AutoLock auto_lock(lock_);
90 buffers_.clear(); 124
91 received_end_of_stream_ = false; 125 DCHECK(read_cbs_.empty());
92 last_buffer_timestamp_ = kNoTimestamp(); 126
127 if (state_ == WAITING_FOR_SEEK) {
128 ChangeState_Locked(RETURNING_DATA_FOR_READS);
129 return;
130 }
131
132 if (state_ == RECEIVED_EOS_WHILE_WAITING_FOR_SEEK) {
133 ChangeState_Locked(RECEIVED_EOS);
134 return;
135 }
93 } 136 }
94 137
95 bool ChunkDemuxerStream::CanAddBuffers(const BufferQueue& buffers) const { 138 bool ChunkDemuxerStream::CanAddBuffers(const BufferQueue& buffers) const {
96 base::AutoLock auto_lock(lock_); 139 base::AutoLock auto_lock(lock_);
97 140
98 // If we haven't seen any buffers yet, then anything can be added. 141 // If we haven't seen any buffers yet, then anything can be added.
99 if (last_buffer_timestamp_ == kNoTimestamp()) 142 if (last_buffer_timestamp_ == kNoTimestamp())
100 return true; 143 return true;
101 144
102 if (buffers.empty()) 145 if (buffers.empty())
103 return true; 146 return true;
104 147
105 return (buffers.front()->GetTimestamp() > last_buffer_timestamp_); 148 return (buffers.front()->GetTimestamp() > last_buffer_timestamp_);
106 } 149 }
107 150
108 void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) { 151 void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) {
109 if (buffers.empty()) 152 if (buffers.empty())
110 return; 153 return;
111 154
112 std::deque<base::Closure> callbacks; 155 ClosureQueue closures;
113 { 156 {
114 base::AutoLock auto_lock(lock_); 157 base::AutoLock auto_lock(lock_);
115 158
116 for (BufferQueue::const_iterator itr = buffers.begin(); 159 for (BufferQueue::const_iterator itr = buffers.begin();
117 itr != buffers.end(); itr++) { 160 itr != buffers.end(); itr++) {
118 // Make sure we aren't trying to add a buffer after we have received and 161 // Make sure we aren't trying to add a buffer after we have received and
119 // "end of stream" buffer. 162 // "end of stream" buffer.
120 DCHECK(!received_end_of_stream_); 163 DCHECK_NE(state_, RECEIVED_EOS_WHILE_WAITING_FOR_SEEK);
164 DCHECK_NE(state_, RECEIVED_EOS);
165 DCHECK_NE(state_, RETURNING_EOS_FOR_READS);
121 166
122 if ((*itr)->IsEndOfStream()) { 167 if ((*itr)->IsEndOfStream()) {
123 received_end_of_stream_ = true; 168 if (state_ == WAITING_FOR_SEEK) {
124 169 ChangeState_Locked(RECEIVED_EOS_WHILE_WAITING_FOR_SEEK);
125 // Push enough EOS buffers to satisfy outstanding Read() requests. 170 } else {
126 if (read_cbs_.size() > buffers_.size()) { 171 ChangeState_Locked(RECEIVED_EOS);
127 size_t pending_read_without_data = read_cbs_.size() - buffers_.size();
128 for (size_t i = 0; i <= pending_read_without_data; ++i) {
129 buffers_.push_back(*itr);
130 }
131 } 172 }
132 } else { 173 } else {
133 base::TimeDelta current_ts = (*itr)->GetTimestamp(); 174 base::TimeDelta current_ts = (*itr)->GetTimestamp();
134 if (last_buffer_timestamp_ != kNoTimestamp()) { 175 if (last_buffer_timestamp_ != kNoTimestamp()) {
135 DCHECK_GT(current_ts.ToInternalValue(), 176 DCHECK_GT(current_ts.ToInternalValue(),
136 last_buffer_timestamp_.ToInternalValue()); 177 last_buffer_timestamp_.ToInternalValue());
137 } 178 }
138 179
139 last_buffer_timestamp_ = current_ts; 180 last_buffer_timestamp_ = current_ts;
140 buffers_.push_back(*itr); 181 buffers_.push_back(*itr);
141 } 182 }
142 } 183 }
143 184
144 while (!buffers_.empty() && !read_cbs_.empty()) { 185 CreateReadDoneClosures_Locked(&closures);
145 callbacks.push_back(base::Bind(read_cbs_.front(), buffers_.front()));
146 buffers_.pop_front();
147 read_cbs_.pop_front();
148 }
149 } 186 }
150 187
151 while (!callbacks.empty()) { 188 for (ClosureQueue::iterator it = closures.begin(); it != closures.end(); ++it)
152 callbacks.front().Run(); 189 it->Run();
153 callbacks.pop_front();
154 }
155 } 190 }
156 191
157 void ChunkDemuxerStream::Shutdown() { 192 void ChunkDemuxerStream::Shutdown() {
158 std::deque<ReadCallback> callbacks; 193 ReadCBQueue read_cbs;
159 { 194 {
160 base::AutoLock auto_lock(lock_); 195 base::AutoLock auto_lock(lock_);
161 shutdown_called_ = true; 196 ChangeState_Locked(SHUTDOWN);
162 197
163 // Collect all the pending Read() callbacks. 198 std::swap(read_cbs_, read_cbs);
164 while (!read_cbs_.empty()) { 199 buffers_.clear();
165 callbacks.push_back(read_cbs_.front());
166 read_cbs_.pop_front();
167 }
168 } 200 }
169 201
170 // Pass end of stream buffers to all callbacks to signal that no more data 202 // Pass end of stream buffers to all callbacks to signal that no more data
171 // will be sent. 203 // will be sent.
172 while (!callbacks.empty()) { 204 for (ReadCBQueue::iterator it = read_cbs.begin(); it != read_cbs.end(); ++it)
173 callbacks.front().Run(CreateEOSBuffer()); 205 it->Run(CreateEOSBuffer());
174 callbacks.pop_front();
175 }
176 } 206 }
177 207
178 bool ChunkDemuxerStream::GetLastBufferTimestamp( 208 bool ChunkDemuxerStream::GetLastBufferTimestamp(
179 base::TimeDelta* timestamp) const { 209 base::TimeDelta* timestamp) const {
180 base::AutoLock auto_lock(lock_); 210 base::AutoLock auto_lock(lock_);
181 211
182 if (buffers_.empty()) 212 if (buffers_.empty())
183 return false; 213 return false;
184 214
185 *timestamp = buffers_.back()->GetTimestamp(); 215 *timestamp = buffers_.back()->GetTimestamp();
(...skipping 13 matching lines...) Expand all
199 read_callback.Run(buffer); 229 read_callback.Run(buffer);
200 } 230 }
201 231
202 // DemuxerStream methods. 232 // DemuxerStream methods.
203 void ChunkDemuxerStream::Read(const ReadCallback& read_callback) { 233 void ChunkDemuxerStream::Read(const ReadCallback& read_callback) {
204 scoped_refptr<Buffer> buffer; 234 scoped_refptr<Buffer> buffer;
205 235
206 { 236 {
207 base::AutoLock auto_lock(lock_); 237 base::AutoLock auto_lock(lock_);
208 238
209 if (shutdown_called_ || (received_end_of_stream_ && buffers_.empty())) { 239 switch(state_) {
210 buffer = CreateEOSBuffer(); 240 case RETURNING_DATA_FOR_READS:
211 } else { 241 // If we don't have any buffers ready or already have
212 if (buffers_.empty()) { 242 // pending reads, then defer this read.
213 // Wrap & store |read_callback| so that it will 243 if (buffers_.empty() || !read_cbs_.empty()) {
214 // get called on the current MessageLoop. 244 DeferRead_Locked(read_callback);
215 read_cbs_.push_back(base::Bind(&RunOnMessageLoop, 245 return;
216 read_callback, 246 }
217 MessageLoop::current()));
218 return;
219 }
220 247
221 if (!read_cbs_.empty()) { 248 buffer = buffers_.front();
222 // Wrap & store |read_callback| so that it will 249 buffers_.pop_front();
223 // get called on the current MessageLoop. 250 break;
224 read_cbs_.push_back(base::Bind(&RunOnMessageLoop,
225 read_callback,
226 MessageLoop::current()));
227 return;
228 }
229 251
230 buffer = buffers_.front(); 252 case WAITING_FOR_SEEK:
231 buffers_.pop_front(); 253 case RECEIVED_EOS_WHILE_WAITING_FOR_SEEK:
254 // Null buffers should be returned in this state since we are waiting
255 // for a seek. Any buffers in |buffers_| should NOT be returned because
256 // they are associated with the seek.
257 DCHECK(read_cbs_.empty());
258 break;
259 case RECEIVED_EOS:
260 DCHECK(read_cbs_.empty());
261
262 if (buffers_.empty()) {
263 ChangeState_Locked(RETURNING_EOS_FOR_READS);
264 buffer = CreateEOSBuffer();
265 } else {
266 buffer = buffers_.front();
267 buffers_.pop_front();
268 }
269 break;
270
271 case RETURNING_EOS_FOR_READS:
272 case SHUTDOWN:
273 DCHECK(buffers_.empty());
274 DCHECK(read_cbs_.empty());
275 buffer = CreateEOSBuffer();
232 } 276 }
233 } 277 }
234 278
235 DCHECK(buffer.get());
236 read_callback.Run(buffer); 279 read_callback.Run(buffer);
237 } 280 }
238 281
239 DemuxerStream::Type ChunkDemuxerStream::type() { return type_; } 282 DemuxerStream::Type ChunkDemuxerStream::type() { return type_; }
240 283
241 void ChunkDemuxerStream::EnableBitstreamConverter() {} 284 void ChunkDemuxerStream::EnableBitstreamConverter() {}
242 285
243 const AudioDecoderConfig& ChunkDemuxerStream::audio_decoder_config() { 286 const AudioDecoderConfig& ChunkDemuxerStream::audio_decoder_config() {
244 CHECK_EQ(type_, AUDIO); 287 CHECK_EQ(type_, AUDIO);
245 return audio_config_; 288 return audio_config_;
246 } 289 }
247 290
248 const VideoDecoderConfig& ChunkDemuxerStream::video_decoder_config() { 291 const VideoDecoderConfig& ChunkDemuxerStream::video_decoder_config() {
249 CHECK_EQ(type_, VIDEO); 292 CHECK_EQ(type_, VIDEO);
250 return video_config_; 293 return video_config_;
251 } 294 }
252 295
296 void ChunkDemuxerStream::ChangeState_Locked(State state) {
297 lock_.AssertAcquired();
298 state_ = state;
299 }
300
301 void ChunkDemuxerStream::DeferRead_Locked(const ReadCallback& read_cb) {
302 lock_.AssertAcquired();
303 // Wrap & store |read_callback| so that it will
304 // get called on the current MessageLoop.
305 read_cbs_.push_back(base::Bind(&RunOnMessageLoop, read_cb,
306 MessageLoop::current()));
307 }
308
309 void ChunkDemuxerStream::CreateReadDoneClosures_Locked(ClosureQueue* closures) {
310 lock_.AssertAcquired();
311
312 if (state_ != RETURNING_DATA_FOR_READS && state_ != RECEIVED_EOS)
313 return;
314
315 while (!buffers_.empty() && !read_cbs_.empty()) {
316 closures->push_back(base::Bind(read_cbs_.front(), buffers_.front()));
317 buffers_.pop_front();
318 read_cbs_.pop_front();
319 }
320
321 if (state_ != RECEIVED_EOS || !buffers_.empty() || read_cbs_.empty())
322 return;
323
324 // Push enough EOS buffers to satisfy outstanding Read() requests.
325 scoped_refptr<Buffer> end_of_stream_buffer = CreateEOSBuffer();
326 while (!read_cbs_.empty()) {
327 closures->push_back(base::Bind(read_cbs_.front(), end_of_stream_buffer));
328 read_cbs_.pop_front();
329 }
330
331 ChangeState_Locked(RETURNING_EOS_FOR_READS);
332 }
333
253 ChunkDemuxer::ChunkDemuxer(ChunkDemuxerClient* client) 334 ChunkDemuxer::ChunkDemuxer(ChunkDemuxerClient* client)
254 : state_(WAITING_FOR_INIT), 335 : state_(WAITING_FOR_INIT),
255 client_(client), 336 client_(client),
256 buffered_bytes_(0), 337 buffered_bytes_(0),
257 seek_waits_for_data_(true), 338 seek_waits_for_data_(true),
258 deferred_error_(PIPELINE_OK) { 339 deferred_error_(PIPELINE_OK) {
259 DCHECK(client); 340 DCHECK(client);
260 } 341 }
261 342
262 ChunkDemuxer::~ChunkDemuxer() { 343 ChunkDemuxer::~ChunkDemuxer() {
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
299 } 380 }
300 381
301 void ChunkDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) { 382 void ChunkDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) {
302 DVLOG(1) << "Seek(" << time.InSecondsF() << ")"; 383 DVLOG(1) << "Seek(" << time.InSecondsF() << ")";
303 384
304 PipelineStatus status = PIPELINE_ERROR_INVALID_STATE; 385 PipelineStatus status = PIPELINE_ERROR_INVALID_STATE;
305 { 386 {
306 base::AutoLock auto_lock(lock_); 387 base::AutoLock auto_lock(lock_);
307 388
308 if (state_ == INITIALIZED || state_ == ENDED) { 389 if (state_ == INITIALIZED || state_ == ENDED) {
390 if (audio_)
391 audio_->Seek(time);
392
393 if (video_)
394 video_->Seek(time);
395
309 if (seek_waits_for_data_) { 396 if (seek_waits_for_data_) {
310 DVLOG(1) << "Seek() : waiting for more data to arrive."; 397 DVLOG(1) << "Seek() : waiting for more data to arrive.";
311 seek_cb_ = cb; 398 seek_cb_ = cb;
312 return; 399 return;
313 } 400 }
314 401
315 status = PIPELINE_OK; 402 status = PIPELINE_OK;
316 } 403 }
317 } 404 }
318 405
(...skipping 327 matching lines...) Expand 10 before | Expand all | Expand 10 after
646 if (!video_->CanAddBuffers(buffers)) 733 if (!video_->CanAddBuffers(buffers))
647 return false; 734 return false;
648 735
649 video_->AddBuffers(buffers); 736 video_->AddBuffers(buffers);
650 seek_waits_for_data_ = false; 737 seek_waits_for_data_ = false;
651 738
652 return true; 739 return true;
653 } 740 }
654 741
655 } // namespace media 742 } // namespace media
OLDNEW
« no previous file with comments | « media/filters/audio_renderer_base_unittest.cc ('k') | media/filters/ffmpeg_audio_decoder.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698