Do all BaseChannel operations within a single Thread::Invoke.
Instead of doing a separate Invoke for each channel, this CL first
gathers a list of operations to be performed on the signaling thread,
then does a single Invoke on the worker thread (and nested Invoke
on the network thread) to update all channels at once.
This includes the methods:
* Enable
* SetLocalContent/SetRemoteContent
* RegisterRtpDemuxerSink
* UpdateRtpHeaderExtensionMap
Also, removed the need for a network thread Invoke in
IsReadyToSendMedia_w by moving ownership of was_ever_writable_ to the
worker thread.
Bug: webrtc:12266
Change-Id: I31e61fe0758aeb053b09db84f234deb58dfb3d05
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/194181
Commit-Queue: Taylor <deadbeef@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32817}
diff --git a/pc/channel.cc b/pc/channel.cc
index aad7c54..34269a1 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -170,7 +170,9 @@
bool BaseChannel::ConnectToRtpTransport() {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
- if (!RegisterRtpDemuxerSink_n()) {
+ // TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the
+ // networking thread.
+ if (!rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this)) {
RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
return false;
}
@@ -291,13 +293,42 @@
Bind(&BaseChannel::SetRemoteContent_w, this, content, type, error_desc));
}
-bool BaseChannel::SetPayloadTypeDemuxingEnabled(bool enabled) {
+void BaseChannel::SetPayloadTypeDemuxingEnabled(bool enabled) {
TRACE_EVENT0("webrtc", "BaseChannel::SetPayloadTypeDemuxingEnabled");
- return InvokeOnWorker<bool>(
+ InvokeOnWorker<void>(
RTC_FROM_HERE,
Bind(&BaseChannel::SetPayloadTypeDemuxingEnabled_w, this, enabled));
}
+bool BaseChannel::UpdateRtpTransport(std::string* error_desc) {
+ return network_thread_->Invoke<bool>(RTC_FROM_HERE, [this, error_desc] {
+ RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK(rtp_transport_);
+ // TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the
+ // networking thread.
+ if (!rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this)) {
+ RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
+ rtc::StringBuilder desc;
+ desc << "Failed to set up demuxing for m-section with mid='"
+ << content_name() << "'.";
+ SafeSetError(desc.str(), error_desc);
+ return false;
+ }
+ // NOTE: This doesn't take the BUNDLE case in account meaning the RTP header
+ // extension maps are not merged when BUNDLE is enabled. This is fine
+ // because the ID for MID should be consistent among all the RTP transports,
+ // and that's all RtpTransport uses this map for.
+ //
+ // TODO(deadbeef): Move this call to JsepTransport, there is no reason
+ // BaseChannel needs to be involved here.
+ if (media_type() != cricket::MEDIA_TYPE_DATA) {
+ rtp_transport_->UpdateRtpHeaderExtensionMap(
+ receive_rtp_header_extensions_);
+ }
+ return true;
+ });
+}
+
bool BaseChannel::IsReadyToReceiveMedia_w() const {
// Receive data if we are enabled and have local content,
return enabled() &&
@@ -305,12 +336,6 @@
}
bool BaseChannel::IsReadyToSendMedia_w() const {
- // Need to access some state updated on the network thread.
- return network_thread_->Invoke<bool>(
- RTC_FROM_HERE, Bind(&BaseChannel::IsReadyToSendMedia_n, this));
-}
-
-bool BaseChannel::IsReadyToSendMedia_n() const {
// Send outgoing data if we are enabled, have local and remote content,
// and we have had some form of connectivity.
return enabled() &&
@@ -508,38 +533,6 @@
});
}
-void BaseChannel::UpdateRtpHeaderExtensionMap(
- const RtpHeaderExtensions& header_extensions) {
- // Update the header extension map on network thread in case there is data
- // race.
- //
- // NOTE: This doesn't take the BUNDLE case in account meaning the RTP header
- // extension maps are not merged when BUNDLE is enabled. This is fine because
- // the ID for MID should be consistent among all the RTP transports.
- network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &header_extensions] {
- RTC_DCHECK_RUN_ON(network_thread());
- rtp_transport_->UpdateRtpHeaderExtensionMap(header_extensions);
- });
-}
-
-bool BaseChannel::RegisterRtpDemuxerSink_w() {
- // Copy demuxer criteria, since they're a worker-thread variable
- // and we want to pass them to the network thread
- return network_thread_->Invoke<bool>(
- RTC_FROM_HERE, [this, demuxer_criteria = demuxer_criteria_] {
- RTC_DCHECK_RUN_ON(network_thread());
- RTC_DCHECK(rtp_transport_);
- return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria, this);
- });
-}
-
-bool BaseChannel::RegisterRtpDemuxerSink_n() {
- RTC_DCHECK(rtp_transport_);
- // TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the
- // networking thread.
- return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
-}
-
void BaseChannel::EnableMedia_w() {
RTC_DCHECK(worker_thread_ == rtc::Thread::Current());
if (enabled_)
@@ -573,22 +566,28 @@
if (writable_) {
return;
}
+ writable_ = true;
RTC_LOG(LS_INFO) << "Channel writable (" << ToString() << ")"
- << (was_ever_writable_ ? "" : " for the first time");
-
- was_ever_writable_ = true;
- writable_ = true;
- UpdateMediaSendRecvState();
+ << (was_ever_writable_n_ ? "" : " for the first time");
+ // We only have to do this AsyncInvoke once, when first transitioning to
+ // writable.
+ if (!was_ever_writable_n_) {
+ invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [this] {
+ RTC_DCHECK_RUN_ON(worker_thread());
+ was_ever_writable_ = true;
+ UpdateMediaSendRecvState_w();
+ });
+ }
+ was_ever_writable_n_ = true;
}
void BaseChannel::ChannelNotWritable_n() {
- if (!writable_)
+ if (!writable_) {
return;
-
- RTC_LOG(LS_INFO) << "Channel not writable (" << ToString() << ")";
+ }
writable_ = false;
- UpdateMediaSendRecvState();
+ RTC_LOG(LS_INFO) << "Channel not writable (" << ToString() << ")";
}
bool BaseChannel::AddRecvStream_w(const StreamParams& sp) {
@@ -604,9 +603,9 @@
media_channel()->ResetUnsignaledRecvStream();
}
-bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) {
+void BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) {
if (enabled == payload_type_demuxing_enabled_) {
- return true;
+ return;
}
payload_type_demuxing_enabled_ = enabled;
if (!enabled) {
@@ -617,21 +616,10 @@
// there is no straightforward way to identify those streams.
media_channel()->ResetUnsignaledRecvStream();
demuxer_criteria_.payload_types.clear();
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to disable payload type demuxing for "
- << ToString();
- return false;
- }
} else if (!payload_types_.empty()) {
demuxer_criteria_.payload_types.insert(payload_types_.begin(),
payload_types_.end());
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to enable payload type demuxing for "
- << ToString();
- return false;
- }
}
- return true;
}
bool BaseChannel::UpdateLocalStreams_w(const std::vector<StreamParams>& streams,
@@ -772,11 +760,6 @@
demuxer_criteria_.ssrcs.insert(new_stream.ssrcs.begin(),
new_stream.ssrcs.end());
}
- // Re-register the sink to update the receiving ssrcs.
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
- ret = false;
- }
remote_streams_ = streams;
return ret;
}
@@ -795,6 +778,10 @@
return webrtc::RtpExtension::FilterDuplicateNonEncrypted(extensions);
}
+void BaseChannel::SetReceiveExtensions(const RtpHeaderExtensions& extensions) {
+ receive_rtp_header_extensions_ = extensions;
+}
+
void BaseChannel::OnMessage(rtc::Message* pmsg) {
TRACE_EVENT0("webrtc", "BaseChannel::OnMessage");
switch (pmsg->message_id) {
@@ -873,12 +860,6 @@
Deinit();
}
-void BaseChannel::UpdateMediaSendRecvState() {
- RTC_DCHECK_RUN_ON(network_thread());
- invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
- [this] { UpdateMediaSendRecvState_w(); });
-}
-
void VoiceChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) {
BaseChannel::Init_w(rtp_transport);
}
@@ -916,7 +897,7 @@
RtpHeaderExtensions rtp_header_extensions =
GetFilteredRtpHeaderExtensions(audio->rtp_header_extensions());
- UpdateRtpHeaderExtensionMap(rtp_header_extensions);
+ SetReceiveExtensions(rtp_header_extensions);
media_channel()->SetExtmapAllowMixed(audio->extmap_allow_mixed());
AudioRecvParameters recv_params = last_recv_params_;
@@ -936,11 +917,6 @@
for (const AudioCodec& codec : audio->codecs()) {
MaybeAddHandledPayloadType(codec.id);
}
- // Need to re-register the sink to update the handled payload.
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to set up audio demuxing for " << ToString();
- return false;
- }
}
last_recv_params_ = recv_params;
@@ -1003,10 +979,6 @@
"disable payload type demuxing for "
<< ToString();
ClearHandledPayloadTypes();
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to update audio demuxing for " << ToString();
- return false;
- }
}
// TODO(pthatcher): Move remote streams into AudioRecvParameters,
@@ -1087,7 +1059,7 @@
RtpHeaderExtensions rtp_header_extensions =
GetFilteredRtpHeaderExtensions(video->rtp_header_extensions());
- UpdateRtpHeaderExtensionMap(rtp_header_extensions);
+ SetReceiveExtensions(rtp_header_extensions);
media_channel()->SetExtmapAllowMixed(video->extmap_allow_mixed());
VideoRecvParameters recv_params = last_recv_params_;
@@ -1130,11 +1102,6 @@
for (const VideoCodec& codec : video->codecs()) {
MaybeAddHandledPayloadType(codec.id);
}
- // Need to re-register the sink to update the handled payload.
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to set up video demuxing for " << ToString();
- return false;
- }
}
last_recv_params_ = recv_params;
@@ -1241,10 +1208,6 @@
"disable payload type demuxing for "
<< ToString();
ClearHandledPayloadTypes();
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to update video demuxing for " << ToString();
- return false;
- }
}
// TODO(pthatcher): Move remote streams into VideoRecvParameters,
@@ -1355,11 +1318,6 @@
for (const DataCodec& codec : data->codecs()) {
MaybeAddHandledPayloadType(codec.id);
}
- // Need to re-register the sink to update the handled payload.
- if (!RegisterRtpDemuxerSink_w()) {
- RTC_LOG(LS_ERROR) << "Failed to set up data demuxing for " << ToString();
- return false;
- }
last_recv_params_ = recv_params;
diff --git a/pc/channel.h b/pc/channel.h
index 1fb2a39..0ba23eb 100644
--- a/pc/channel.h
+++ b/pc/channel.h
@@ -119,9 +119,6 @@
RTC_DCHECK_RUN_ON(network_thread());
return srtp_active();
}
-
- bool writable() const { return writable_; }
-
// Set an RTP level transport which could be an RtpTransport without
// encryption, an SrtpTransport for SDES or a DtlsSrtpTransport for DTLS-SRTP.
// This can be called from any thread and it hops to the network thread
@@ -143,7 +140,8 @@
return rtp_transport();
}
- // Channel control
+ // Channel control. Must call UpdateRtpTransport afterwards to apply any
+ // changes to the RtpTransport on the network thread.
bool SetLocalContent(const MediaContentDescription* content,
webrtc::SdpType type,
std::string* error_desc) override;
@@ -158,7 +156,11 @@
// This method will also remove any existing streams that were bound to this
// channel on the basis of payload type, since one of these streams might
// actually belong to a new channel. See: crbug.com/webrtc/11477
- bool SetPayloadTypeDemuxingEnabled(bool enabled) override;
+ //
+ // As with SetLocalContent/SetRemoteContent, must call UpdateRtpTransport
+ // afterwards to apply changes to the RtpTransport on the network thread.
+ void SetPayloadTypeDemuxingEnabled(bool enabled) override;
+ bool UpdateRtpTransport(std::string* error_desc) override;
bool Enable(bool enable) override;
@@ -198,7 +200,7 @@
protected:
bool was_ever_writable() const {
- RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK_RUN_ON(worker_thread());
return was_ever_writable_;
}
void set_local_content_direction(webrtc::RtpTransceiverDirection direction) {
@@ -256,7 +258,7 @@
bool AddRecvStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread());
bool RemoveRecvStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread());
void ResetUnsignaledRecvStream_w() RTC_RUN_ON(worker_thread());
- bool SetPayloadTypeDemuxingEnabled_w(bool enabled)
+ void SetPayloadTypeDemuxingEnabled_w(bool enabled)
RTC_RUN_ON(worker_thread());
bool AddSendStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread());
bool RemoveSendStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread());
@@ -264,7 +266,6 @@
// Should be called whenever the conditions for
// IsReadyToReceiveMedia/IsReadyToSendMedia are satisfied (or unsatisfied).
// Updates the send/recv state of the media channel.
- void UpdateMediaSendRecvState();
virtual void UpdateMediaSendRecvState_w() = 0;
bool UpdateLocalStreams_w(const std::vector<StreamParams>& streams,
@@ -286,6 +287,9 @@
// non-encrypted and encrypted extension is present for the same URI.
RtpHeaderExtensions GetFilteredRtpHeaderExtensions(
const RtpHeaderExtensions& extensions);
+ // Set a list of RTP extensions we should prepare to receive on the next
+ // UpdateRtpTransport call.
+ void SetReceiveExtensions(const RtpHeaderExtensions& extensions);
// From MessageHandler
void OnMessage(rtc::Message* pmsg) override;
@@ -302,13 +306,6 @@
void MaybeAddHandledPayloadType(int payload_type) RTC_RUN_ON(worker_thread());
void ClearHandledPayloadTypes() RTC_RUN_ON(worker_thread());
-
- void UpdateRtpHeaderExtensionMap(
- const RtpHeaderExtensions& header_extensions);
-
- bool RegisterRtpDemuxerSink_w() RTC_RUN_ON(worker_thread());
- bool RegisterRtpDemuxerSink_n() RTC_RUN_ON(network_thread());
-
// Return description of media channel to facilitate logging
std::string ToString() const;
@@ -319,7 +316,6 @@
void DisconnectFromRtpTransport();
void SignalSentPacket_n(const rtc::SentPacket& sent_packet)
RTC_RUN_ON(network_thread());
- bool IsReadyToSendMedia_n() const RTC_RUN_ON(network_thread());
rtc::Thread* const worker_thread_;
rtc::Thread* const network_thread_;
@@ -344,10 +340,9 @@
RTC_GUARDED_BY(network_thread());
std::vector<std::pair<rtc::Socket::Option, int> > rtcp_socket_options_
RTC_GUARDED_BY(network_thread());
- // TODO(bugs.webrtc.org/12230): writable_ is accessed in tests
- // outside of the network thread.
- bool writable_ = false;
- bool was_ever_writable_ RTC_GUARDED_BY(network_thread()) = false;
+ bool writable_ RTC_GUARDED_BY(network_thread()) = false;
+ bool was_ever_writable_n_ RTC_GUARDED_BY(network_thread()) = false;
+ bool was_ever_writable_ RTC_GUARDED_BY(worker_thread()) = false;
const bool srtp_required_ = true;
const webrtc::CryptoOptions crypto_options_;
@@ -371,9 +366,10 @@
// Cached list of payload types, used if payload type demuxing is re-enabled.
std::set<uint8_t> payload_types_ RTC_GUARDED_BY(worker_thread());
- // TODO(bugs.webrtc.org/12239): Modified on worker thread, accessed
- // on network thread in RegisterRtpDemuxerSink_n (called from Init_w)
+ // TODO(bugs.webrtc.org/12239): These two variables are modified on the worker
+ // thread, accessed on the network thread in UpdateRtpTransport.
webrtc::RtpDemuxerCriteria demuxer_criteria_;
+ RtpHeaderExtensions receive_rtp_header_extensions_;
// This generator is used to generate SSRCs for local streams.
// This is needed in cases where SSRCs are not negotiated or set explicitly
// like in Simulcast.
diff --git a/pc/channel_interface.h b/pc/channel_interface.h
index 68b6486..4580a2f 100644
--- a/pc/channel_interface.h
+++ b/pc/channel_interface.h
@@ -52,7 +52,8 @@
virtual bool SetRemoteContent(const MediaContentDescription* content,
webrtc::SdpType type,
std::string* error_desc) = 0;
- virtual bool SetPayloadTypeDemuxingEnabled(bool enabled) = 0;
+ virtual void SetPayloadTypeDemuxingEnabled(bool enabled) = 0;
+ virtual bool UpdateRtpTransport(std::string* error_desc) = 0;
// Access to the local and remote streams that were set on the channel.
virtual const std::vector<StreamParams>& local_streams() const = 0;
diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc
index c407147..fb62b08 100644
--- a/pc/channel_unittest.cc
+++ b/pc/channel_unittest.cc
@@ -323,19 +323,26 @@
fake_rtcp_packet_transport2_.get(), asymmetric);
}
});
+ // The transport becoming writable will asynchronously update the send state
+ // on the worker thread; since this test uses the main thread as the worker
+ // thread, we must process the message queue for this to occur.
+ WaitForThreads();
}
bool SendInitiate() {
bool result = channel1_->SetLocalContent(&local_media_content1_,
- SdpType::kOffer, NULL);
+ SdpType::kOffer, NULL) &&
+ channel1_->UpdateRtpTransport(nullptr);
if (result) {
channel1_->Enable(true);
result = channel2_->SetRemoteContent(&remote_media_content1_,
- SdpType::kOffer, NULL);
+ SdpType::kOffer, NULL) &&
+ channel2_->UpdateRtpTransport(nullptr);
if (result) {
ConnectFakeTransports();
result = channel2_->SetLocalContent(&local_media_content2_,
- SdpType::kAnswer, NULL);
+ SdpType::kAnswer, NULL) &&
+ channel2_->UpdateRtpTransport(nullptr);
}
}
return result;
@@ -344,27 +351,32 @@
bool SendAccept() {
channel2_->Enable(true);
return channel1_->SetRemoteContent(&remote_media_content2_,
- SdpType::kAnswer, NULL);
+ SdpType::kAnswer, NULL) &&
+ channel1_->UpdateRtpTransport(nullptr);
}
bool SendOffer() {
bool result = channel1_->SetLocalContent(&local_media_content1_,
- SdpType::kOffer, NULL);
+ SdpType::kOffer, NULL) &&
+ channel1_->UpdateRtpTransport(nullptr);
if (result) {
channel1_->Enable(true);
result = channel2_->SetRemoteContent(&remote_media_content1_,
- SdpType::kOffer, NULL);
+ SdpType::kOffer, NULL) &&
+ channel2_->UpdateRtpTransport(nullptr);
}
return result;
}
bool SendProvisionalAnswer() {
bool result = channel2_->SetLocalContent(&local_media_content2_,
- SdpType::kPrAnswer, NULL);
+ SdpType::kPrAnswer, NULL) &&
+ channel2_->UpdateRtpTransport(nullptr);
if (result) {
channel2_->Enable(true);
result = channel1_->SetRemoteContent(&remote_media_content2_,
- SdpType::kPrAnswer, NULL);
+ SdpType::kPrAnswer, NULL) &&
+ channel1_->UpdateRtpTransport(nullptr);
ConnectFakeTransports();
}
return result;
@@ -372,10 +384,12 @@
bool SendFinalAnswer() {
bool result = channel2_->SetLocalContent(&local_media_content2_,
- SdpType::kAnswer, NULL);
+ SdpType::kAnswer, NULL) &&
+ channel2_->UpdateRtpTransport(nullptr);
if (result)
result = channel1_->SetRemoteContent(&remote_media_content2_,
- SdpType::kAnswer, NULL);
+ SdpType::kAnswer, NULL) &&
+ channel1_->UpdateRtpTransport(nullptr);
return result;
}
@@ -608,10 +622,12 @@
CreateContent(0, kPcmuCodec, kH264Codec, &content1);
content1.AddStream(stream1);
EXPECT_TRUE(channel1_->SetLocalContent(&content1, SdpType::kOffer, NULL));
+ EXPECT_TRUE(channel1_->UpdateRtpTransport(nullptr));
EXPECT_TRUE(channel1_->Enable(true));
EXPECT_EQ(1u, media_channel1_->send_streams().size());
EXPECT_TRUE(channel2_->SetRemoteContent(&content1, SdpType::kOffer, NULL));
+ EXPECT_TRUE(channel2_->UpdateRtpTransport(nullptr));
EXPECT_EQ(1u, media_channel2_->recv_streams().size());
ConnectFakeTransports();
@@ -619,8 +635,10 @@
typename T::Content content2;
CreateContent(0, kPcmuCodec, kH264Codec, &content2);
EXPECT_TRUE(channel1_->SetRemoteContent(&content2, SdpType::kAnswer, NULL));
+ EXPECT_TRUE(channel1_->UpdateRtpTransport(nullptr));
EXPECT_EQ(0u, media_channel1_->recv_streams().size());
EXPECT_TRUE(channel2_->SetLocalContent(&content2, SdpType::kAnswer, NULL));
+ EXPECT_TRUE(channel2_->UpdateRtpTransport(nullptr));
EXPECT_TRUE(channel2_->Enable(true));
EXPECT_EQ(0u, media_channel2_->send_streams().size());
@@ -633,10 +651,12 @@
CreateContent(0, kPcmuCodec, kH264Codec, &content3);
content3.AddStream(stream2);
EXPECT_TRUE(channel2_->SetLocalContent(&content3, SdpType::kOffer, NULL));
+ EXPECT_TRUE(channel2_->UpdateRtpTransport(nullptr));
ASSERT_EQ(1u, media_channel2_->send_streams().size());
EXPECT_EQ(stream2, media_channel2_->send_streams()[0]);
EXPECT_TRUE(channel1_->SetRemoteContent(&content3, SdpType::kOffer, NULL));
+ EXPECT_TRUE(channel1_->UpdateRtpTransport(nullptr));
ASSERT_EQ(1u, media_channel1_->recv_streams().size());
EXPECT_EQ(stream2, media_channel1_->recv_streams()[0]);
@@ -644,9 +664,11 @@
typename T::Content content4;
CreateContent(0, kPcmuCodec, kH264Codec, &content4);
EXPECT_TRUE(channel1_->SetLocalContent(&content4, SdpType::kAnswer, NULL));
+ EXPECT_TRUE(channel1_->UpdateRtpTransport(nullptr));
EXPECT_EQ(0u, media_channel1_->send_streams().size());
EXPECT_TRUE(channel2_->SetRemoteContent(&content4, SdpType::kAnswer, NULL));
+ EXPECT_TRUE(channel2_->UpdateRtpTransport(nullptr));
EXPECT_EQ(0u, media_channel2_->recv_streams().size());
SendCustomRtp2(kSsrc2, 0);
@@ -915,8 +937,6 @@
EXPECT_FALSE(channel2_->SrtpActiveForTesting());
EXPECT_TRUE(SendInitiate());
WaitForThreads();
- EXPECT_TRUE(channel1_->writable());
- EXPECT_TRUE(channel2_->writable());
EXPECT_TRUE(SendAccept());
EXPECT_TRUE(channel1_->SrtpActiveForTesting());
EXPECT_TRUE(channel2_->SrtpActiveForTesting());
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc
index fd697ce..f924c40 100644
--- a/pc/sdp_offer_answer.cc
+++ b/pc/sdp_offer_answer.cc
@@ -2473,11 +2473,6 @@
// But all call-sites should be verifying this before calling us!
RTC_DCHECK(session_error() == SessionError::kNone);
- // If this is answer-ish we're ready to let media flow.
- if (type == SdpType::kPrAnswer || type == SdpType::kAnswer) {
- EnableSending();
- }
-
// Update the signaling state according to the specified state machine (see
// https://w3c.github.io/webrtc-pc/#rtcsignalingstate-enum).
if (type == SdpType::kOffer) {
@@ -4201,21 +4196,6 @@
}
}
-void SdpOfferAnswerHandler::EnableSending() {
- RTC_DCHECK_RUN_ON(signaling_thread());
- for (const auto& transceiver : transceivers()->List()) {
- cricket::ChannelInterface* channel = transceiver->internal()->channel();
- if (channel && !channel->enabled()) {
- channel->Enable(true);
- }
- }
-
- if (data_channel_controller()->rtp_data_channel() &&
- !data_channel_controller()->rtp_data_channel()->enabled()) {
- data_channel_controller()->rtp_data_channel()->Enable(true);
- }
-}
-
RTCError SdpOfferAnswerHandler::PushdownMediaDescription(
SdpType type,
cricket::ContentSource source) {
@@ -4225,15 +4205,13 @@
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(sdesc);
- if (!UpdatePayloadTypeDemuxingState(source)) {
- // Note that this is never expected to fail, since RtpDemuxer doesn't return
- // an error when changing payload type demux criteria, which is all this
- // does.
- LOG_AND_RETURN_ERROR(RTCErrorType::INTERNAL_ERROR,
- "Failed to update payload type demuxing state.");
- }
+ // Gather lists of updates to be made on cricket channels on the signaling
+ // thread, before performing them all at once on the worker thread. Necessary
+ // due to threading restrictions.
+ auto payload_type_demuxing_updates = GetPayloadTypeDemuxingUpdates(source);
+ std::vector<ContentUpdate> content_updates;
- // Push down the new SDP media section for each audio/video transceiver.
+ // Collect updates for each audio/video transceiver.
for (const auto& transceiver : transceivers()->List()) {
const ContentInfo* content_info =
FindMediaSectionForTransceiver(transceiver, sdesc);
@@ -4243,19 +4221,12 @@
}
const MediaContentDescription* content_desc =
content_info->media_description();
- if (!content_desc) {
- continue;
- }
- std::string error;
- bool success = (source == cricket::CS_LOCAL)
- ? channel->SetLocalContent(content_desc, type, &error)
- : channel->SetRemoteContent(content_desc, type, &error);
- if (!success) {
- LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, error);
+ if (content_desc) {
+ content_updates.emplace_back(channel, content_desc);
}
}
- // If using the RtpDataChannel, push down the new SDP section for it too.
+ // If using the RtpDataChannel, add it to the list of updates.
if (data_channel_controller()->rtp_data_channel()) {
const ContentInfo* data_content =
cricket::GetFirstDataContent(sdesc->description());
@@ -4263,21 +4234,21 @@
const MediaContentDescription* data_desc =
data_content->media_description();
if (data_desc) {
- std::string error;
- bool success = (source == cricket::CS_LOCAL)
- ? data_channel_controller()
- ->rtp_data_channel()
- ->SetLocalContent(data_desc, type, &error)
- : data_channel_controller()
- ->rtp_data_channel()
- ->SetRemoteContent(data_desc, type, &error);
- if (!success) {
- LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, error);
- }
+ content_updates.push_back(
+ {data_channel_controller()->rtp_data_channel(), data_desc});
}
}
}
+ RTCError error = pc_->worker_thread()->Invoke<RTCError>(
+ RTC_FROM_HERE,
+ rtc::Bind(&SdpOfferAnswerHandler::ApplyChannelUpdates, this, type, source,
+ std::move(payload_type_demuxing_updates),
+ std::move(content_updates)));
+ if (!error.ok()) {
+ return error;
+ }
+
// Need complete offer/answer with an SCTP m= section before starting SCTP,
// according to https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-19
if (pc_->sctp_mid() && local_description() && remote_description()) {
@@ -4306,6 +4277,49 @@
return RTCError::OK();
}
+RTCError SdpOfferAnswerHandler::ApplyChannelUpdates(
+ SdpType type,
+ cricket::ContentSource source,
+ std::vector<PayloadTypeDemuxingUpdate> payload_type_demuxing_updates,
+ std::vector<ContentUpdate> content_updates) {
+ RTC_DCHECK_RUN_ON(pc_->worker_thread());
+ // If this is answer-ish we're ready to let media flow.
+ bool enable_sending = type == SdpType::kPrAnswer || type == SdpType::kAnswer;
+ std::set<cricket::ChannelInterface*> modified_channels;
+ for (const auto& update : payload_type_demuxing_updates) {
+ modified_channels.insert(update.channel);
+ update.channel->SetPayloadTypeDemuxingEnabled(update.enabled);
+ }
+ for (const auto& update : content_updates) {
+ modified_channels.insert(update.channel);
+ std::string error;
+ bool success = (source == cricket::CS_LOCAL)
+ ? update.channel->SetLocalContent(
+ update.content_description, type, &error)
+ : update.channel->SetRemoteContent(
+ update.content_description, type, &error);
+ if (!success) {
+ LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, error);
+ }
+ if (enable_sending && !update.channel->enabled()) {
+ update.channel->Enable(true);
+ }
+ }
+ // The above calls may have modified properties of the channel (header
+ // extension mappings, demuxer criteria) which still need to be applied to the
+ // RtpTransport.
+ return pc_->network_thread()->Invoke<RTCError>(
+ RTC_FROM_HERE, [modified_channels] {
+ for (auto channel : modified_channels) {
+ std::string error;
+ if (!channel->UpdateRtpTransport(&error)) {
+ LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER, error);
+ }
+ }
+ return RTCError::OK();
+ });
+}
+
RTCError SdpOfferAnswerHandler::PushdownTransportDescription(
cricket::ContentSource source,
SdpType type) {
@@ -4904,7 +4918,8 @@
return "";
}
-bool SdpOfferAnswerHandler::UpdatePayloadTypeDemuxingState(
+std::vector<SdpOfferAnswerHandler::PayloadTypeDemuxingUpdate>
+SdpOfferAnswerHandler::GetPayloadTypeDemuxingUpdates(
cricket::ContentSource source) {
RTC_DCHECK_RUN_ON(signaling_thread());
// We may need to delete any created default streams and disable creation of
@@ -4976,8 +4991,7 @@
// Gather all updates ahead of time so that all channels can be updated in a
// single Invoke; necessary due to thread guards.
- std::vector<std::pair<RtpTransceiverDirection, cricket::ChannelInterface*>>
- channels_to_update;
+ std::vector<PayloadTypeDemuxingUpdate> channel_updates;
for (const auto& transceiver : transceivers()->List()) {
cricket::ChannelInterface* channel = transceiver->internal()->channel();
const ContentInfo* content =
@@ -4990,38 +5004,22 @@
if (source == cricket::CS_REMOTE) {
local_direction = RtpTransceiverDirectionReversed(local_direction);
}
- channels_to_update.emplace_back(local_direction,
- transceiver->internal()->channel());
+ cricket::MediaType media_type = channel->media_type();
+ bool in_bundle_group =
+ (bundle_group && bundle_group->HasContentName(channel->content_name()));
+ bool payload_type_demuxing_enabled = false;
+ if (media_type == cricket::MediaType::MEDIA_TYPE_AUDIO) {
+ payload_type_demuxing_enabled =
+ (!in_bundle_group || pt_demuxing_enabled_audio) &&
+ RtpTransceiverDirectionHasRecv(local_direction);
+ } else if (media_type == cricket::MediaType::MEDIA_TYPE_VIDEO) {
+ payload_type_demuxing_enabled =
+ (!in_bundle_group || pt_demuxing_enabled_video) &&
+ RtpTransceiverDirectionHasRecv(local_direction);
+ }
+ channel_updates.emplace_back(channel, payload_type_demuxing_enabled);
}
-
- if (channels_to_update.empty()) {
- return true;
- }
- return pc_->worker_thread()->Invoke<bool>(
- RTC_FROM_HERE, [&channels_to_update, bundle_group,
- pt_demuxing_enabled_audio, pt_demuxing_enabled_video]() {
- for (const auto& it : channels_to_update) {
- RtpTransceiverDirection local_direction = it.first;
- cricket::ChannelInterface* channel = it.second;
- cricket::MediaType media_type = channel->media_type();
- bool in_bundle_group = (bundle_group && bundle_group->HasContentName(
- channel->content_name()));
- if (media_type == cricket::MediaType::MEDIA_TYPE_AUDIO) {
- if (!channel->SetPayloadTypeDemuxingEnabled(
- (!in_bundle_group || pt_demuxing_enabled_audio) &&
- RtpTransceiverDirectionHasRecv(local_direction))) {
- return false;
- }
- } else if (media_type == cricket::MediaType::MEDIA_TYPE_VIDEO) {
- if (!channel->SetPayloadTypeDemuxingEnabled(
- (!in_bundle_group || pt_demuxing_enabled_video) &&
- RtpTransceiverDirectionHasRecv(local_direction))) {
- return false;
- }
- }
- }
- return true;
- });
+ return channel_updates;
}
} // namespace webrtc
diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h
index 43a3dbb..4b14f20 100644
--- a/pc/sdp_offer_answer.h
+++ b/pc/sdp_offer_answer.h
@@ -455,15 +455,32 @@
cricket::MediaType media_type,
StreamCollection* new_streams);
- // Enables media channels to allow sending of media.
- // This enables media to flow on all configured audio/video channels and the
- // RtpDataChannel.
- void EnableSending();
// Push the media parts of the local or remote session description
- // down to all of the channels.
+ // down to all of the channels, and enable sending if applicable.
RTCError PushdownMediaDescription(SdpType type,
cricket::ContentSource source);
+ struct PayloadTypeDemuxingUpdate {
+ PayloadTypeDemuxingUpdate(cricket::ChannelInterface* channel, bool enabled)
+ : channel(channel), enabled(enabled) {}
+ cricket::ChannelInterface* channel;
+ bool enabled;
+ };
+ struct ContentUpdate {
+ ContentUpdate(cricket::ChannelInterface* channel,
+ const cricket::MediaContentDescription* content_description)
+ : channel(channel), content_description(content_description) {}
+ cricket::ChannelInterface* channel;
+ const cricket::MediaContentDescription* content_description;
+ };
+ // Helper method used by PushdownMediaDescription to apply a batch of updates
+ // to BaseChannels on the worker thread.
+ RTCError ApplyChannelUpdates(
+ SdpType type,
+ cricket::ContentSource source,
+ std::vector<PayloadTypeDemuxingUpdate> payload_type_demuxing_updates,
+ std::vector<ContentUpdate> content_updates);
+
RTCError PushdownTransportDescription(cricket::ContentSource source,
SdpType type);
// Helper function to remove stopped transceivers.
@@ -550,9 +567,14 @@
const std::string& mid) const;
const std::string GetTransportName(const std::string& content_name);
- // Based on number of transceivers per media type, enabled or disable
- // payload type based demuxing in the affected channels.
- bool UpdatePayloadTypeDemuxingState(cricket::ContentSource source);
+
+ // Based on number of transceivers per media type, and their bundle status and
+ // payload types, determine whether payload type based demuxing should be
+ // enabled or disabled. Returns a list of channels and the corresponding
+ // value to be passed into SetPayloadTypeDemuxingEnabled, so that this action
+ // can be combined with other operations on the worker thread.
+ std::vector<PayloadTypeDemuxingUpdate> GetPayloadTypeDemuxingUpdates(
+ cricket::ContentSource source);
// ==================================================================
// Access to pc_ variables
diff --git a/pc/test/mock_channel_interface.h b/pc/test/mock_channel_interface.h
index 1be4dcb0c..3a73225 100644
--- a/pc/test/mock_channel_interface.h
+++ b/pc/test/mock_channel_interface.h
@@ -46,7 +46,8 @@
webrtc::SdpType,
std::string*),
(override));
- MOCK_METHOD(bool, SetPayloadTypeDemuxingEnabled, (bool), (override));
+ MOCK_METHOD(void, SetPayloadTypeDemuxingEnabled, (bool), (override));
+ MOCK_METHOD(bool, UpdateRtpTransport, (std::string*), (override));
MOCK_METHOD(const std::vector<StreamParams>&,
local_streams,
(),