Reland "(Un/)Subscribe RtpVideoSender for feedback on the transport queue."
This is a reland of 9d230d54c7eef31ac1100f0aeef1374dd1ac62fa
Original change's description:
> (Un/)Subscribe RtpVideoSender for feedback on the transport queue.
>
> * RtpVideoSender now registers/unregisters for feedback callback
> inside of SetActive(), which runs on the transport queue.
> * Transport feedback is given on the transport queue
> * Registration/unregistration for feedback is done on the same
> * Removed the last mutex from TransportFeedbackDemuxer.
>
> Ultimately, this work is related to moving state from the Call
> class, that's related to network configuration, but due to the code
> is currently written is attached to the worker thread, over to the
> Transport, where it's used (e.g. suspended_video_send_ssrcs_).
>
> Bug: webrtc:13517, webrtc:11993
> Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#35777}
Bug: webrtc:13517, webrtc:11993
Change-Id: I766e569abea8bae96d32267a951fcdc195ced8a7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249782
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35863}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index fc4f483..230b048 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -572,10 +572,10 @@
void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
- feedback_demuxer_.OnTransportFeedback(feedback);
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, feedback, feedback_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
+ feedback_demuxer_.OnTransportFeedback(feedback);
absl::optional<TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback,
feedback_time);
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 78cf281..35e6bee 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -445,9 +445,6 @@
fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled());
fec_controller_->SetProtectionCallback(this);
- // Signal congestion controller this object is ready for OnPacket* callbacks.
- transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
- rtp_config_.ssrcs, this);
// Construction happens on the worker thread (see Call::CreateVideoSendStream)
// but subseqeuent calls to the RTP state will happen on one of two threads:
@@ -466,8 +463,8 @@
SetActiveModulesLocked(
std::vector<bool>(rtp_streams_.size(), /*active=*/false));
- transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver(
- this);
+
+ RTC_DCHECK(!registered_for_feedback_);
}
void RtpVideoSender::SetActive(bool active) {
@@ -475,8 +472,18 @@
MutexLock lock(&mutex_);
if (active_ == active)
return;
+
const std::vector<bool> active_modules(rtp_streams_.size(), active);
SetActiveModulesLocked(active_modules);
+
+ auto* feedback_provider = transport_->GetStreamFeedbackProvider();
+ if (active && !registered_for_feedback_) {
+ feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this);
+ registered_for_feedback_ = true;
+ } else if (!active && registered_for_feedback_) {
+ feedback_provider->DeRegisterStreamFeedbackObserver(this);
+ registered_for_feedback_ = false;
+ }
}
void RtpVideoSender::SetActiveModules(const std::vector<bool> active_modules) {
diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h
index ea0d1ee..7023804 100644
--- a/call/rtp_video_sender.h
+++ b/call/rtp_video_sender.h
@@ -180,6 +180,7 @@
// transport task queue.
mutable Mutex mutex_;
bool active_ RTC_GUARDED_BY(mutex_);
+ bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false;
const std::unique_ptr<FecController> fec_controller_;
bool fec_allowed_ RTC_GUARDED_BY(mutex_);
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index 0644556..c47717d 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -13,6 +13,7 @@
#include <atomic>
#include <memory>
#include <string>
+#include <utility>
#include "call/rtp_transport_controller_send.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@@ -177,10 +178,33 @@
/*frame_count_observer=*/nullptr,
/*frame_transformer=*/nullptr) {}
+ ~RtpVideoSenderTestFixture() { SetActive(false); }
+
RtpVideoSender* router() { return router_.get(); }
MockTransport& transport() { return transport_; }
void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }
+ void SetActive(bool active) {
+ RunOnTransportQueue([&]() { router_->SetActive(active); });
+ }
+
+ void SetActiveModules(const std::vector<bool>& active_modules) {
+ RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); });
+ }
+
+ // Several RtpVideoSender methods expect to be called on the task queue as
+ // owned by the send transport. While the SequenceChecker may pick up the
+ // default thread as the transport queue, explicit checks for the transport
+ // queue (not just using a SequenceChecker) aren't possible unless such a
+ // queue is actually active. So RunOnTransportQueue is a convenience function
+ // that allow for running a closure on the transport queue, similar to
+ // SendTask().
+ template <typename Closure>
+ void RunOnTransportQueue(Closure&& task) {
+ transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
+ AdvanceTime(TimeDelta::Millis(0));
+ }
+
private:
NiceMock<MockTransport> transport_;
NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_;
@@ -217,15 +241,15 @@
EXPECT_NE(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
- test.router()->SetActive(true);
+ test.SetActive(true);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
- test.router()->SetActive(false);
+ test.SetActive(false);
EXPECT_NE(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
- test.router()->SetActive(true);
+ test.SetActive(true);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
}
@@ -244,7 +268,7 @@
CodecSpecificInfo codec_info;
codec_info.codecType = kVideoCodecVP8;
- test.router()->SetActive(true);
+ test.SetActive(true);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
@@ -254,7 +278,7 @@
test.router()->OnEncodedImage(encoded_image_2, &codec_info).error);
// Inactive.
- test.router()->SetActive(false);
+ test.SetActive(false);
EXPECT_NE(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
EXPECT_NE(EncodedImageCallback::Result::OK,
@@ -284,14 +308,14 @@
// Only setting one stream to active will still set the payload router to
// active and allow sending data on the active stream.
std::vector<bool> active_modules({true, false});
- test.router()->SetActiveModules(active_modules);
+ test.SetActiveModules(active_modules);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
// Setting both streams to inactive will turn the payload router to
// inactive.
active_modules = {false, false};
- test.router()->SetActiveModules(active_modules);
+ test.SetActiveModules(active_modules);
// An incoming encoded image will not ask the module to send outgoing data
// because the payload router is inactive.
EXPECT_NE(EncodedImageCallback::Result::OK,
@@ -303,7 +327,7 @@
TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
std::map<uint32_t, RtpPayloadState> initial_states =
test.router()->GetRtpPayloadStates();
@@ -328,7 +352,7 @@
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, states);
- test.router()->SetActive(true);
+ test.SetActive(true);
std::map<uint32_t, RtpPayloadState> initial_states =
test.router()->GetRtpPayloadStates();
@@ -368,7 +392,7 @@
test.router()->OnEncodedImage(encoded_image, nullptr).error);
::testing::Mock::VerifyAndClearExpectations(&callback);
- test.router()->SetActive(true);
+ test.SetActive(true);
FrameCounts frame_counts;
EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1))
@@ -397,7 +421,7 @@
TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
constexpr uint8_t kPayload = 'a';
EncodedImage encoded_image;
@@ -496,8 +520,8 @@
}
// This tests that we utilize transport wide feedback to retransmit lost
-// packets. This is tested by dropping all ordirary packets from a "lossy"
-// stream send along with an secondary untouched stream. The transport wide
+// packets. This is tested by dropping all ordinary packets from a "lossy"
+// stream sent along with a secondary untouched stream. The transport wide
// feedback packets from the secondary stream allows the sending side to
// detect and retreansmit the lost packets from the lossy stream.
TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
@@ -562,7 +586,7 @@
TEST(RtpVideoSenderTest, EarlyRetransmits) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
const uint8_t kPayload[1] = {'a'};
EncodedImage encoded_image;
@@ -657,7 +681,7 @@
TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@@ -717,7 +741,7 @@
TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@@ -773,7 +797,7 @@
TEST(RtpVideoSenderTest,
SupportsDependencyDescriptorForVp9NotProvidedByEncoder) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@@ -828,7 +852,7 @@
test::ScopedFieldTrials field_trials(
"WebRTC-GenericCodecDependencyDescriptor/Enabled/");
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@@ -874,7 +898,7 @@
TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@@ -932,7 +956,7 @@
TEST(RtpVideoSenderTest,
SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {});
- test.router()->SetActive(true);
+ test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@@ -1017,7 +1041,7 @@
kRtpHeaderSizeBytes + kTransportPacketOverheadBytes;
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes);
- test.router()->SetActive(true);
+ test.SetActive(true);
{
test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000),
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
index 29accb5..50987b2 100644
--- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
@@ -44,11 +44,7 @@
}
void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
- // Currently called on the send transport queue.
- // TODO(tommi): When registration/unregistration as well as
- // OnTransportFeedback callbacks occur on the transport queue, we can remove
- // this lock.
- MutexLock lock(&lock_);
+ RTC_DCHECK_RUN_ON(&observer_checker_);
StreamFeedbackObserver::StreamPacketInfo info;
info.ssrc = packet_info.media_ssrc;
@@ -66,24 +62,22 @@
void TransportFeedbackDemuxer::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
+ RTC_DCHECK_RUN_ON(&observer_checker_);
+
std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
- {
- MutexLock lock(&lock_);
- for (const auto& packet : feedback.GetAllPackets()) {
- int64_t seq_num =
- seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
- auto it = history_.find(seq_num);
- if (it != history_.end()) {
- auto packet_info = it->second;
- packet_info.received = packet.received();
- stream_feedbacks.push_back(std::move(packet_info));
- if (packet.received())
- history_.erase(it);
- }
+ for (const auto& packet : feedback.GetAllPackets()) {
+ int64_t seq_num =
+ seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
+ auto it = history_.find(seq_num);
+ if (it != history_.end()) {
+ auto packet_info = it->second;
+ packet_info.received = packet.received();
+ stream_feedbacks.push_back(std::move(packet_info));
+ if (packet.received())
+ history_.erase(it);
}
}
- RTC_DCHECK_RUN_ON(&observer_checker_);
for (auto& observer : observers_) {
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
for (const auto& packet_info : stream_feedbacks) {
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
index 895288f..7f4f575 100644
--- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
@@ -17,7 +17,6 @@
#include "api/sequence_checker.h"
#include "modules/include/module_common_types_public.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
-#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {
@@ -45,15 +44,14 @@
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
private:
- Mutex lock_;
- SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
+ SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_);
std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
- RTC_GUARDED_BY(&lock_);
+ RTC_GUARDED_BY(&observer_checker_);
// Maps a set of ssrcs to corresponding observer. Vectors are used rather than
// set/map to ensure that the processing order is consistent independently of
// the randomized ssrcs.
- RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
observers_ RTC_GUARDED_BY(&observer_checker_);
};