Fix unsignalled ssrc race in WebRtcVideoChannel.
BaseChannel adds and removes receive streams on the worker thread
(UpdateRemoteStreams_w) and then posts a task to the network thread to
update the demuxer criteria. Until this happens, OnRtpPacket() keeps
forwarding "recently removed" ssrc packets to the WebRtcVideoChannel.
Furthermore WebRtcVideoChannel::OnPacketReceived() posts task from the
network thread to the worker thread, so even if the demuxer criteria was
instantly updated we would still have an issue of in-flight packets for
old ssrcs arriving late on the worker thread inside WebRtcVideoChannel.
The wrong ssrc could also arrive when the demuxer goes from forwarding
all packets to a single m= section to forwarding to different m=
sections. In this case we get packets with an ssrc for a recently
created m= section and the ssrc was never intended for our channel.
This is a problem because when WebRtcVideoChannel sees an unknown ssrc
it treats it as an unsignalled stream, creating and destroying default
streams which can be very expensive and introduce large delays when lots
of packets are queued up.
This CL addresses the issue with callbacks for when a demuxer criteria
update is pending and when it has completed. During this window of time,
WebRtcVideoChannel will drop packets for unknown ssrcs.
This approach fixes the race without introducing any new locks and
packets belonging to ssrcs that were not removed continue to be
forwarded even if a demuxer criteria update is pending. This should make
a=inactive for 50p receive streams a glitch-free experience.
(cherry picked from commit 15e078c574981597c5d6ecc13476f54e667dc568)
Bug: webrtc:12258, chromium:1069603
Change-Id: I30d85f53d84e7eddf7d21380fb608631863aad21
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214964
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Taylor <deadbeef@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Original-Commit-Position: refs/heads/master@{#33757}
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215976
Cr-Commit-Position: refs/branch-heads/4472@{#8}
Cr-Branched-From: 3e0c60ba4ef28a9f26fe991e5eec3150402c7dd3-refs/heads/master@{#33644}
diff --git a/call/rtp_demuxer.cc b/call/rtp_demuxer.cc
index c09aefd..ee96196 100644
--- a/call/rtp_demuxer.cc
+++ b/call/rtp_demuxer.cc
@@ -53,6 +53,16 @@
RtpDemuxerCriteria::RtpDemuxerCriteria() = default;
RtpDemuxerCriteria::~RtpDemuxerCriteria() = default;
+bool RtpDemuxerCriteria::operator==(const RtpDemuxerCriteria& other) const {
+ return this->mid == other.mid && this->rsid == other.rsid &&
+ this->ssrcs == other.ssrcs &&
+ this->payload_types == other.payload_types;
+}
+
+bool RtpDemuxerCriteria::operator!=(const RtpDemuxerCriteria& other) const {
+ return !(*this == other);
+}
+
std::string RtpDemuxerCriteria::ToString() const {
rtc::StringBuilder sb;
sb << "{mid: " << (mid.empty() ? "<empty>" : mid)
diff --git a/call/rtp_demuxer.h b/call/rtp_demuxer.h
index b89f154..b71c2bc 100644
--- a/call/rtp_demuxer.h
+++ b/call/rtp_demuxer.h
@@ -28,6 +28,9 @@
RtpDemuxerCriteria();
~RtpDemuxerCriteria();
+ bool operator==(const RtpDemuxerCriteria& other) const;
+ bool operator!=(const RtpDemuxerCriteria& other) const;
+
// If not the empty string, will match packets with this MID.
std::string mid;
diff --git a/media/base/fake_media_engine.h b/media/base/fake_media_engine.h
index 42940bf..880b0ff 100644
--- a/media/base/fake_media_engine.h
+++ b/media/base/fake_media_engine.h
@@ -118,6 +118,8 @@
return RemoveStreamBySsrc(&send_streams_, ssrc);
}
virtual void ResetUnsignaledRecvStream() {}
+ virtual void OnDemuxerCriteriaUpdatePending() {}
+ virtual void OnDemuxerCriteriaUpdateComplete() {}
virtual bool AddRecvStream(const StreamParams& sp) {
if (absl::c_linear_search(receive_streams_, sp)) {
diff --git a/media/base/media_channel.h b/media/base/media_channel.h
index 9b0ead1..b28fd71 100644
--- a/media/base/media_channel.h
+++ b/media/base/media_channel.h
@@ -206,6 +206,17 @@
// Resets any cached StreamParams for an unsignaled RecvStream, and removes
// any existing unsignaled streams.
virtual void ResetUnsignaledRecvStream() = 0;
+ // Informs the media channel when the transport's demuxer criteria is updated.
+ // * OnDemuxerCriteriaUpdatePending() happens on the same thread that the
+ // channel's streams are added and removed (worker thread).
+ // * OnDemuxerCriteriaUpdateComplete() happens on the thread where the demuxer
+ // lives (network thread).
+ // Because the demuxer is updated asynchronously, there is a window of time
+ // where packets are arriving to the channel for streams that have already
+ // been removed on the worker thread. It is important NOT to treat these as
+ // new unsignalled ssrcs.
+ virtual void OnDemuxerCriteriaUpdatePending() = 0;
+ virtual void OnDemuxerCriteriaUpdateComplete() = 0;
// Returns the absoulte sendtime extension id value from media channel.
virtual int GetRtpSendTimeExtnId() const;
// Set the frame encryptor to use on all outgoing frames. This is optional.
diff --git a/media/base/rtp_data_engine.cc b/media/base/rtp_data_engine.cc
index 5fbb25f..d5a4cc2 100644
--- a/media/base/rtp_data_engine.cc
+++ b/media/base/rtp_data_engine.cc
@@ -196,6 +196,8 @@
// Not implemented.
void RtpDataMediaChannel::ResetUnsignaledRecvStream() {}
+void RtpDataMediaChannel::OnDemuxerCriteriaUpdatePending() {}
+void RtpDataMediaChannel::OnDemuxerCriteriaUpdateComplete() {}
void RtpDataMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
int64_t /* packet_time_us */) {
diff --git a/media/base/rtp_data_engine.h b/media/base/rtp_data_engine.h
index e5f071d..5865903 100644
--- a/media/base/rtp_data_engine.h
+++ b/media/base/rtp_data_engine.h
@@ -73,6 +73,8 @@
virtual bool AddRecvStream(const StreamParams& sp);
virtual bool RemoveRecvStream(uint32_t ssrc);
virtual void ResetUnsignaledRecvStream();
+ virtual void OnDemuxerCriteriaUpdatePending();
+ virtual void OnDemuxerCriteriaUpdateComplete();
virtual bool SetSend(bool send) {
sending_ = send;
return true;
diff --git a/media/engine/fake_webrtc_call.cc b/media/engine/fake_webrtc_call.cc
index 77b5a5d..0d97162 100644
--- a/media/engine/fake_webrtc_call.cc
+++ b/media/engine/fake_webrtc_call.cc
@@ -578,14 +578,17 @@
if (media_type == webrtc::MediaType::VIDEO) {
for (auto receiver : video_receive_streams_) {
- if (receiver->GetConfig().rtp.remote_ssrc == ssrc)
+ if (receiver->GetConfig().rtp.remote_ssrc == ssrc) {
+ ++delivered_packets_by_ssrc_[ssrc];
return DELIVERY_OK;
+ }
}
}
if (media_type == webrtc::MediaType::AUDIO) {
for (auto receiver : audio_receive_streams_) {
if (receiver->GetConfig().rtp.remote_ssrc == ssrc) {
receiver->DeliverRtp(packet.cdata(), packet.size(), packet_time_us);
+ ++delivered_packets_by_ssrc_[ssrc];
return DELIVERY_OK;
}
}
diff --git a/media/engine/fake_webrtc_call.h b/media/engine/fake_webrtc_call.h
index 25d43da..1326a07 100644
--- a/media/engine/fake_webrtc_call.h
+++ b/media/engine/fake_webrtc_call.h
@@ -20,6 +20,7 @@
#ifndef MEDIA_ENGINE_FAKE_WEBRTC_CALL_H_
#define MEDIA_ENGINE_FAKE_WEBRTC_CALL_H_
+#include <map>
#include <memory>
#include <string>
#include <vector>
@@ -299,6 +300,10 @@
const std::vector<FakeFlexfecReceiveStream*>& GetFlexfecReceiveStreams();
rtc::SentPacket last_sent_packet() const { return last_sent_packet_; }
+ size_t GetDeliveredPacketsForSsrc(uint32_t ssrc) const {
+ auto it = delivered_packets_by_ssrc_.find(ssrc);
+ return it != delivered_packets_by_ssrc_.end() ? it->second : 0u;
+ }
// This is useful if we care about the last media packet (with id populated)
// but not the last ICE packet (with -1 ID).
@@ -379,6 +384,7 @@
std::vector<FakeVideoReceiveStream*> video_receive_streams_;
std::vector<FakeAudioReceiveStream*> audio_receive_streams_;
std::vector<FakeFlexfecReceiveStream*> flexfec_receive_streams_;
+ std::map<uint32_t, size_t> delivered_packets_by_ssrc_;
int num_created_send_streams_;
int num_created_receive_streams_;
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index 057fdf6..c6e7eea 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -1587,6 +1587,19 @@
}
}
+void WebRtcVideoChannel::OnDemuxerCriteriaUpdatePending() {
+ RTC_DCHECK_RUN_ON(&thread_checker_);
+ ++demuxer_criteria_id_;
+}
+
+void WebRtcVideoChannel::OnDemuxerCriteriaUpdateComplete() {
+ RTC_DCHECK_RUN_ON(&network_thread_checker_);
+ worker_thread_->PostTask(ToQueuedTask(task_safety_, [this] {
+ RTC_DCHECK_RUN_ON(&thread_checker_);
+ ++demuxer_criteria_completed_id_;
+ }));
+}
+
bool WebRtcVideoChannel::SetSink(
uint32_t ssrc,
rtc::VideoSinkInterface<webrtc::VideoFrame>* sink) {
@@ -1753,6 +1766,14 @@
return;
}
+ // Ignore unknown ssrcs if there is a demuxer criteria update pending.
+ // During a demuxer update we may receive ssrcs that were recently
+ // removed or we may receve ssrcs that were recently configured for a
+ // different video channel.
+ if (demuxer_criteria_id_ != demuxer_criteria_completed_id_) {
+ return;
+ }
+
switch (unsignalled_ssrc_handler_->OnUnsignalledSsrc(this, ssrc)) {
case UnsignalledSsrcHandler::kDropPacket:
return;
diff --git a/media/engine/webrtc_video_engine.h b/media/engine/webrtc_video_engine.h
index fe3ad6d..80e047a 100644
--- a/media/engine/webrtc_video_engine.h
+++ b/media/engine/webrtc_video_engine.h
@@ -159,6 +159,8 @@
bool AddRecvStream(const StreamParams& sp, bool default_stream);
bool RemoveRecvStream(uint32_t ssrc) override;
void ResetUnsignaledRecvStream() override;
+ void OnDemuxerCriteriaUpdatePending() override;
+ void OnDemuxerCriteriaUpdateComplete() override;
bool SetSink(uint32_t ssrc,
rtc::VideoSinkInterface<webrtc::VideoFrame>* sink) override;
void SetDefaultSink(
@@ -574,6 +576,22 @@
RTC_GUARDED_BY(thread_checker_);
std::map<uint32_t, WebRtcVideoReceiveStream*> receive_streams_
RTC_GUARDED_BY(thread_checker_);
+ // When the channel and demuxer get reconfigured, there is a window of time
+ // where we have to be prepared for packets arriving based on the old demuxer
+ // criteria because the streams live on the worker thread and the demuxer
+ // lives on the network thread. Because packets are posted from the network
+ // thread to the worker thread, they can still be in-flight when streams are
+ // reconfgured. This can happen when |demuxer_criteria_id_| and
+ // |demuxer_criteria_completed_id_| don't match. During this time, we do not
+ // want to create unsignalled receive streams and should instead drop the
+ // packets. E.g:
+ // * If RemoveRecvStream(old_ssrc) was recently called, there may be packets
+ // in-flight for that ssrc. This happens when a receiver becomes inactive.
+ // * If we go from one to many m= sections, the demuxer may change from
+ // forwarding all packets to only forwarding the configured ssrcs, so there
+ // is a risk of receiving ssrcs for other, recently added m= sections.
+ uint32_t demuxer_criteria_id_ RTC_GUARDED_BY(thread_checker_) = 0;
+ uint32_t demuxer_criteria_completed_id_ RTC_GUARDED_BY(thread_checker_) = 0;
std::set<uint32_t> send_ssrcs_ RTC_GUARDED_BY(thread_checker_);
std::set<uint32_t> receive_ssrcs_ RTC_GUARDED_BY(thread_checker_);
diff --git a/media/engine/webrtc_video_engine_unittest.cc b/media/engine/webrtc_video_engine_unittest.cc
index fc945dd..8ff34de 100644
--- a/media/engine/webrtc_video_engine_unittest.cc
+++ b/media/engine/webrtc_video_engine_unittest.cc
@@ -6385,6 +6385,9 @@
cricket::StreamParams unsignaled_stream;
unsignaled_stream.set_stream_ids({kSyncLabel});
ASSERT_TRUE(channel_->AddRecvStream(unsignaled_stream));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
// The stream shouldn't have been created at this point because it doesn't
// have any SSRCs.
EXPECT_EQ(0u, fake_call_->GetVideoReceiveStreams().size());
@@ -6403,11 +6406,22 @@
EXPECT_EQ(kSyncLabel,
fake_call_->GetVideoReceiveStreams()[0]->GetConfig().sync_group);
- // Reset the unsignaled stream to clear the cache. This time when
- // a default video receive stream is created it won't have a sync_group.
+ // Reset the unsignaled stream to clear the cache. This deletes the receive
+ // stream.
channel_->ResetUnsignaledRecvStream();
+ channel_->OnDemuxerCriteriaUpdatePending();
EXPECT_EQ(0u, fake_call_->GetVideoReceiveStreams().size());
+ // Until the demuxer criteria has been updated, we ignore in-flight ssrcs of
+ // the recently removed unsignaled receive stream.
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(0u, fake_call_->GetVideoReceiveStreams().size());
+
+ // After the demuxer criteria has been updated, we should proceed to create
+ // unsignalled receive streams. This time when a default video receive stream
+ // is created it won't have a sync_group.
+ channel_->OnDemuxerCriteriaUpdateComplete();
channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
rtc::Thread::Current()->ProcessMessages(0);
EXPECT_EQ(1u, fake_call_->GetVideoReceiveStreams().size());
@@ -6447,6 +6461,279 @@
EXPECT_EQ(receivers2[0]->GetConfig().rtp.remote_ssrc, kIncomingSignalledSsrc);
}
+TEST_F(WebRtcVideoChannelTest,
+ RecentlyAddedSsrcsDoNotCreateUnsignalledRecvStreams) {
+ const uint32_t kSsrc1 = 1;
+ const uint32_t kSsrc2 = 2;
+
+ // Starting point: receiving kSsrc1.
+ EXPECT_TRUE(channel_->AddRecvStream(StreamParams::CreateLegacy(kSsrc1)));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+
+ // If this is the only m= section the demuxer might be configure to forward
+ // all packets, regardless of ssrc, to this channel. When we go to multiple m=
+ // sections, there can thus be a window of time where packets that should
+ // never have belonged to this channel arrive anyway.
+
+ // Emulate a second m= section being created by updating the demuxer criteria
+ // without adding any streams.
+ channel_->OnDemuxerCriteriaUpdatePending();
+
+ // Emulate there being in-flight packets for kSsrc1 and kSsrc2 arriving before
+ // the demuxer is updated.
+ {
+ // Receive a packet for kSsrc1.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc1);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ {
+ // Receive a packet for kSsrc2.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc2);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // No unsignaled ssrc for kSsrc2 should have been created, but kSsrc1 should
+ // arrive since it already has a stream.
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc1), 1u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc2), 0u);
+
+ // Signal that the demuxer update is complete. Because there are no more
+ // pending demuxer updates, receiving unknown ssrcs (kSsrc2) should again
+ // result in unsignalled receive streams being created.
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // Receive packets for kSsrc1 and kSsrc2 again.
+ {
+ // Receive a packet for kSsrc1.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc1);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ {
+ // Receive a packet for kSsrc2.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc2);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // An unsignalled ssrc for kSsrc2 should be created and the packet counter
+ // should increase for both ssrcs.
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 2u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc1), 2u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc2), 1u);
+}
+
+TEST_F(WebRtcVideoChannelTest,
+ RecentlyRemovedSsrcsDoNotCreateUnsignalledRecvStreams) {
+ const uint32_t kSsrc1 = 1;
+ const uint32_t kSsrc2 = 2;
+
+ // Starting point: receiving kSsrc1 and kSsrc2.
+ EXPECT_TRUE(channel_->AddRecvStream(StreamParams::CreateLegacy(kSsrc1)));
+ EXPECT_TRUE(channel_->AddRecvStream(StreamParams::CreateLegacy(kSsrc2)));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 2u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc1), 0u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc2), 0u);
+
+ // Remove kSsrc1, signal that a demuxer criteria update is pending, but not
+ // completed yet.
+ EXPECT_TRUE(channel_->RemoveRecvStream(kSsrc1));
+ channel_->OnDemuxerCriteriaUpdatePending();
+
+ // We only have a receiver for kSsrc2 now.
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+
+ // Emulate there being in-flight packets for kSsrc1 and kSsrc2 arriving before
+ // the demuxer is updated.
+ {
+ // Receive a packet for kSsrc1.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc1);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ {
+ // Receive a packet for kSsrc2.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc2);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // No unsignaled ssrc for kSsrc1 should have been created, but the packet
+ // count for kSsrc2 should increase.
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc1), 0u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc2), 1u);
+
+ // Signal that the demuxer update is complete. This means we should stop
+ // ignorning kSsrc1.
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // Receive packets for kSsrc1 and kSsrc2 again.
+ {
+ // Receive a packet for kSsrc1.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc1);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ {
+ // Receive a packet for kSsrc2.
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc2);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // An unsignalled ssrc for kSsrc1 should be created and the packet counter
+ // should increase for both ssrcs.
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 2u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc1), 1u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc2), 2u);
+}
+
+TEST_F(WebRtcVideoChannelTest, MultiplePendingDemuxerCriteriaUpdates) {
+ const uint32_t kSsrc = 1;
+
+ // Starting point: receiving kSsrc.
+ EXPECT_TRUE(channel_->AddRecvStream(StreamParams::CreateLegacy(kSsrc)));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+ ASSERT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+
+ // Remove kSsrc...
+ EXPECT_TRUE(channel_->RemoveRecvStream(kSsrc));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 0u);
+ // And then add it back again, before the demuxer knows about the new
+ // criteria!
+ EXPECT_TRUE(channel_->AddRecvStream(StreamParams::CreateLegacy(kSsrc)));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+
+ // In-flight packets should arrive because the stream was recreated, even
+ // though demuxer criteria updates are pending...
+ {
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc), 1u);
+
+ // Signal that the demuxer knows about the first update: the removal.
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // This still should not prevent in-flight packets from arriving because we
+ // have a receive stream for it.
+ {
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc), 2u);
+
+ // Remove the kSsrc again while previous demuxer updates are still pending.
+ EXPECT_TRUE(channel_->RemoveRecvStream(kSsrc));
+ channel_->OnDemuxerCriteriaUpdatePending();
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 0u);
+
+ // Now the packet should be dropped and not create an unsignalled receive
+ // stream.
+ {
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 0u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc), 2u);
+
+ // Signal that the demuxer knows about the second update: adding it back.
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // The packets should continue to be dropped because removal happened after
+ // the most recently completed demuxer update.
+ {
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 0u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc), 2u);
+
+ // Signal that the demuxer knows about the last update: the second removal.
+ channel_->OnDemuxerCriteriaUpdateComplete();
+ rtc::Thread::Current()->ProcessMessages(0);
+
+ // If packets still arrive after the demuxer knows about the latest removal we
+ // should finally create an unsignalled receive stream.
+ {
+ const size_t kDataLength = 12;
+ uint8_t data[kDataLength];
+ memset(data, 0, sizeof(data));
+ rtc::SetBE32(&data[8], kSsrc);
+ rtc::CopyOnWriteBuffer packet(data, kDataLength);
+ channel_->OnPacketReceived(packet, /* packet_time_us */ -1);
+ }
+ rtc::Thread::Current()->ProcessMessages(0);
+ EXPECT_EQ(fake_call_->GetVideoReceiveStreams().size(), 1u);
+ EXPECT_EQ(fake_call_->GetDeliveredPacketsForSsrc(kSsrc), 3u);
+}
+
// Test BaseMinimumPlayoutDelayMs on receive streams.
TEST_F(WebRtcVideoChannelTest, BaseMinimumPlayoutDelayMs) {
// Test that set won't work for non-existing receive streams.
diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc
index f0ea10d..57c1eae 100644
--- a/media/engine/webrtc_voice_engine.cc
+++ b/media/engine/webrtc_voice_engine.cc
@@ -2070,6 +2070,13 @@
}
}
+// Not implemented.
+// TODO(https://crbug.com/webrtc/12676): Implement a fix for the unsignalled
+// SSRC race that can happen when an m= section goes from receiving to not
+// receiving.
+void WebRtcVoiceMediaChannel::OnDemuxerCriteriaUpdatePending() {}
+void WebRtcVoiceMediaChannel::OnDemuxerCriteriaUpdateComplete() {}
+
bool WebRtcVoiceMediaChannel::SetLocalSource(uint32_t ssrc,
AudioSource* source) {
auto it = send_streams_.find(ssrc);
diff --git a/media/engine/webrtc_voice_engine.h b/media/engine/webrtc_voice_engine.h
index f7f1bfc..4bc61d5 100644
--- a/media/engine/webrtc_voice_engine.h
+++ b/media/engine/webrtc_voice_engine.h
@@ -187,6 +187,8 @@
bool AddRecvStream(const StreamParams& sp) override;
bool RemoveRecvStream(uint32_t ssrc) override;
void ResetUnsignaledRecvStream() override;
+ void OnDemuxerCriteriaUpdatePending() override;
+ void OnDemuxerCriteriaUpdateComplete() override;
// E2EE Frame API
// Set a frame decryptor to a particular ssrc that will intercept all
diff --git a/pc/channel.cc b/pc/channel.cc
index f37be67..17ce0da 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -174,7 +174,13 @@
bool BaseChannel::ConnectToRtpTransport() {
RTC_DCHECK(rtp_transport_);
- if (!RegisterRtpDemuxerSink_n()) {
+ // We don't need to call OnDemuxerCriteriaUpdatePending/Complete because
+ // there's no previous criteria to worry about.
+ bool result = rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
+ if (result) {
+ previous_demuxer_criteria_ = demuxer_criteria_;
+ } else {
+ previous_demuxer_criteria_ = {};
RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
return false;
}
@@ -512,23 +518,28 @@
}
bool BaseChannel::RegisterRtpDemuxerSink_w() {
+ if (demuxer_criteria_ == previous_demuxer_criteria_) {
+ return true;
+ }
+ media_channel_->OnDemuxerCriteriaUpdatePending();
// Copy demuxer criteria, since they're a worker-thread variable
// and we want to pass them to the network thread
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [this, demuxer_criteria = demuxer_criteria_] {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
- return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria, this);
+ bool result =
+ rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria, this);
+ if (result) {
+ previous_demuxer_criteria_ = demuxer_criteria;
+ } else {
+ previous_demuxer_criteria_ = {};
+ }
+ media_channel_->OnDemuxerCriteriaUpdateComplete();
+ return result;
});
}
-bool BaseChannel::RegisterRtpDemuxerSink_n() {
- RTC_DCHECK(rtp_transport_);
- // TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the
- // networking thread.
- return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
-}
-
void BaseChannel::EnableMedia_w() {
if (enabled_)
return;
diff --git a/pc/channel.h b/pc/channel.h
index fe3778b..5d470d3 100644
--- a/pc/channel.h
+++ b/pc/channel.h
@@ -300,7 +300,6 @@
const RtpHeaderExtensions& header_extensions);
bool RegisterRtpDemuxerSink_w() RTC_RUN_ON(worker_thread());
- bool RegisterRtpDemuxerSink_n() RTC_RUN_ON(network_thread());
// Return description of media channel to facilitate logging
std::string ToString() const;
@@ -371,6 +370,9 @@
// TODO(bugs.webrtc.org/12239): Modified on worker thread, accessed
// on network thread in RegisterRtpDemuxerSink_n (called from Init_w)
webrtc::RtpDemuxerCriteria demuxer_criteria_;
+ // Accessed on the worker thread, modified on the network thread from
+ // RegisterRtpDemuxerSink_w's Invoke.
+ webrtc::RtpDemuxerCriteria previous_demuxer_criteria_;
// This generator is used to generate SSRCs for local streams.
// This is needed in cases where SSRCs are not negotiated or set explicitly
// like in Simulcast.