Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1152)

Unified Diff: webrtc/call/call.cc

Issue 2709723003: Initial implementation of RtpTransportControllerReceive and related interfaces.
Patch Set: Rebased. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/call/call.cc
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index c7e49dee1a2ef587052eadd88edd406ba74bd218..21051689526c943bccd0cc8efdb2655e0bd3e5dc 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -33,6 +33,7 @@
#include "webrtc/call/bitrate_allocator.h"
#include "webrtc/call/call.h"
#include "webrtc/call/flexfec_receive_stream_impl.h"
+#include "webrtc/call/rtp_transport_controller_receive.h"
#include "webrtc/call/rtp_transport_controller_send.h"
#include "webrtc/config.h"
#include "webrtc/logging/rtc_event_log/rtc_event_log.h"
@@ -211,11 +212,6 @@ class Call : public webrtc::Call,
MediaType media_type)
SHARED_LOCKS_REQUIRED(receive_crit_);
- rtc::Optional<RtpPacketReceived> ParseRtpPacket(const uint8_t* packet,
- size_t length,
- const PacketTime& packet_time)
- SHARED_LOCKS_REQUIRED(receive_crit_);
-
void UpdateSendHistograms(int64_t first_sent_packet_ms)
EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
void UpdateReceiveHistograms();
@@ -238,45 +234,16 @@ class Call : public webrtc::Call,
std::unique_ptr<RWLockWrapper> receive_crit_;
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
+ // TODO(nisse): Try to eliminate these additional mappings. Two of
+ // the users are DeliverRTCP and OnRecoveredPacket.
std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_
GUARDED_BY(receive_crit_);
std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_
GUARDED_BY(receive_crit_);
std::set<VideoReceiveStream*> video_receive_streams_
GUARDED_BY(receive_crit_);
- // Each media stream could conceivably be protected by multiple FlexFEC
- // streams.
- std::multimap<uint32_t, FlexfecReceiveStreamImpl*>
- flexfec_receive_ssrcs_media_ GUARDED_BY(receive_crit_);
- std::map<uint32_t, FlexfecReceiveStreamImpl*>
- flexfec_receive_ssrcs_protection_ GUARDED_BY(receive_crit_);
- std::set<FlexfecReceiveStreamImpl*> flexfec_receive_streams_
- GUARDED_BY(receive_crit_);
- std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
- GUARDED_BY(receive_crit_);
- // This extra map is used for receive processing which is
- // independent of media type.
-
- // TODO(nisse): In the RTP transport refactoring, we should have a
- // single mapping from ssrc to a more abstract receive stream, with
- // accessor methods for all configuration we need at this level.
- struct ReceiveRtpConfig {
- ReceiveRtpConfig() = default; // Needed by std::map
- ReceiveRtpConfig(const std::vector<RtpExtension>& extensions,
- bool use_send_side_bwe)
- : extensions(extensions), use_send_side_bwe(use_send_side_bwe) {}
-
- // Registered RTP header extensions for each stream. Note that RTP header
- // extensions are negotiated per track ("m= line") in the SDP, but we have
- // no notion of tracks at the Call level. We therefore store the RTP header
- // extensions per SSRC instead, which leads to some storage overhead.
- RtpHeaderExtensionMap extensions;
- // Set if both RTP extension the RTCP feedback message needed for
- // send side BWE are negotiated.
- bool use_send_side_bwe = false;
- };
- std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
+ std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
GUARDED_BY(receive_crit_);
std::unique_ptr<RWLockWrapper> send_crit_;
@@ -308,6 +275,15 @@ class Call : public webrtc::Call,
std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
ReceiveSideCongestionController receive_side_cc_;
+ // TODO(nisse): Currently we always use separate demuxers. These
+ // should be created and owned outside of Call, passing pointers
+ // when Call is created. Then we should have two separate objects in
+ // the unbundled case, and two pointers to the same object in the
+ // bundled case.
+ std::unique_ptr<RtpTransportControllerReceiveInterface>
+ rtp_transport_receive_audio_ GUARDED_BY(receive_crit_);
+ std::unique_ptr<RtpTransportControllerReceiveInterface>
+ rtp_transport_receive_video_ GUARDED_BY(receive_crit_);
const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
const int64_t start_ms_;
// TODO(perkj): |worker_queue_| is supposed to replace
@@ -364,6 +340,14 @@ Call::Call(const Call::Config& config,
estimated_send_bitrate_kbps_counter_(clock_, nullptr, true),
pacer_bitrate_kbps_counter_(clock_, nullptr, true),
receive_side_cc_(clock_, transport_send->packet_router()),
+ rtp_transport_receive_audio_(
+ RtpTransportControllerReceiveInterface::Create(
+ &receive_side_cc_,
+ false /* enable_receive_side_bwe */)),
+ rtp_transport_receive_video_(
+ RtpTransportControllerReceiveInterface::Create(
+ &receive_side_cc_,
+ true /* enable_receive_side_bwe */)),
video_send_delay_stats_(new SendDelayStats(clock_)),
start_ms_(clock_->TimeInMilliseconds()),
worker_queue_("call_worker_queue") {
@@ -406,9 +390,6 @@ Call::~Call() {
RTC_CHECK(audio_send_ssrcs_.empty());
RTC_CHECK(video_send_ssrcs_.empty());
RTC_CHECK(video_send_streams_.empty());
- RTC_CHECK(audio_receive_ssrcs_.empty());
- RTC_CHECK(video_receive_ssrcs_.empty());
- RTC_CHECK(video_receive_streams_.empty());
pacer_thread_->Stop();
pacer_thread_->DeRegisterModule(transport_send_->send_side_cc()->pacer());
@@ -435,29 +416,6 @@ Call::~Call() {
Trace::ReturnTrace();
}
-rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
- const uint8_t* packet,
- size_t length,
- const PacketTime& packet_time) {
- RtpPacketReceived parsed_packet;
- if (!parsed_packet.Parse(packet, length))
- return rtc::Optional<RtpPacketReceived>();
-
- auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
- if (it != receive_rtp_config_.end())
- parsed_packet.IdentifyExtensions(it->second.extensions);
-
- int64_t arrival_time_ms;
- if (packet_time.timestamp != -1) {
- arrival_time_ms = (packet_time.timestamp + 500) / 1000;
- } else {
- arrival_time_ms = clock_->TimeInMilliseconds();
- }
- parsed_packet.set_arrival_time_ms(arrival_time_ms);
-
- return rtc::Optional<RtpPacketReceived>(std::move(parsed_packet));
-}
-
void Call::UpdateHistograms() {
RTC_HISTOGRAM_COUNTS_100000(
"WebRTC.Call.LifetimeInSeconds",
@@ -595,14 +553,15 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
AudioReceiveStream* receive_stream =
new AudioReceiveStream(transport_send_->packet_router(), config,
config_.audio_state, event_log_);
+ RtpTransportControllerReceiveInterface::Config receive_config;
+ receive_config.use_send_side_bwe = UseSendSideBwe(config);
+
{
WriteLockScoped write_lock(*receive_crit_);
- RTC_DCHECK(audio_receive_ssrcs_.find(config.rtp.remote_ssrc) ==
- audio_receive_ssrcs_.end());
- audio_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
- receive_rtp_config_[config.rtp.remote_ssrc] =
- ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config));
+ rtp_transport_receive_audio_->AddReceiver(
+ config.rtp.remote_ssrc, receive_config, receive_stream);
+ audio_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
ConfigureSync(config.sync_group);
}
{
@@ -626,10 +585,10 @@ void Call::DestroyAudioReceiveStream(
static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
{
WriteLockScoped write_lock(*receive_crit_);
+ rtp_transport_receive_audio_->RemoveReceiver(audio_receive_stream);
+
const AudioReceiveStream::Config& config = audio_receive_stream->config();
uint32_t ssrc = config.rtp.remote_ssrc;
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
- ->RemoveStream(ssrc);
size_t num_deleted = audio_receive_ssrcs_.erase(ssrc);
RTC_DCHECK(num_deleted == 1);
const std::string& sync_group = audio_receive_stream->config().sync_group;
@@ -639,7 +598,6 @@ void Call::DestroyAudioReceiveStream(
sync_stream_mapping_.erase(it);
ConfigureSync(sync_group);
}
- receive_rtp_config_.erase(ssrc);
}
UpdateAggregateNetworkState();
delete audio_receive_stream;
@@ -724,22 +682,26 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
module_process_thread_.get(), call_stats_.get());
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
- ReceiveRtpConfig receive_config(config.rtp.extensions,
- UseSendSideBwe(config));
+ RtpTransportControllerReceiveInterface::Config receive_config;
+ receive_config.use_send_side_bwe = UseSendSideBwe(config);
+
{
WriteLockScoped write_lock(*receive_crit_);
RTC_DCHECK(video_receive_ssrcs_.find(config.rtp.remote_ssrc) ==
video_receive_ssrcs_.end());
video_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
+ rtp_transport_receive_video_->AddReceiver(
+ config.rtp.remote_ssrc, receive_config, receive_stream);
+
if (config.rtp.rtx_ssrc) {
video_receive_ssrcs_[config.rtp.rtx_ssrc] = receive_stream;
// We record identical config for the rtx stream as for the main
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
// that is unlikely to matter in practice.
- receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config;
+ rtp_transport_receive_video_->AddReceiver(
+ config.rtp.rtx_ssrc, receive_config, receive_stream);
}
- receive_rtp_config_[config.rtp.remote_ssrc] = receive_config;
video_receive_streams_.insert(receive_stream);
ConfigureSync(config.sync_group);
}
@@ -765,7 +727,6 @@ void Call::DestroyVideoReceiveStream(
if (receive_stream_impl != nullptr)
RTC_DCHECK(receive_stream_impl == it->second);
receive_stream_impl = it->second;
- receive_rtp_config_.erase(it->first);
it = video_receive_ssrcs_.erase(it);
} else {
++it;
@@ -774,11 +735,8 @@ void Call::DestroyVideoReceiveStream(
video_receive_streams_.erase(receive_stream_impl);
RTC_CHECK(receive_stream_impl != nullptr);
ConfigureSync(receive_stream_impl->config().sync_group);
+ rtp_transport_receive_video_->RemoveReceiver(receive_stream_impl);
}
- const VideoReceiveStream::Config& config = receive_stream_impl->config();
-
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
- ->RemoveStream(config.rtp.remote_ssrc);
UpdateAggregateNetworkState();
delete receive_stream_impl;
@@ -794,24 +752,17 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(),
module_process_thread_.get());
+ RtpTransportControllerReceiveInterface::Config receive_config;
+ receive_config.use_send_side_bwe = UseSendSideBwe(config);
+
{
WriteLockScoped write_lock(*receive_crit_);
+ rtp_transport_receive_video_->AddReceiver(config.remote_ssrc,
+ receive_config, receive_stream);
- RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) ==
- flexfec_receive_streams_.end());
- flexfec_receive_streams_.insert(receive_stream);
-
- for (auto ssrc : config.protected_media_ssrcs)
- flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream));
-
- RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) ==
- flexfec_receive_ssrcs_protection_.end());
- flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream;
-
- RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
- receive_rtp_config_.end());
- receive_rtp_config_[config.remote_ssrc] =
- ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config));
+ for (auto ssrc : config.protected_media_ssrcs) {
+ rtp_transport_receive_video_->AddSink(ssrc, receive_stream);
+ }
}
// TODO(brandtr): Store config in RtcEventLog here.
@@ -830,33 +781,8 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
{
WriteLockScoped write_lock(*receive_crit_);
-
- const FlexfecReceiveStream::Config& config =
- receive_stream_impl->GetConfig();
- uint32_t ssrc = config.remote_ssrc;
- receive_rtp_config_.erase(ssrc);
-
- // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
- // destroyed.
- auto prot_it = flexfec_receive_ssrcs_protection_.begin();
- while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
- if (prot_it->second == receive_stream_impl)
- prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it);
- else
- ++prot_it;
- }
- auto media_it = flexfec_receive_ssrcs_media_.begin();
- while (media_it != flexfec_receive_ssrcs_media_.end()) {
- if (media_it->second == receive_stream_impl)
- media_it = flexfec_receive_ssrcs_media_.erase(media_it);
- else
- ++media_it;
- }
-
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
- ->RemoveStream(ssrc);
-
- flexfec_receive_streams_.erase(receive_stream_impl);
+ rtp_transport_receive_video_->RemoveSink(receive_stream_impl);
+ rtp_transport_receive_video_->RemoveReceiver(receive_stream_impl);
}
delete receive_stream_impl;
@@ -873,8 +799,12 @@ Call::Stats Call::GetStats() const {
&send_bandwidth);
std::vector<unsigned int> ssrcs;
uint32_t recv_bandwidth = 0;
+
+ // TODO(nisse): Is this thread safe? Most access to |receive_side_cc_| is done
+ // via |rtp_transport_receive_|, and protected by |receive_crit_|.
receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
&ssrcs, &recv_bandwidth);
+
stats.send_bandwidth_bps = send_bandwidth;
stats.recv_bandwidth_bps = recv_bandwidth;
stats.pacer_delay_ms =
@@ -1214,57 +1144,27 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO);
- ReadLockScoped read_lock(*receive_crit_);
- // TODO(nisse): We should parse the RTP header only here, and pass
- // on parsed_packet to the receive streams.
- rtc::Optional<RtpPacketReceived> parsed_packet =
- ParseRtpPacket(packet, length, packet_time);
-
- if (!parsed_packet)
- return DELIVERY_PACKET_ERROR;
-
- NotifyBweOfReceivedPacket(*parsed_packet, media_type);
+ int64_t arrival_time_ms;
+ if (packet_time.timestamp != -1) {
+ arrival_time_ms = (packet_time.timestamp + 500) / 1000;
+ } else {
+ arrival_time_ms = clock_->TimeInMilliseconds();
+ }
- uint32_t ssrc = parsed_packet->Ssrc();
+ ReadLockScoped read_lock(*receive_crit_);
+ received_bytes_per_second_counter_.Add(static_cast<int>(length));
if (media_type == MediaType::AUDIO) {
- auto it = audio_receive_ssrcs_.find(ssrc);
- if (it != audio_receive_ssrcs_.end()) {
- received_bytes_per_second_counter_.Add(static_cast<int>(length));
- received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
- it->second->OnRtpPacket(*parsed_packet);
- event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
- return DELIVERY_OK;
- }
- }
- if (media_type == MediaType::VIDEO) {
- auto it = video_receive_ssrcs_.find(ssrc);
- if (it != video_receive_ssrcs_.end()) {
- received_bytes_per_second_counter_.Add(static_cast<int>(length));
- received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
- it->second->OnRtpPacket(*parsed_packet);
-
- // Deliver media packets to FlexFEC subsystem.
- auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
- for (auto it = it_bounds.first; it != it_bounds.second; ++it)
- it->second->OnRtpPacket(*parsed_packet);
-
- event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
- return DELIVERY_OK;
- }
- }
- if (media_type == MediaType::VIDEO) {
- received_bytes_per_second_counter_.Add(static_cast<int>(length));
- // TODO(brandtr): Update here when FlexFEC supports protecting audio.
+ received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
+ return rtp_transport_receive_audio_->OnRtpPacket(
+ arrival_time_ms, rtc::ArrayView<const uint8_t>(packet, length));
+ } else if (media_type == MediaType::VIDEO) {
received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
- auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
- if (it != flexfec_receive_ssrcs_protection_.end()) {
- it->second->OnRtpPacket(*parsed_packet);
- event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
- return DELIVERY_OK;
- }
+ return rtp_transport_receive_video_->OnRtpPacket(
+ arrival_time_ms, rtc::ArrayView<const uint8_t>(packet, length));
}
- return DELIVERY_UNKNOWN_SSRC;
+ RTC_NOTREACHED();
+ return PacketReceiver::DELIVERY_PACKET_ERROR;
}
PacketReceiver::DeliveryStatus Call::DeliverPacket(
@@ -1284,6 +1184,9 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
// TODO(brandtr): Update this member function when we support protecting
// audio packets with FlexFEC.
+
+// TODO(nisse): Add a recovered flag to RtpParsedPacket, if needed for stats,
+// and demux recovered packets in the same way as ordinary packets.
bool Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]);
ReadLockScoped read_lock(*receive_crit_);
@@ -1293,34 +1196,6 @@ bool Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
return it->second->OnRecoveredPacket(packet, length);
}
-void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
- MediaType media_type) {
- auto it = receive_rtp_config_.find(packet.Ssrc());
- bool use_send_side_bwe =
- (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe;
-
- RTPHeader header;
- packet.GetHeader(&header);
-
- if (!use_send_side_bwe && header.extension.hasTransportSequenceNumber) {
- // Inconsistent configuration of send side BWE. Do nothing.
- // TODO(nisse): Without this check, we may produce RTCP feedback
- // packets even when not negotiated. But it would be cleaner to
- // move the check down to RTCPSender::SendFeedbackPacket, which
- // would also help the PacketRouter to select an appropriate rtp
- // module in the case that some, but not all, have RTCP feedback
- // enabled.
- return;
- }
- // For audio, we only support send side BWE.
- if (media_type == MediaType::VIDEO ||
- (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) {
- receive_side_cc_.OnReceivedPacket(
- packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(),
- header);
- }
-}
-
} // namespace internal
} // namespace webrtc

Powered by Google App Engine
This is Rietveld 408576698