Replace the implementation of `GetContributingSources()` on the video side.
This change replaces the `ContributingSources`-implementation of `GetContributingSources()` and `GetSynchronizationSources()` on the video side with the spec-compliant `SourceTracker`-implementation.
The most noticeable impact is that the per-frame dictionaries are now updated when frames are delivered to the RTCRtpReceiver's MediaStreamTrack rather than when RTP packets are received on the network.
Bug: webrtc:10545
Change-Id: I895b5790280ac94c1501801d226c643633c67349
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/143177
Reviewed-by: Niels Moller <nisse@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Commit-Queue: Chen Xing <chxg@google.com>
Cr-Commit-Position: refs/heads/master@{#28386}
diff --git a/media/engine/webrtc_video_engine_unittest.cc b/media/engine/webrtc_video_engine_unittest.cc
index f7b2a2f..bb989f4 100644
--- a/media/engine/webrtc_video_engine_unittest.cc
+++ b/media/engine/webrtc_video_engine_unittest.cc
@@ -60,6 +60,8 @@
#include "test/gmock.h"
using ::testing::Field;
+using ::testing::IsEmpty;
+using ::testing::SizeIs;
using webrtc::BitrateConstraints;
using webrtc::RtpExtension;
@@ -7514,137 +7516,44 @@
false);
}
-class WebRtcVideoFakeClock {
- public:
- WebRtcVideoFakeClock() {
- fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1)); // avoid time=0
- }
- rtc::ScopedFakeClock fake_clock_;
-};
+TEST_F(WebRtcVideoChannelBaseTest, GetSources) {
+ EXPECT_THAT(channel_->GetSources(kSsrc), IsEmpty());
-// The fake clock needs to be initialized before the call, and not
-// destroyed until after all threads spawned by the test have been stopped.
-// This mixin ensures that.
-class WebRtcVideoChannelTestWithClock : public WebRtcVideoFakeClock,
- public WebRtcVideoChannelBaseTest {};
-
-TEST_F(WebRtcVideoChannelTestWithClock, GetSources) {
- uint8_t data1[] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
-
- EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
-
- rtc::CopyOnWriteBuffer packet1(data1, sizeof(data1));
- rtc::SetBE32(packet1.data() + 8, kSsrc);
- channel_->SetSink(kDefaultReceiveSsrc, NULL);
+ EXPECT_TRUE(channel_->SetSink(kDefaultReceiveSsrc, &renderer_));
EXPECT_TRUE(SetDefaultCodec());
EXPECT_TRUE(SetSend(true));
- EXPECT_EQ(0, renderer_.num_rendered_frames());
- channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
+ EXPECT_EQ(renderer_.num_rendered_frames(), 0);
- std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
- EXPECT_EQ(1u, sources.size());
- EXPECT_EQ(webrtc::RtpSourceType::SSRC, sources[0].source_type());
- int64_t timestamp1 = sources[0].timestamp_ms();
+ // Send and receive one frame.
+ SendFrame();
+ EXPECT_FRAME_WAIT(1, kVideoWidth, kVideoHeight, kTimeout);
- // a new packet.
- int64_t timeDeltaMs = 1;
- fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(timeDeltaMs));
- channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
- int64_t timestamp2 = channel_->GetSources(kSsrc)[0].timestamp_ms();
- EXPECT_EQ(timestamp2, timestamp1 + timeDeltaMs);
+ EXPECT_THAT(channel_->GetSources(kSsrc - 1), IsEmpty());
+ EXPECT_THAT(channel_->GetSources(kSsrc), SizeIs(1));
+ EXPECT_THAT(channel_->GetSources(kSsrc + 1), IsEmpty());
- // It only keeps 10s of history.
- fake_clock_.AdvanceTime(webrtc::TimeDelta::seconds(10));
- fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));
- EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
-}
+ webrtc::RtpSource source = channel_->GetSources(kSsrc)[0];
+ EXPECT_EQ(source.source_id(), kSsrc);
+ EXPECT_EQ(source.source_type(), webrtc::RtpSourceType::SSRC);
+ int64_t rtp_timestamp_1 = source.rtp_timestamp();
+ int64_t timestamp_ms_1 = source.timestamp_ms();
-TEST_F(WebRtcVideoChannelTestWithClock, GetContributingSources) {
- uint8_t data1[] = {0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+ // Send and receive another frame.
+ SendFrame();
+ EXPECT_FRAME_WAIT(2, kVideoWidth, kVideoHeight, kTimeout);
- uint32_t kCsrc = 4321u;
- EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
- EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
+ EXPECT_THAT(channel_->GetSources(kSsrc - 1), IsEmpty());
+ EXPECT_THAT(channel_->GetSources(kSsrc), SizeIs(1));
+ EXPECT_THAT(channel_->GetSources(kSsrc + 1), IsEmpty());
- rtc::CopyOnWriteBuffer packet1(data1, sizeof(data1));
- rtc::SetBE32(packet1.data() + 8, kSsrc);
- rtc::SetBE32(packet1.data() + 12, kCsrc);
- channel_->SetSink(kDefaultReceiveSsrc, NULL);
- EXPECT_TRUE(SetDefaultCodec());
- EXPECT_TRUE(SetSend(true));
- EXPECT_EQ(0, renderer_.num_rendered_frames());
- channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
+ source = channel_->GetSources(kSsrc)[0];
+ EXPECT_EQ(source.source_id(), kSsrc);
+ EXPECT_EQ(source.source_type(), webrtc::RtpSourceType::SSRC);
+ int64_t rtp_timestamp_2 = source.rtp_timestamp();
+ int64_t timestamp_ms_2 = source.timestamp_ms();
- {
- ASSERT_EQ(2u, channel_->GetSources(kSsrc).size());
- EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
- std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
- EXPECT_EQ(sources[0].timestamp_ms(), sources[1].timestamp_ms());
- // 1 SSRC and 1 CSRC.
- EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::SSRC;
- }));
- EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::CSRC;
- }));
- }
- int64_t timestamp1 = channel_->GetSources(kSsrc)[0].timestamp_ms();
-
- // a new packet with only ssrc (i.e no csrc).
- uint8_t data2[] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
- rtc::CopyOnWriteBuffer packet2(data2, sizeof(data2));
- rtc::SetBE32(packet2.data() + 8, kSsrc);
-
- int64_t timeDeltaMs = 1;
- fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(timeDeltaMs));
- channel_->OnPacketReceived(packet2, /*packet_time_us=*/-1);
-
- {
- ASSERT_EQ(2u, channel_->GetSources(kSsrc).size());
- EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
- std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
- EXPECT_NE(sources[0].timestamp_ms(), sources[1].timestamp_ms());
- // 1 SSRC and 1 CSRC.
- EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::SSRC;
- }));
- EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::CSRC;
- }));
- auto ssrcSource =
- absl::c_find_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::SSRC;
- });
- auto csrcSource =
- absl::c_find_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::CSRC;
- });
-
- EXPECT_EQ(ssrcSource->timestamp_ms(), timestamp1 + timeDeltaMs);
- EXPECT_EQ(csrcSource->timestamp_ms(), timestamp1);
- }
-
- // It only keeps 10s of history.
- fake_clock_.AdvanceTime(webrtc::TimeDelta::seconds(10));
-
- {
- ASSERT_EQ(1u, channel_->GetSources(kSsrc).size());
- EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
- std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
- EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::SSRC;
- }));
- EXPECT_EQ(0, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
- return source.source_type() == webrtc::RtpSourceType::CSRC;
- }));
- }
-
- fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));
- EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
- EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
+ EXPECT_GT(rtp_timestamp_2, rtp_timestamp_1);
+ EXPECT_GT(timestamp_ms_2, timestamp_ms_1);
}
TEST_F(WebRtcVideoChannelTest, SetsRidsOnSendStream) {
diff --git a/video/rtp_video_stream_receiver.cc b/video/rtp_video_stream_receiver.cc
index ea0f663..237a4c2 100644
--- a/video/rtp_video_stream_receiver.cc
+++ b/video/rtp_video_stream_receiver.cc
@@ -307,7 +307,7 @@
return absl::nullopt;
}
{
- rtc::CritScope lock(&rtp_sources_lock_);
+ rtc::CritScope lock(&sync_info_lock_);
if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) {
return absl::nullopt;
}
@@ -423,14 +423,9 @@
// TODO(nisse): Exclude out-of-order packets?
int64_t now_ms = clock_->TimeInMilliseconds();
{
- rtc::CritScope cs(&rtp_sources_lock_);
+ rtc::CritScope cs(&sync_info_lock_);
last_received_rtp_timestamp_ = packet.Timestamp();
last_received_rtp_system_time_ms_ = now_ms;
-
- std::vector<uint32_t> csrcs = packet.Csrcs();
- contributing_sources_.Update(now_ms, csrcs,
- /* audio level */ absl::nullopt,
- packet.Timestamp());
}
// Periodically log the RTP header of incoming packets.
if (now_ms - last_packet_log_ms_ > kPacketLogIntervalMs) {
@@ -894,22 +889,4 @@
sprop_decoder.pps_nalu());
}
-std::vector<webrtc::RtpSource> RtpVideoStreamReceiver::GetSources() const {
- int64_t now_ms = rtc::TimeMillis();
- std::vector<RtpSource> sources;
- {
- rtc::CritScope cs(&rtp_sources_lock_);
- sources = contributing_sources_.GetSources(now_ms);
- if (last_received_rtp_system_time_ms_ >=
- now_ms - ContributingSources::kHistoryMs) {
- RTC_DCHECK(last_received_rtp_timestamp_.has_value());
- sources.emplace_back(*last_received_rtp_system_time_ms_,
- config_.rtp.remote_ssrc, RtpSourceType::SSRC,
- /* audio_level */ absl::nullopt,
- *last_received_rtp_timestamp_);
- }
- }
- return sources;
-}
-
} // namespace webrtc
diff --git a/video/rtp_video_stream_receiver.h b/video/rtp_video_stream_receiver.h
index d574184..3202a31 100644
--- a/video/rtp_video_stream_receiver.h
+++ b/video/rtp_video_stream_receiver.h
@@ -175,8 +175,6 @@
void AddSecondarySink(RtpPacketSinkInterface* sink);
void RemoveSecondarySink(const RtpPacketSinkInterface* sink);
- std::vector<webrtc::RtpSource> GetSources() const;
-
private:
// Used for buffering RTCP feedback messages and sending them all together.
// Note:
@@ -298,14 +296,13 @@
std::vector<RtpPacketSinkInterface*> secondary_sinks_
RTC_GUARDED_BY(worker_task_checker_);
- // Info for GetSources and GetSyncInfo is updated on network or worker thread,
- // queried on the worker thread.
- rtc::CriticalSection rtp_sources_lock_;
- ContributingSources contributing_sources_ RTC_GUARDED_BY(&rtp_sources_lock_);
+ // Info for GetSyncInfo is updated on network or worker thread, and queried on
+ // the worker thread.
+ rtc::CriticalSection sync_info_lock_;
absl::optional<uint32_t> last_received_rtp_timestamp_
- RTC_GUARDED_BY(rtp_sources_lock_);
+ RTC_GUARDED_BY(sync_info_lock_);
absl::optional<int64_t> last_received_rtp_system_time_ms_
- RTC_GUARDED_BY(rtp_sources_lock_);
+ RTC_GUARDED_BY(sync_info_lock_);
// Used to validate the buffered frame decryptor is always run on the correct
// thread.
diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index c8f14d8..b69a0be 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -191,6 +191,7 @@
"DecodingThread",
rtc::kHighestPriority),
call_stats_(call_stats),
+ source_tracker_(clock_),
stats_proxy_(&config_, clock_),
rtp_receive_statistics_(
ReceiveStatistics::Create(clock_, &stats_proxy_, &stats_proxy_)),
@@ -503,6 +504,7 @@
}
config_.renderer->OnFrame(video_frame);
+ source_tracker_.OnFrameDelivered(video_frame.packet_infos());
// TODO(tommi): OnRenderFrame grabs a lock too.
stats_proxy_.OnRenderedFrame(video_frame);
}
@@ -742,7 +744,7 @@
}
std::vector<webrtc::RtpSource> VideoReceiveStream::GetSources() const {
- return rtp_video_stream_receiver_.GetSources();
+ return source_tracker_.GetSources();
}
} // namespace internal
diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h
index 9d18218..3acba46 100644
--- a/video/video_receive_stream.h
+++ b/video/video_receive_stream.h
@@ -20,6 +20,7 @@
#include "call/syncable.h"
#include "call/video_receive_stream.h"
#include "modules/rtp_rtcp/include/flexfec_receiver.h"
+#include "modules/rtp_rtcp/source/source_tracker.h"
#include "modules/video_coding/frame_buffer2.h"
#include "modules/video_coding/video_coding_impl.h"
#include "rtc_base/synchronization/sequence_checker.h"
@@ -164,6 +165,7 @@
bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
+ SourceTracker source_tracker_;
ReceiveStatisticsProxy stats_proxy_;
// Shared by media and rtx stream receivers, since the latter has no RtpRtcp
// module of its own.
diff --git a/video/video_receive_stream_unittest.cc b/video/video_receive_stream_unittest.cc
index d8a845e..005b077 100644
--- a/video/video_receive_stream_unittest.cc
+++ b/video/video_receive_stream_unittest.cc
@@ -8,6 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
+#include <algorithm>
#include <utility>
#include <vector>
@@ -37,7 +38,10 @@
namespace {
using ::testing::_;
+using ::testing::ElementsAreArray;
using ::testing::Invoke;
+using ::testing::IsEmpty;
+using ::testing::SizeIs;
constexpr int kDefaultTimeOutMs = 50;
@@ -107,14 +111,14 @@
null_decoder.decoder_factory = &null_decoder_factory_;
config_.decoders.push_back(null_decoder);
- Clock* clock = Clock::GetRealTimeClock();
- timing_ = new VCMTiming(clock);
+ clock_ = Clock::GetRealTimeClock();
+ timing_ = new VCMTiming(clock_);
video_receive_stream_ =
absl::make_unique<webrtc::internal::VideoReceiveStream>(
task_queue_factory_.get(), &rtp_stream_receiver_controller_,
kDefaultNumCpuCores, &packet_router_, config_.Copy(),
- process_thread_.get(), &call_stats_, clock, timing_);
+ process_thread_.get(), &call_stats_, clock_, timing_);
}
protected:
@@ -131,6 +135,7 @@
PacketRouter packet_router_;
RtpStreamReceiverController rtp_stream_receiver_controller_;
std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
+ Clock* clock_;
VCMTiming* timing_;
};
@@ -243,13 +248,13 @@
fake_decoder.video_format = SdpVideoFormat("VP8");
fake_decoder.decoder_factory = &fake_decoder_factory_;
config_.decoders.push_back(fake_decoder);
- Clock* clock = Clock::GetRealTimeClock();
- timing_ = new VCMTiming(clock);
+ clock_ = Clock::GetRealTimeClock();
+ timing_ = new VCMTiming(clock_);
video_receive_stream_.reset(new webrtc::internal::VideoReceiveStream(
task_queue_factory_.get(), &rtp_stream_receiver_controller_,
kDefaultNumCpuCores, &packet_router_, config_.Copy(),
- process_thread_.get(), &call_stats_, clock, timing_));
+ process_thread_.get(), &call_stats_, clock_, timing_));
}
protected:
@@ -263,6 +268,7 @@
PacketRouter packet_router_;
RtpStreamReceiverController rtp_stream_receiver_controller_;
std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
+ Clock* clock_;
VCMTiming* timing_;
};
@@ -320,7 +326,85 @@
video_receive_stream_->OnCompleteFrame(std::move(test_frame));
EXPECT_TRUE(fake_renderer_.WaitForRenderedFrame(kDefaultTimeOutMs));
- EXPECT_EQ(fake_renderer_.packet_infos().size(), 3U);
+ EXPECT_THAT(fake_renderer_.packet_infos(), ElementsAreArray(packet_infos));
+}
+
+TEST_F(VideoReceiveStreamTestWithFakeDecoder, RenderedFrameUpdatesGetSources) {
+ constexpr uint32_t kSsrc = 1111;
+ constexpr uint32_t kCsrc = 9001;
+ constexpr uint32_t kRtpTimestamp = 12345;
+
+ // Prepare one video frame with per-packet information.
+ auto test_frame = absl::make_unique<FrameObjectFake>();
+ test_frame->SetPayloadType(99);
+ test_frame->id.picture_id = 0;
+ RtpPacketInfos packet_infos;
+ {
+ RtpPacketInfos::vector_type infos;
+
+ RtpPacketInfo info;
+ info.set_ssrc(kSsrc);
+ info.set_csrcs({kCsrc});
+ info.set_rtp_timestamp(kRtpTimestamp);
+
+ info.set_receive_time_ms(clock_->TimeInMilliseconds() - 5000);
+ infos.push_back(info);
+
+ info.set_receive_time_ms(clock_->TimeInMilliseconds() - 3000);
+ infos.push_back(info);
+
+ info.set_receive_time_ms(clock_->TimeInMilliseconds() - 2000);
+ infos.push_back(info);
+
+ info.set_receive_time_ms(clock_->TimeInMilliseconds() - 4000);
+ infos.push_back(info);
+
+ packet_infos = RtpPacketInfos(std::move(infos));
+ }
+ test_frame->SetPacketInfos(packet_infos);
+
+ // Start receive stream.
+ video_receive_stream_->Start();
+ EXPECT_THAT(video_receive_stream_->GetSources(), IsEmpty());
+
+ // Render one video frame.
+ int64_t timestamp_ms_min = clock_->TimeInMilliseconds();
+ video_receive_stream_->OnCompleteFrame(std::move(test_frame));
+ EXPECT_TRUE(fake_renderer_.WaitForRenderedFrame(kDefaultTimeOutMs));
+ int64_t timestamp_ms_max = clock_->TimeInMilliseconds();
+
+ // Verify that the per-packet information is passed to the renderer.
+ EXPECT_THAT(fake_renderer_.packet_infos(), ElementsAreArray(packet_infos));
+
+ // Verify that the per-packet information also updates |GetSources()|.
+ std::vector<RtpSource> sources = video_receive_stream_->GetSources();
+ ASSERT_THAT(sources, SizeIs(2));
+ {
+ auto it = std::find_if(sources.begin(), sources.end(),
+ [](const RtpSource& source) {
+ return source.source_type() == RtpSourceType::SSRC;
+ });
+ ASSERT_NE(it, sources.end());
+
+ EXPECT_EQ(it->source_id(), kSsrc);
+ EXPECT_EQ(it->source_type(), RtpSourceType::SSRC);
+ EXPECT_EQ(it->rtp_timestamp(), kRtpTimestamp);
+ EXPECT_GE(it->timestamp_ms(), timestamp_ms_min);
+ EXPECT_LE(it->timestamp_ms(), timestamp_ms_max);
+ }
+ {
+ auto it = std::find_if(sources.begin(), sources.end(),
+ [](const RtpSource& source) {
+ return source.source_type() == RtpSourceType::CSRC;
+ });
+ ASSERT_NE(it, sources.end());
+
+ EXPECT_EQ(it->source_id(), kCsrc);
+ EXPECT_EQ(it->source_type(), RtpSourceType::CSRC);
+ EXPECT_EQ(it->rtp_timestamp(), kRtpTimestamp);
+ EXPECT_GE(it->timestamp_ms(), timestamp_ms_min);
+ EXPECT_LE(it->timestamp_ms(), timestamp_ms_max);
+ }
}
} // namespace webrtc