Let FlexfecReceiveStreamImpl send RTCP RRs.
This CL adds an RTP module to FlexfecReceiveStreamImpl, and wires it up
to send RTCP RRs. It further makes some methods take const refs instead
of values, to make it more clear where packet copies are made. This
change reduces the number of copies by one, for the case when media
packets are added to the FlexFEC receiver.
The end-to-end test is modified to check for RTCP RRs being sent.
Part of this modification involves some indentation changes, and the
diff thus looks bigger than it logically is.
BUG=webrtc:5654
Review-Url: https://codereview.webrtc.org/2625633003
Cr-Commit-Position: refs/heads/master@{#16106}
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index c7998b2..48072d0 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -702,8 +702,9 @@
RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
RecoveredPacketReceiver* recovered_packet_receiver = this;
- FlexfecReceiveStreamImpl* receive_stream =
- new FlexfecReceiveStreamImpl(config, recovered_packet_receiver);
+ FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl(
+ config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(),
+ module_process_thread_.get());
{
WriteLockScoped write_lock(*receive_crit_);
@@ -1165,10 +1166,9 @@
ParseRtpPacket(packet, length, packet_time);
if (parsed_packet) {
NotifyBweOfReceivedPacket(*parsed_packet);
- auto status =
- it->second->AddAndProcessReceivedPacket(std::move(*parsed_packet))
- ? DELIVERY_OK
- : DELIVERY_PACKET_ERROR;
+ auto status = it->second->AddAndProcessReceivedPacket(*parsed_packet)
+ ? DELIVERY_OK
+ : DELIVERY_PACKET_ERROR;
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;
diff --git a/webrtc/call/flexfec_receive_stream_impl.cc b/webrtc/call/flexfec_receive_stream_impl.cc
index 95bcfc1..f5272c9 100644
--- a/webrtc/call/flexfec_receive_stream_impl.cc
+++ b/webrtc/call/flexfec_receive_stream_impl.cc
@@ -10,10 +10,16 @@
#include "webrtc/call/flexfec_receive_stream_impl.h"
-#include <utility>
+#include <string>
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
+#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
+#include "webrtc/modules/rtp_rtcp/include/receive_statistics.h"
+#include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
+#include "webrtc/modules/utility/include/process_thread.h"
+#include "webrtc/system_wrappers/include/clock.h"
namespace webrtc {
@@ -97,33 +103,77 @@
recovered_packet_receiver));
}
+std::unique_ptr<RtpRtcp> CreateRtpRtcpModule(
+ ReceiveStatistics* receive_statistics,
+ Transport* rtcp_send_transport,
+ RtcpRttStats* rtt_stats) {
+ RtpRtcp::Configuration configuration;
+ configuration.audio = false;
+ configuration.receiver_only = true;
+ configuration.clock = Clock::GetRealTimeClock();
+ configuration.receive_statistics = receive_statistics;
+ configuration.outgoing_transport = rtcp_send_transport;
+ configuration.rtt_stats = rtt_stats;
+ std::unique_ptr<RtpRtcp> rtp_rtcp(RtpRtcp::CreateRtpRtcp(configuration));
+ return rtp_rtcp;
+}
+
} // namespace
FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
const Config& config,
- RecoveredPacketReceiver* recovered_packet_receiver)
- : started_(false),
- config_(config),
- receiver_(
- MaybeCreateFlexfecReceiver(config_, recovered_packet_receiver)) {
+ RecoveredPacketReceiver* recovered_packet_receiver,
+ RtcpRttStats* rtt_stats,
+ ProcessThread* process_thread)
+ : config_(config),
+ started_(false),
+ receiver_(MaybeCreateFlexfecReceiver(config_, recovered_packet_receiver)),
+ rtp_receive_statistics_(
+ ReceiveStatistics::Create(Clock::GetRealTimeClock())),
+ rtp_rtcp_(CreateRtpRtcpModule(rtp_receive_statistics_.get(),
+ config_.rtcp_send_transport,
+ rtt_stats)),
+ process_thread_(process_thread) {
LOG(LS_INFO) << "FlexfecReceiveStreamImpl: " << config_.ToString();
+
+ // RTCP reporting.
+ rtp_rtcp_->SetSendingMediaStatus(false);
+ rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode);
+ rtp_rtcp_->SetSSRC(config_.local_ssrc);
+ process_thread_->RegisterModule(rtp_rtcp_.get());
}
FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
LOG(LS_INFO) << "~FlexfecReceiveStreamImpl: " << config_.ToString();
Stop();
+ process_thread_->DeRegisterModule(rtp_rtcp_.get());
}
bool FlexfecReceiveStreamImpl::AddAndProcessReceivedPacket(
- RtpPacketReceived packet) {
+ const RtpPacketReceived& packet) {
{
rtc::CritScope cs(&crit_);
if (!started_)
return false;
}
+
if (!receiver_)
return false;
- return receiver_->AddAndProcessReceivedPacket(std::move(packet));
+
+ if (!receiver_->AddAndProcessReceivedPacket(packet))
+ return false;
+
+ // Do not report media packets in the RTCP RRs generated by |rtp_rtcp_|.
+ if (packet.Ssrc() == config_.remote_ssrc) {
+ RTPHeader header;
+ packet.GetHeader(&header);
+ // FlexFEC packets are never retransmitted.
+ const bool kNotRetransmitted = false;
+ rtp_receive_statistics_->IncomingPacket(header, packet.size(),
+ kNotRetransmitted);
+ }
+
+ return true;
}
void FlexfecReceiveStreamImpl::Start() {
diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h
index 7267dc0..36ea623 100644
--- a/webrtc/call/flexfec_receive_stream_impl.h
+++ b/webrtc/call/flexfec_receive_stream_impl.h
@@ -12,25 +12,31 @@
#define WEBRTC_CALL_FLEXFEC_RECEIVE_STREAM_IMPL_H_
#include <memory>
-#include <string>
-#include "webrtc/base/basictypes.h"
#include "webrtc/base/criticalsection.h"
#include "webrtc/call/flexfec_receive_stream.h"
-#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
-#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
namespace webrtc {
+class FlexfecReceiver;
+class ProcessThread;
+class ReceiveStatistics;
+class RecoveredPacketReceiver;
+class RtcpRttStats;
+class RtpPacketReceived;
+class RtpRtcp;
+
class FlexfecReceiveStreamImpl : public FlexfecReceiveStream {
public:
FlexfecReceiveStreamImpl(const Config& config,
- RecoveredPacketReceiver* recovered_packet_receiver);
+ RecoveredPacketReceiver* recovered_packet_receiver,
+ RtcpRttStats* rtt_stats,
+ ProcessThread* process_thread);
~FlexfecReceiveStreamImpl() override;
const Config& GetConfig() const { return config_; }
- bool AddAndProcessReceivedPacket(RtpPacketReceived packet);
+ bool AddAndProcessReceivedPacket(const RtpPacketReceived& packet);
// Implements FlexfecReceiveStream.
void Start() override;
@@ -38,11 +44,18 @@
Stats GetStats() const override;
private:
- rtc::CriticalSection crit_;
- bool started_ GUARDED_BY(crit_);
-
+ // Config.
const Config config_;
+ bool started_ GUARDED_BY(crit_);
+ rtc::CriticalSection crit_;
+
+ // Erasure code interfacing.
const std::unique_ptr<FlexfecReceiver> receiver_;
+
+ // RTCP reporting.
+ const std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
+ const std::unique_ptr<RtpRtcp> rtp_rtcp_;
+ ProcessThread* process_thread_;
};
} // namespace webrtc
diff --git a/webrtc/call/flexfec_receive_stream_unittest.cc b/webrtc/call/flexfec_receive_stream_unittest.cc
index 2402636..2365811 100644
--- a/webrtc/call/flexfec_receive_stream_unittest.cc
+++ b/webrtc/call/flexfec_receive_stream_unittest.cc
@@ -12,10 +12,13 @@
#include "webrtc/base/array_view.h"
#include "webrtc/call/flexfec_receive_stream_impl.h"
+#include "webrtc/modules/pacing/packet_router.h"
#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
#include "webrtc/modules/rtp_rtcp/mocks/mock_recovered_packet_receiver.h"
+#include "webrtc/modules/rtp_rtcp/mocks/mock_rtcp_rtt_stats.h"
#include "webrtc/modules/rtp_rtcp/source/byte_io.h"
#include "webrtc/modules/rtp_rtcp/source/rtp_header_extensions.h"
+#include "webrtc/modules/utility/include/mock/mock_process_thread.h"
#include "webrtc/test/gmock.h"
#include "webrtc/test/gtest.h"
#include "webrtc/test/mock_transport.h"
@@ -74,11 +77,16 @@
protected:
FlexfecReceiveStreamTest()
: config_(CreateDefaultConfig(&rtcp_send_transport_)),
- receive_stream_(config_, &recovered_packet_receiver_) {}
+ receive_stream_(config_,
+ &recovered_packet_receiver_,
+ &rtt_stats_,
+ &process_thread_) {}
+ MockTransport rtcp_send_transport_;
FlexfecReceiveStream::Config config_;
MockRecoveredPacketReceiver recovered_packet_receiver_;
- MockTransport rtcp_send_transport_;
+ MockRtcpRttStats rtt_stats_;
+ MockProcessThread process_thread_;
FlexfecReceiveStreamImpl receive_stream_;
};
@@ -126,7 +134,8 @@
// clang-format on
testing::StrictMock<MockRecoveredPacketReceiver> recovered_packet_receiver;
- FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver);
+ FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver,
+ &rtt_stats_, &process_thread_);
// Do not call back before being started.
receive_stream.AddAndProcessReceivedPacket(ParsePacket(kFlexfecPacket));
diff --git a/webrtc/media/engine/webrtcvideoengine2.cc b/webrtc/media/engine/webrtcvideoengine2.cc
index 996e2bd..b0d3b89 100644
--- a/webrtc/media/engine/webrtcvideoengine2.cc
+++ b/webrtc/media/engine/webrtcvideoengine2.cc
@@ -2219,6 +2219,7 @@
}
config_.rtp.local_ssrc = local_ssrc;
+ flexfec_config_.local_ssrc = local_ssrc;
LOG(LS_INFO)
<< "RecreateWebRtcStream (recv) because of SetLocalSsrc; local_ssrc="
<< local_ssrc;
@@ -2246,6 +2247,7 @@
config_.rtp.nack.rtp_history_ms = nack_history_ms;
config_.rtp.transport_cc = transport_cc_enabled;
config_.rtp.rtcp_mode = rtcp_mode;
+ flexfec_config_.rtcp_mode = rtcp_mode;
LOG(LS_INFO)
<< "RecreateWebRtcStream (recv) because of SetFeedbackParameters; nack="
<< nack_enabled << ", remb=" << remb_enabled
diff --git a/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h b/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h
index 9ad0931..a986fbb 100644
--- a/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h
+++ b/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h
@@ -43,13 +43,13 @@
// Inserts a received packet (can be either media or FlexFEC) into the
// internal buffer, and sends the received packets to the erasure code.
// All newly recovered packets are sent back through the callback.
- bool AddAndProcessReceivedPacket(RtpPacketReceived packet);
+ bool AddAndProcessReceivedPacket(const RtpPacketReceived& packet);
// Returns a counter describing the added and recovered packets.
FecPacketCounter GetPacketCounter() const;
private:
- bool AddReceivedPacket(RtpPacketReceived packet);
+ bool AddReceivedPacket(const RtpPacketReceived& packet);
bool ProcessReceivedPackets();
// Config.
diff --git a/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc b/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
index 80f5b1f..b81a039 100644
--- a/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
+++ b/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
@@ -45,7 +45,8 @@
FlexfecReceiver::~FlexfecReceiver() = default;
-bool FlexfecReceiver::AddAndProcessReceivedPacket(RtpPacketReceived packet) {
+bool FlexfecReceiver::AddAndProcessReceivedPacket(
+ const RtpPacketReceived& packet) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
if (!AddReceivedPacket(std::move(packet))) {
return false;
@@ -58,7 +59,7 @@
return packet_counter_;
}
-bool FlexfecReceiver::AddReceivedPacket(RtpPacketReceived packet) {
+bool FlexfecReceiver::AddReceivedPacket(const RtpPacketReceived& packet) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
// RTP packets with a full base header (12 bytes), but without payload,
diff --git a/webrtc/test/call_test.cc b/webrtc/test/call_test.cc
index 290992a..9521d34 100644
--- a/webrtc/test/call_test.cc
+++ b/webrtc/test/call_test.cc
@@ -279,6 +279,7 @@
config.payload_type = kFlexfecPayloadType;
config.remote_ssrc = kFlexfecSendSsrc;
config.protected_media_ssrcs = {kVideoSendSsrcs[0]};
+ config.local_ssrc = kReceiverLocalVideoSsrc;
for (const RtpExtension& extension : video_send_config_.rtp.extensions)
config.rtp_header_extensions.push_back(extension);
flexfec_receive_configs_.push_back(config);
diff --git a/webrtc/test/call_test.h b/webrtc/test/call_test.h
index 03bc5bb..b4101a4 100644
--- a/webrtc/test/call_test.h
+++ b/webrtc/test/call_test.h
@@ -56,7 +56,6 @@
static const uint32_t kFlexfecSendSsrc;
static const uint32_t kReceiverLocalVideoSsrc;
static const uint32_t kReceiverLocalAudioSsrc;
- static const uint32_t kReceiverLocalFlexfecSsrc;
static const int kNackRtpHistoryMs;
protected:
diff --git a/webrtc/video/end_to_end_tests.cc b/webrtc/video/end_to_end_tests.cc
index 946d81d..502851b 100644
--- a/webrtc/video/end_to_end_tests.cc
+++ b/webrtc/video/end_to_end_tests.cc
@@ -705,84 +705,126 @@
RunBaseTest(&test);
}
-TEST_P(EndToEndTest, CanReceiveFlexfec) {
- class FlexfecRenderObserver : public test::EndToEndTest,
- public rtc::VideoSinkInterface<VideoFrame> {
- public:
- FlexfecRenderObserver()
- : EndToEndTest(kDefaultTimeoutMs), random_(0xcafef00d1) {}
+class FlexfecRenderObserver : public test::EndToEndTest,
+ public rtc::VideoSinkInterface<VideoFrame> {
+ public:
+ static constexpr uint32_t kVideoLocalSsrc = 123;
+ static constexpr uint32_t kFlexfecLocalSsrc = 456;
- size_t GetNumFlexfecStreams() const override { return 1; }
+ explicit FlexfecRenderObserver(bool expect_flexfec_rtcp)
+ : test::EndToEndTest(test::CallTest::kDefaultTimeoutMs),
+ expect_flexfec_rtcp_(expect_flexfec_rtcp),
+ received_flexfec_rtcp_(false),
+ random_(0xcafef00d1) {}
- private:
- Action OnSendRtp(const uint8_t* packet, size_t length) override {
- rtc::CritScope lock(&crit_);
- RTPHeader header;
- EXPECT_TRUE(parser_->Parse(packet, length, &header));
+ size_t GetNumFlexfecStreams() const override { return 1; }
- uint8_t payload_type = header.payloadType;
- if (payload_type != kFakeVideoSendPayloadType) {
- EXPECT_EQ(kFlexfecPayloadType, payload_type);
- }
+ private:
+ Action OnSendRtp(const uint8_t* packet, size_t length) override {
+ rtc::CritScope lock(&crit_);
+ RTPHeader header;
+ EXPECT_TRUE(parser_->Parse(packet, length, &header));
- // Is this a retransmitted media packet? From the perspective of FEC, this
- // packet is then no longer dropped, so remove it from the list of
- // dropped packets.
- if (payload_type == kFakeVideoSendPayloadType) {
- auto seq_num_it = dropped_sequence_numbers_.find(header.sequenceNumber);
- if (seq_num_it != dropped_sequence_numbers_.end()) {
- dropped_sequence_numbers_.erase(seq_num_it);
- auto ts_it = dropped_timestamps_.find(header.timestamp);
- EXPECT_NE(ts_it, dropped_timestamps_.end());
- dropped_timestamps_.erase(ts_it);
-
- return SEND_PACKET;
- }
- }
-
- // Simulate 5% packet loss. Record what media packets, and corresponding
- // timestamps, that were dropped.
- if (random_.Rand(1, 100) <= 5) {
- if (payload_type == kFakeVideoSendPayloadType) {
- dropped_sequence_numbers_.insert(header.sequenceNumber);
- dropped_timestamps_.insert(header.timestamp);
- }
-
- return DROP_PACKET;
- }
-
- return SEND_PACKET;
+ uint8_t payload_type = header.payloadType;
+ if (payload_type != test::CallTest::kFakeVideoSendPayloadType) {
+ EXPECT_EQ(test::CallTest::kFlexfecPayloadType, payload_type);
}
- void OnFrame(const VideoFrame& video_frame) override {
- rtc::CritScope lock(&crit_);
- // Rendering frame with timestamp of packet that was dropped -> FEC
- // protection worked.
- auto it = dropped_timestamps_.find(video_frame.timestamp());
- if (it != dropped_timestamps_.end())
+ // Is this a retransmitted media packet? From the perspective of FEC, this
+ // packet is then no longer dropped, so remove it from the list of
+ // dropped packets.
+ if (payload_type == test::CallTest::kFakeVideoSendPayloadType) {
+ auto seq_num_it = dropped_sequence_numbers_.find(header.sequenceNumber);
+ if (seq_num_it != dropped_sequence_numbers_.end()) {
+ dropped_sequence_numbers_.erase(seq_num_it);
+ auto ts_it = dropped_timestamps_.find(header.timestamp);
+ EXPECT_NE(ts_it, dropped_timestamps_.end());
+ dropped_timestamps_.erase(ts_it);
+
+ return SEND_PACKET;
+ }
+ }
+
+ // Simulate 5% packet loss. Record what media packets, and corresponding
+ // timestamps, that were dropped.
+ if (random_.Rand(1, 100) <= 5) {
+ if (payload_type == test::CallTest::kFakeVideoSendPayloadType) {
+ dropped_sequence_numbers_.insert(header.sequenceNumber);
+ dropped_timestamps_.insert(header.timestamp);
+ }
+
+ return DROP_PACKET;
+ }
+
+ return SEND_PACKET;
+ }
+
+ Action OnReceiveRtcp(const uint8_t* data, size_t length) override {
+ test::RtcpPacketParser parser;
+
+ parser.Parse(data, length);
+ if (parser.sender_ssrc() == kFlexfecLocalSsrc) {
+ EXPECT_EQ(1, parser.receiver_report()->num_packets());
+ const std::vector<rtcp::ReportBlock>& report_blocks =
+ parser.receiver_report()->report_blocks();
+ if (!report_blocks.empty()) {
+ EXPECT_EQ(1U, report_blocks.size());
+ EXPECT_EQ(test::CallTest::kFlexfecSendSsrc,
+ report_blocks[0].source_ssrc());
+ received_flexfec_rtcp_ = true;
+ }
+ }
+
+ return SEND_PACKET;
+ }
+
+ void OnFrame(const VideoFrame& video_frame) override {
+ rtc::CritScope lock(&crit_);
+ // Rendering frame with timestamp of packet that was dropped -> FEC
+ // protection worked.
+ auto it = dropped_timestamps_.find(video_frame.timestamp());
+ if (it != dropped_timestamps_.end()) {
+ if (!expect_flexfec_rtcp_ || received_flexfec_rtcp_) {
observation_complete_.Set();
+ }
}
+ }
- void ModifyVideoConfigs(
- VideoSendStream::Config* send_config,
- std::vector<VideoReceiveStream::Config>* receive_configs,
- VideoEncoderConfig* encoder_config) override {
- (*receive_configs)[0].renderer = this;
- }
+ void ModifyVideoConfigs(
+ VideoSendStream::Config* send_config,
+ std::vector<VideoReceiveStream::Config>* receive_configs,
+ VideoEncoderConfig* encoder_config) override {
+ (*receive_configs)[0].rtp.local_ssrc = kVideoLocalSsrc;
+ (*receive_configs)[0].renderer = this;
+ }
- void PerformTest() override {
- EXPECT_TRUE(Wait())
- << "Timed out waiting for dropped frames to be rendered.";
- }
+ void ModifyFlexfecConfigs(
+ std::vector<FlexfecReceiveStream::Config>* receive_configs) override {
+ (*receive_configs)[0].local_ssrc = kFlexfecLocalSsrc;
+ }
- rtc::CriticalSection crit_;
- std::set<uint32_t> dropped_sequence_numbers_ GUARDED_BY(crit_);
- // Since several packets can have the same timestamp a multiset is used
- // instead of a set.
- std::multiset<uint32_t> dropped_timestamps_ GUARDED_BY(crit_);
- Random random_;
- } test;
+ void PerformTest() override {
+ EXPECT_TRUE(Wait())
+ << "Timed out waiting for dropped frames to be rendered.";
+ }
+ rtc::CriticalSection crit_;
+ std::set<uint32_t> dropped_sequence_numbers_ GUARDED_BY(crit_);
+ // Since several packets can have the same timestamp a multiset is used
+ // instead of a set.
+ std::multiset<uint32_t> dropped_timestamps_ GUARDED_BY(crit_);
+ bool expect_flexfec_rtcp_;
+ bool received_flexfec_rtcp_;
+ Random random_;
+};
+
+TEST_P(EndToEndTest, ReceivesFlexfec) {
+ FlexfecRenderObserver test(false);
+ RunBaseTest(&test);
+}
+
+TEST_P(EndToEndTest, ReceivesFlexfecAndSendsCorrespondingRtcp) {
+ FlexfecRenderObserver test(true);
RunBaseTest(&test);
}
diff --git a/webrtc/video/video_quality_test.cc b/webrtc/video/video_quality_test.cc
index e3ae616..c438274 100644
--- a/webrtc/video/video_quality_test.cc
+++ b/webrtc/video/video_quality_test.cc
@@ -1121,6 +1121,7 @@
flexfec_receive_config.remote_ssrc = video_send_config_.rtp.flexfec.ssrc;
flexfec_receive_config.protected_media_ssrcs =
video_send_config_.rtp.flexfec.protected_media_ssrcs;
+ flexfec_receive_config.local_ssrc = kReceiverLocalVideoSsrc;
flexfec_receive_config.transport_cc = params_.call.send_side_bwe;
if (params_.call.send_side_bwe) {
flexfec_receive_config.rtp_header_extensions.push_back(