Index: media/filters/chunk_demuxer.cc |
diff --git a/media/filters/chunk_demuxer.cc b/media/filters/chunk_demuxer.cc |
index 0c27733b13c8dcbf7b7ecdc3984afafccd57a656..95cd9e7493f08cbcc3b288786d0572812a9e39ee 100644 |
--- a/media/filters/chunk_demuxer.cc |
+++ b/media/filters/chunk_demuxer.cc |
@@ -24,12 +24,14 @@ class ChunkDemuxerStream : public DemuxerStream { |
public: |
typedef std::deque<scoped_refptr<Buffer> > BufferQueue; |
typedef std::deque<ReadCallback> ReadCBQueue; |
+ typedef std::deque<base::Closure> ClosureQueue; |
explicit ChunkDemuxerStream(const AudioDecoderConfig& audio_config); |
explicit ChunkDemuxerStream(const VideoDecoderConfig& video_config); |
virtual ~ChunkDemuxerStream(); |
void Flush(); |
+ void Seek(base::TimeDelta time); |
// Checks if it is ok to add the |buffers| to the stream. |
bool CanAddBuffers(const BufferQueue& buffers) const; |
@@ -40,22 +42,41 @@ class ChunkDemuxerStream : public DemuxerStream { |
bool GetLastBufferTimestamp(base::TimeDelta* timestamp) const; |
// DemuxerStream methods. |
- virtual void Read(const ReadCallback& read_callback); |
- virtual Type type(); |
- virtual void EnableBitstreamConverter(); |
- virtual const AudioDecoderConfig& audio_decoder_config(); |
- virtual const VideoDecoderConfig& video_decoder_config(); |
+ virtual void Read(const ReadCallback& read_callback) OVERRIDE; |
+ virtual Type type() OVERRIDE; |
+ virtual void EnableBitstreamConverter() OVERRIDE; |
+ virtual const AudioDecoderConfig& audio_decoder_config() OVERRIDE; |
+ virtual const VideoDecoderConfig& video_decoder_config() OVERRIDE; |
private: |
+ enum State { |
+ RETURNING_DATA_FOR_READS, |
+ WAITING_FOR_SEEK, |
+ RECEIVED_EOS_WHILE_WAITING_FOR_SEEK, // EOS = End of stream. |
+ RECEIVED_EOS, |
+ RETURNING_EOS_FOR_READS, |
+ SHUTDOWN, |
+ }; |
+ |
+ // Assigns |state_| to |state| |
+ void ChangeState_Locked(State state); |
+ |
+ // Adds the callback to |read_cbs_| so it can be called later when we |
+ // have data. |
+ void DeferRead_Locked(const ReadCallback& read_cb); |
+ |
+ // Creates closures that bind ReadCallbacks in |read_cbs_| to data in |
+ // |buffers_| and pops the callbacks & buffers from the respecive queues. |
+ void CreateReadDoneClosures_Locked(ClosureQueue* closures); |
+ |
Type type_; |
AudioDecoderConfig audio_config_; |
VideoDecoderConfig video_config_; |
mutable base::Lock lock_; |
+ State state_; |
ReadCBQueue read_cbs_; |
BufferQueue buffers_; |
- bool shutdown_called_; |
- bool received_end_of_stream_; |
// Keeps track of the timestamp of the last buffer we have |
// added to |buffers_|. This is used to enforce buffers with strictly |
@@ -67,8 +88,7 @@ class ChunkDemuxerStream : public DemuxerStream { |
ChunkDemuxerStream::ChunkDemuxerStream(const AudioDecoderConfig& audio_config) |
: type_(AUDIO), |
- shutdown_called_(false), |
- received_end_of_stream_(false), |
+ state_(RETURNING_DATA_FOR_READS), |
last_buffer_timestamp_(kNoTimestamp()) { |
audio_config_.CopyFrom(audio_config); |
} |
@@ -76,8 +96,7 @@ ChunkDemuxerStream::ChunkDemuxerStream(const AudioDecoderConfig& audio_config) |
ChunkDemuxerStream::ChunkDemuxerStream(const VideoDecoderConfig& video_config) |
: type_(VIDEO), |
- shutdown_called_(false), |
- received_end_of_stream_(false), |
+ state_(RETURNING_DATA_FOR_READS), |
last_buffer_timestamp_(kNoTimestamp()) { |
video_config_.CopyFrom(video_config); |
} |
@@ -86,10 +105,34 @@ ChunkDemuxerStream::~ChunkDemuxerStream() {} |
void ChunkDemuxerStream::Flush() { |
DVLOG(1) << "Flush()"; |
+ ReadCBQueue read_cbs; |
+ { |
+ base::AutoLock auto_lock(lock_); |
+ buffers_.clear(); |
+ ChangeState_Locked(WAITING_FOR_SEEK); |
+ last_buffer_timestamp_ = kNoTimestamp(); |
+ |
+ std::swap(read_cbs_, read_cbs); |
+ } |
+ |
+ for (ReadCBQueue::iterator it = read_cbs.begin(); it != read_cbs.end(); ++it) |
+ it->Run(scoped_refptr<Buffer>()); |
+} |
+ |
+void ChunkDemuxerStream::Seek(base::TimeDelta time) { |
base::AutoLock auto_lock(lock_); |
- buffers_.clear(); |
- received_end_of_stream_ = false; |
- last_buffer_timestamp_ = kNoTimestamp(); |
+ |
+ DCHECK(read_cbs_.empty()); |
+ |
+ if (state_ == WAITING_FOR_SEEK) { |
+ ChangeState_Locked(RETURNING_DATA_FOR_READS); |
+ return; |
+ } |
+ |
+ if (state_ == RECEIVED_EOS_WHILE_WAITING_FOR_SEEK) { |
+ ChangeState_Locked(RECEIVED_EOS); |
+ return; |
+ } |
} |
bool ChunkDemuxerStream::CanAddBuffers(const BufferQueue& buffers) const { |
@@ -109,7 +152,7 @@ void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) { |
if (buffers.empty()) |
return; |
- std::deque<base::Closure> callbacks; |
+ ClosureQueue closures; |
{ |
base::AutoLock auto_lock(lock_); |
@@ -117,17 +160,15 @@ void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) { |
itr != buffers.end(); itr++) { |
// Make sure we aren't trying to add a buffer after we have received and |
// "end of stream" buffer. |
- DCHECK(!received_end_of_stream_); |
+ DCHECK_NE(state_, RECEIVED_EOS_WHILE_WAITING_FOR_SEEK); |
+ DCHECK_NE(state_, RECEIVED_EOS); |
+ DCHECK_NE(state_, RETURNING_EOS_FOR_READS); |
if ((*itr)->IsEndOfStream()) { |
- received_end_of_stream_ = true; |
- |
- // Push enough EOS buffers to satisfy outstanding Read() requests. |
- if (read_cbs_.size() > buffers_.size()) { |
- size_t pending_read_without_data = read_cbs_.size() - buffers_.size(); |
- for (size_t i = 0; i <= pending_read_without_data; ++i) { |
- buffers_.push_back(*itr); |
- } |
+ if (state_ == WAITING_FOR_SEEK) { |
+ ChangeState_Locked(RECEIVED_EOS_WHILE_WAITING_FOR_SEEK); |
+ } else { |
+ ChangeState_Locked(RECEIVED_EOS); |
} |
} else { |
base::TimeDelta current_ts = (*itr)->GetTimestamp(); |
@@ -141,38 +182,27 @@ void ChunkDemuxerStream::AddBuffers(const BufferQueue& buffers) { |
} |
} |
- while (!buffers_.empty() && !read_cbs_.empty()) { |
- callbacks.push_back(base::Bind(read_cbs_.front(), buffers_.front())); |
- buffers_.pop_front(); |
- read_cbs_.pop_front(); |
- } |
+ CreateReadDoneClosures_Locked(&closures); |
} |
- while (!callbacks.empty()) { |
- callbacks.front().Run(); |
- callbacks.pop_front(); |
- } |
+ for (ClosureQueue::iterator it = closures.begin(); it != closures.end(); ++it) |
+ it->Run(); |
} |
void ChunkDemuxerStream::Shutdown() { |
- std::deque<ReadCallback> callbacks; |
+ ReadCBQueue read_cbs; |
{ |
base::AutoLock auto_lock(lock_); |
- shutdown_called_ = true; |
+ ChangeState_Locked(SHUTDOWN); |
- // Collect all the pending Read() callbacks. |
- while (!read_cbs_.empty()) { |
- callbacks.push_back(read_cbs_.front()); |
- read_cbs_.pop_front(); |
- } |
+ std::swap(read_cbs_, read_cbs); |
+ buffers_.clear(); |
} |
// Pass end of stream buffers to all callbacks to signal that no more data |
// will be sent. |
- while (!callbacks.empty()) { |
- callbacks.front().Run(CreateEOSBuffer()); |
- callbacks.pop_front(); |
- } |
+ for (ReadCBQueue::iterator it = read_cbs.begin(); it != read_cbs.end(); ++it) |
+ it->Run(CreateEOSBuffer()); |
} |
bool ChunkDemuxerStream::GetLastBufferTimestamp( |
@@ -206,33 +236,46 @@ void ChunkDemuxerStream::Read(const ReadCallback& read_callback) { |
{ |
base::AutoLock auto_lock(lock_); |
- if (shutdown_called_ || (received_end_of_stream_ && buffers_.empty())) { |
- buffer = CreateEOSBuffer(); |
- } else { |
- if (buffers_.empty()) { |
- // Wrap & store |read_callback| so that it will |
- // get called on the current MessageLoop. |
- read_cbs_.push_back(base::Bind(&RunOnMessageLoop, |
- read_callback, |
- MessageLoop::current())); |
- return; |
- } |
+ switch(state_) { |
+ case RETURNING_DATA_FOR_READS: |
+ // If we don't have any buffers ready or already have |
+ // pending reads, then defer this read. |
+ if (buffers_.empty() || !read_cbs_.empty()) { |
+ DeferRead_Locked(read_callback); |
+ return; |
+ } |
- if (!read_cbs_.empty()) { |
- // Wrap & store |read_callback| so that it will |
- // get called on the current MessageLoop. |
- read_cbs_.push_back(base::Bind(&RunOnMessageLoop, |
- read_callback, |
- MessageLoop::current())); |
- return; |
- } |
+ buffer = buffers_.front(); |
+ buffers_.pop_front(); |
+ break; |
+ |
+ case WAITING_FOR_SEEK: |
+ case RECEIVED_EOS_WHILE_WAITING_FOR_SEEK: |
+ // Null buffers should be returned in this state since we are waiting |
+ // for a seek. Any buffers in |buffers_| should NOT be returned because |
+ // they are associated with the seek. |
+ DCHECK(read_cbs_.empty()); |
+ break; |
+ case RECEIVED_EOS: |
+ DCHECK(read_cbs_.empty()); |
+ |
+ if (buffers_.empty()) { |
+ ChangeState_Locked(RETURNING_EOS_FOR_READS); |
+ buffer = CreateEOSBuffer(); |
+ } else { |
+ buffer = buffers_.front(); |
+ buffers_.pop_front(); |
+ } |
+ break; |
- buffer = buffers_.front(); |
- buffers_.pop_front(); |
+ case RETURNING_EOS_FOR_READS: |
+ case SHUTDOWN: |
+ DCHECK(buffers_.empty()); |
+ DCHECK(read_cbs_.empty()); |
+ buffer = CreateEOSBuffer(); |
} |
} |
- DCHECK(buffer.get()); |
read_callback.Run(buffer); |
} |
@@ -250,6 +293,44 @@ const VideoDecoderConfig& ChunkDemuxerStream::video_decoder_config() { |
return video_config_; |
} |
+void ChunkDemuxerStream::ChangeState_Locked(State state) { |
+ lock_.AssertAcquired(); |
+ state_ = state; |
+} |
+ |
+void ChunkDemuxerStream::DeferRead_Locked(const ReadCallback& read_cb) { |
+ lock_.AssertAcquired(); |
+ // Wrap & store |read_callback| so that it will |
+ // get called on the current MessageLoop. |
+ read_cbs_.push_back(base::Bind(&RunOnMessageLoop, read_cb, |
+ MessageLoop::current())); |
+} |
+ |
+void ChunkDemuxerStream::CreateReadDoneClosures_Locked(ClosureQueue* closures) { |
+ lock_.AssertAcquired(); |
+ |
+ if (state_ != RETURNING_DATA_FOR_READS && state_ != RECEIVED_EOS) |
+ return; |
+ |
+ while (!buffers_.empty() && !read_cbs_.empty()) { |
+ closures->push_back(base::Bind(read_cbs_.front(), buffers_.front())); |
+ buffers_.pop_front(); |
+ read_cbs_.pop_front(); |
+ } |
+ |
+ if (state_ != RECEIVED_EOS || !buffers_.empty() || read_cbs_.empty()) |
+ return; |
+ |
+ // Push enough EOS buffers to satisfy outstanding Read() requests. |
+ scoped_refptr<Buffer> end_of_stream_buffer = CreateEOSBuffer(); |
+ while (!read_cbs_.empty()) { |
+ closures->push_back(base::Bind(read_cbs_.front(), end_of_stream_buffer)); |
+ read_cbs_.pop_front(); |
+ } |
+ |
+ ChangeState_Locked(RETURNING_EOS_FOR_READS); |
+} |
+ |
ChunkDemuxer::ChunkDemuxer(ChunkDemuxerClient* client) |
: state_(WAITING_FOR_INIT), |
client_(client), |
@@ -306,6 +387,12 @@ void ChunkDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) { |
base::AutoLock auto_lock(lock_); |
if (state_ == INITIALIZED || state_ == ENDED) { |
+ if (audio_) |
+ audio_->Seek(time); |
+ |
+ if (video_) |
+ video_->Seek(time); |
+ |
if (seek_waits_for_data_) { |
DVLOG(1) << "Seek() : waiting for more data to arrive."; |
seek_cb_ = cb; |