Annotate cricket::BaseChannel with thread guards
This CL also adds commentary to member variables that couldn't be guarded
because they're accessed from multiple threads.
Bug: webrtc:12230
Change-Id: I5193a7ef36ab25588c76ee6a1863de6a844be1dc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/195331
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32705}
diff --git a/pc/channel.cc b/pc/channel.cc
index 02ee9d2..6ab4371 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -160,6 +160,9 @@
}
std::string BaseChannel::ToString() const {
+ // TODO(bugs.webrtc.org/12230): When media_channel_ is guarded by
+ // worker_thread(), rewrite this debug printout to not print the
+ // media type when called from non-worker-thread.
rtc::StringBuilder sb;
sb << "{mid: " << content_name_;
if (media_channel_) {
@@ -170,8 +173,9 @@
}
bool BaseChannel::ConnectToRtpTransport() {
+ RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_n()) {
RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
return false;
}
@@ -187,6 +191,7 @@
}
void BaseChannel::DisconnectFromRtpTransport() {
+ RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
rtp_transport_->UnregisterRtpDemuxerSink(this);
rtp_transport_->SignalReadyToSend.disconnect(this);
@@ -196,7 +201,7 @@
}
void BaseChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) {
- RTC_DCHECK_RUN_ON(worker_thread_);
+ RTC_DCHECK_RUN_ON(worker_thread());
network_thread_->Invoke<void>(
RTC_FROM_HERE, [this, rtp_transport] { SetRtpTransport(rtp_transport); });
@@ -213,6 +218,7 @@
// functions, so need to stop this process in Deinit that is called in
// derived classes destructor.
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
+ RTC_DCHECK_RUN_ON(network_thread());
FlushRtcpMessages_n();
if (rtp_transport_) {
@@ -225,15 +231,15 @@
}
bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) {
- if (rtp_transport == rtp_transport_) {
- return true;
- }
-
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<bool>(RTC_FROM_HERE, [this, rtp_transport] {
return SetRtpTransport(rtp_transport);
});
}
+ RTC_DCHECK_RUN_ON(network_thread());
+ if (rtp_transport == rtp_transport_) {
+ return true;
+ }
if (rtp_transport_) {
DisconnectFromRtpTransport();
@@ -338,7 +344,6 @@
int BaseChannel::SetOption_n(SocketType type,
rtc::Socket::Option opt,
int value) {
- RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
switch (type) {
case ST_RTP:
@@ -376,6 +381,7 @@
// work correctly. Intentionally leave it broken to simplify the code and
// encourage the users to stop using non-muxing RTCP.
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] {
+ RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
});
}
@@ -393,8 +399,10 @@
}
void BaseChannel::OnTransportReadyToSend(bool ready) {
- invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
- [=] { media_channel_->OnReadyToSend(ready); });
+ invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] {
+ RTC_DCHECK_RUN_ON(worker_thread());
+ media_channel_->OnReadyToSend(ready);
+ });
}
bool BaseChannel::SendPacket(bool rtcp,
@@ -418,6 +426,7 @@
network_thread_->Post(RTC_FROM_HERE, this, message_id, data);
return true;
}
+ RTC_DCHECK_RUN_ON(network_thread());
TRACE_EVENT0("webrtc", "BaseChannel::SendPacket");
@@ -506,25 +515,34 @@
void BaseChannel::UpdateRtpHeaderExtensionMap(
const RtpHeaderExtensions& header_extensions) {
- RTC_DCHECK(rtp_transport_);
// Update the header extension map on network thread in case there is data
// race.
- // TODO(zhihuang): Add an rtc::ThreadChecker make sure to RtpTransport won't
- // be accessed from different threads.
//
// NOTE: This doesn't take the BUNDLE case in account meaning the RTP header
// extension maps are not merged when BUNDLE is enabled. This is fine because
// the ID for MID should be consistent among all the RTP transports.
network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &header_extensions] {
+ RTC_DCHECK_RUN_ON(network_thread());
rtp_transport_->UpdateRtpHeaderExtensionMap(header_extensions);
});
}
-bool BaseChannel::RegisterRtpDemuxerSink() {
+bool BaseChannel::RegisterRtpDemuxerSink_w() {
+ // Copy demuxer criteria, since they're a worker-thread variable
+ // and we want to pass them to the network thread
+ return network_thread_->Invoke<bool>(
+ RTC_FROM_HERE, [this, demuxer_criteria = demuxer_criteria_] {
+ RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK(rtp_transport_);
+ return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria, this);
+ });
+}
+
+bool BaseChannel::RegisterRtpDemuxerSink_n() {
RTC_DCHECK(rtp_transport_);
- return network_thread_->Invoke<bool>(RTC_FROM_HERE, [this] {
- return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
- });
+ // TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the
+ // networking thread.
+ return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
}
void BaseChannel::EnableMedia_w() {
@@ -557,7 +575,6 @@
}
void BaseChannel::ChannelWritable_n() {
- RTC_DCHECK_RUN_ON(network_thread());
if (writable_) {
return;
}
@@ -571,7 +588,6 @@
}
void BaseChannel::ChannelNotWritable_n() {
- RTC_DCHECK_RUN_ON(network_thread());
if (!writable_)
return;
@@ -581,12 +597,10 @@
}
bool BaseChannel::AddRecvStream_w(const StreamParams& sp) {
- RTC_DCHECK(worker_thread() == rtc::Thread::Current());
return media_channel()->AddRecvStream(sp);
}
bool BaseChannel::RemoveRecvStream_w(uint32_t ssrc) {
- RTC_DCHECK(worker_thread() == rtc::Thread::Current());
return media_channel()->RemoveRecvStream(ssrc);
}
@@ -596,7 +610,6 @@
}
bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) {
- RTC_DCHECK_RUN_ON(worker_thread());
if (enabled == payload_type_demuxing_enabled_) {
return true;
}
@@ -609,7 +622,7 @@
// there is no straightforward way to identify those streams.
media_channel()->ResetUnsignaledRecvStream();
demuxer_criteria_.payload_types.clear();
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to disable payload type demuxing for "
<< ToString();
return false;
@@ -617,7 +630,7 @@
} else if (!payload_types_.empty()) {
demuxer_criteria_.payload_types.insert(payload_types_.begin(),
payload_types_.end());
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to enable payload type demuxing for "
<< ToString();
return false;
@@ -765,7 +778,7 @@
new_stream.ssrcs.end());
}
// Re-register the sink to update the receiving ssrcs.
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
ret = false;
}
@@ -775,7 +788,6 @@
RtpHeaderExtensions BaseChannel::GetFilteredRtpHeaderExtensions(
const RtpHeaderExtensions& extensions) {
- RTC_DCHECK(rtp_transport_);
if (crypto_options_.srtp.enable_encrypted_rtp_header_extensions) {
RtpHeaderExtensions filtered;
absl::c_copy_if(extensions, std::back_inserter(filtered),
@@ -826,7 +838,6 @@
void BaseChannel::FlushRtcpMessages_n() {
// Flush all remaining RTCP messages. This should only be called in
// destructor.
- RTC_DCHECK_RUN_ON(network_thread());
rtc::MessageList rtcp_messages;
network_thread_->Clear(this, MSG_SEND_RTCP_PACKET, &rtcp_messages);
for (const auto& message : rtcp_messages) {
@@ -836,7 +847,6 @@
}
void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) {
- RTC_DCHECK_RUN_ON(network_thread());
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
[this, sent_packet] {
RTC_DCHECK_RUN_ON(worker_thread());
@@ -881,6 +891,7 @@
void VoiceChannel::UpdateMediaSendRecvState_w() {
// Render incoming data if we're the active call, and we have the local
// content. We receive data on the default channel and multiplexed streams.
+ RTC_DCHECK_RUN_ON(worker_thread());
bool recv = IsReadyToReceiveMedia_w();
media_channel()->SetPlayout(recv);
@@ -931,7 +942,7 @@
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up audio demuxing for " << ToString();
return false;
}
@@ -997,7 +1008,7 @@
"disable payload type demuxing for "
<< ToString();
ClearHandledPayloadTypes();
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to update audio demuxing for " << ToString();
return false;
}
@@ -1048,6 +1059,7 @@
void VideoChannel::UpdateMediaSendRecvState_w() {
// Send outgoing data if we're the active call, we have the remote content,
// and we have had some form of connectivity.
+ RTC_DCHECK_RUN_ON(worker_thread());
bool send = IsReadyToSendMedia_w();
if (!media_channel()->SetSend(send)) {
RTC_LOG(LS_ERROR) << "Failed to SetSend on video channel: " + ToString();
@@ -1124,7 +1136,7 @@
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up video demuxing for " << ToString();
return false;
}
@@ -1234,7 +1246,7 @@
"disable payload type demuxing for "
<< ToString();
ClearHandledPayloadTypes();
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to update video demuxing for " << ToString();
return false;
}
@@ -1349,7 +1361,7 @@
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
- if (!RegisterRtpDemuxerSink()) {
+ if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up data demuxing for " << ToString();
return false;
}
@@ -1437,6 +1449,7 @@
void RtpDataChannel::UpdateMediaSendRecvState_w() {
// Render incoming data if we're the active call, and we have the local
// content. We receive data on the default channel and multiplexed streams.
+ RTC_DCHECK_RUN_ON(worker_thread());
bool recv = IsReadyToReceiveMedia_w();
if (!media_channel()->SetReceive(recv)) {
RTC_LOG(LS_ERROR) << "Failed to SetReceive on data channel: " << ToString();
diff --git a/pc/channel.h b/pc/channel.h
index 51cc40f..ad75070 100644
--- a/pc/channel.h
+++ b/pc/channel.h
@@ -106,6 +106,13 @@
// This function returns true if using SRTP (DTLS-based keying or SDES).
bool srtp_active() const {
+ // TODO(bugs.webrtc.org/12230): At least some tests call this function
+ // from other threads.
+ if (!network_thread_->IsCurrent()) {
+ return network_thread_->Invoke<bool>(RTC_FROM_HERE,
+ [this] { return srtp_active(); });
+ }
+ RTC_DCHECK_RUN_ON(network_thread());
return rtp_transport_ && rtp_transport_->IsSrtpActive();
}
@@ -117,7 +124,16 @@
// internally. It would replace the |SetTransports| and its variants.
bool SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) override;
- webrtc::RtpTransportInternal* rtp_transport() const { return rtp_transport_; }
+ webrtc::RtpTransportInternal* rtp_transport() const {
+ // TODO(bugs.webrtc.org/12230): At least some tests call this function
+ // from other threads.
+ if (!network_thread_->IsCurrent()) {
+ return network_thread_->Invoke<webrtc::RtpTransportInternal*>(
+ RTC_FROM_HERE, [this] { return rtp_transport(); });
+ }
+ RTC_DCHECK_RUN_ON(network_thread());
+ return rtp_transport_;
+ }
// Channel control
bool SetLocalContent(const MediaContentDescription* content,
@@ -156,7 +172,8 @@
// Only public for unit tests. Otherwise, consider protected.
int SetOption(SocketType type, rtc::Socket::Option o, int val) override;
- int SetOption_n(SocketType type, rtc::Socket::Option o, int val);
+ int SetOption_n(SocketType type, rtc::Socket::Option o, int val)
+ RTC_RUN_ON(network_thread());
// RtpPacketSinkInterface overrides.
void OnRtpPacket(const webrtc::RtpPacketReceived& packet) override;
@@ -167,14 +184,24 @@
transport_name_ = transport_name;
}
- MediaChannel* media_channel() const override { return media_channel_.get(); }
+ MediaChannel* media_channel() const override {
+ // TODO(bugs.webrtc.org/12230): Called on multiple threads,
+ // including from StatsCollector::ExtractMediaInfo.
+ // RTC_DCHECK_RUN_ON(worker_thread());
+ return media_channel_.get();
+ }
protected:
- bool was_ever_writable() const { return was_ever_writable_; }
+ bool was_ever_writable() const {
+ RTC_DCHECK_RUN_ON(network_thread());
+ return was_ever_writable_;
+ }
void set_local_content_direction(webrtc::RtpTransceiverDirection direction) {
+ RTC_DCHECK_RUN_ON(worker_thread());
local_content_direction_ = direction;
}
void set_remote_content_direction(webrtc::RtpTransceiverDirection direction) {
+ RTC_DCHECK_RUN_ON(worker_thread());
remote_content_direction_ = direction;
}
// These methods verify that:
@@ -187,11 +214,11 @@
//
// When any of these properties change, UpdateMediaSendRecvState_w should be
// called.
- bool IsReadyToReceiveMedia_w() const;
- bool IsReadyToSendMedia_w() const;
+ bool IsReadyToReceiveMedia_w() const RTC_RUN_ON(worker_thread());
+ bool IsReadyToSendMedia_w() const RTC_RUN_ON(worker_thread());
rtc::Thread* signaling_thread() { return signaling_thread_; }
- void FlushRtcpMessages_n();
+ void FlushRtcpMessages_n() RTC_RUN_ON(network_thread());
// NetworkInterface implementation, called by MediaEngine
bool SendPacket(rtc::CopyOnWriteBuffer* packet,
@@ -211,22 +238,23 @@
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
- void EnableMedia_w();
- void DisableMedia_w();
+ void EnableMedia_w() RTC_RUN_ON(worker_thread());
+ void DisableMedia_w() RTC_RUN_ON(worker_thread());
// Performs actions if the RTP/RTCP writable state changed. This should
// be called whenever a channel's writable state changes or when RTCP muxing
// becomes active/inactive.
- void UpdateWritableState_n();
- void ChannelWritable_n();
- void ChannelNotWritable_n();
+ void UpdateWritableState_n() RTC_RUN_ON(network_thread());
+ void ChannelWritable_n() RTC_RUN_ON(network_thread());
+ void ChannelNotWritable_n() RTC_RUN_ON(network_thread());
- bool AddRecvStream_w(const StreamParams& sp);
- bool RemoveRecvStream_w(uint32_t ssrc);
- void ResetUnsignaledRecvStream_w();
- bool SetPayloadTypeDemuxingEnabled_w(bool enabled);
- bool AddSendStream_w(const StreamParams& sp);
- bool RemoveSendStream_w(uint32_t ssrc);
+ bool AddRecvStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread());
+ bool RemoveRecvStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread());
+ void ResetUnsignaledRecvStream_w() RTC_RUN_ON(worker_thread());
+ bool SetPayloadTypeDemuxingEnabled_w(bool enabled)
+ RTC_RUN_ON(worker_thread());
+ bool AddSendStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread());
+ bool RemoveSendStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread());
// Should be called whenever the conditions for
// IsReadyToReceiveMedia/IsReadyToSendMedia are satisfied (or unsatisfied).
@@ -236,10 +264,12 @@
bool UpdateLocalStreams_w(const std::vector<StreamParams>& streams,
webrtc::SdpType type,
- std::string* error_desc);
+ std::string* error_desc)
+ RTC_RUN_ON(worker_thread());
bool UpdateRemoteStreams_w(const std::vector<StreamParams>& streams,
webrtc::SdpType type,
- std::string* error_desc);
+ std::string* error_desc)
+ RTC_RUN_ON(worker_thread());
virtual bool SetLocalContent_w(const MediaContentDescription* content,
webrtc::SdpType type,
std::string* error_desc) = 0;
@@ -271,7 +301,8 @@
void UpdateRtpHeaderExtensionMap(
const RtpHeaderExtensions& header_extensions);
- bool RegisterRtpDemuxerSink();
+ bool RegisterRtpDemuxerSink_w() RTC_RUN_ON(worker_thread());
+ bool RegisterRtpDemuxerSink_n() RTC_RUN_ON(network_thread());
// Return description of media channel to facilitate logging
std::string ToString() const;
@@ -281,8 +312,9 @@
private:
bool ConnectToRtpTransport();
void DisconnectFromRtpTransport();
- void SignalSentPacket_n(const rtc::SentPacket& sent_packet);
- bool IsReadyToSendMedia_n() const;
+ void SignalSentPacket_n(const rtc::SentPacket& sent_packet)
+ RTC_RUN_ON(network_thread());
+ bool IsReadyToSendMedia_n() const RTC_RUN_ON(network_thread());
rtc::Thread* const worker_thread_;
rtc::Thread* const network_thread_;
@@ -296,27 +328,39 @@
const std::string content_name_;
// Won't be set when using raw packet transports. SDP-specific thing.
+ // TODO(bugs.webrtc.org/12230): Written on network thread, read on
+ // worker thread (at least).
std::string transport_name_;
- webrtc::RtpTransportInternal* rtp_transport_ = nullptr;
+ webrtc::RtpTransportInternal* rtp_transport_
+ RTC_GUARDED_BY(network_thread()) = nullptr;
- std::vector<std::pair<rtc::Socket::Option, int> > socket_options_;
- std::vector<std::pair<rtc::Socket::Option, int> > rtcp_socket_options_;
+ std::vector<std::pair<rtc::Socket::Option, int> > socket_options_
+ RTC_GUARDED_BY(network_thread());
+ std::vector<std::pair<rtc::Socket::Option, int> > rtcp_socket_options_
+ RTC_GUARDED_BY(network_thread());
+ // TODO(bugs.webrtc.org/12230): writable_ is accessed in tests
+ // outside of the network thread.
bool writable_ = false;
- bool was_ever_writable_ = false;
+ bool was_ever_writable_ RTC_GUARDED_BY(network_thread()) = false;
const bool srtp_required_ = true;
- webrtc::CryptoOptions crypto_options_;
+ const webrtc::CryptoOptions crypto_options_;
// MediaChannel related members that should be accessed from the worker
// thread.
+ // TODO(bugs.webrtc.org/12230): written on worker thread, accessed by
+ // multiple threads.
std::unique_ptr<MediaChannel> media_channel_;
// Currently the |enabled_| flag is accessed from the signaling thread as
// well, but it can be changed only when signaling thread does a synchronous
// call to the worker thread, so it should be safe.
bool enabled_ = false;
bool payload_type_demuxing_enabled_ RTC_GUARDED_BY(worker_thread()) = true;
- std::vector<StreamParams> local_streams_;
- std::vector<StreamParams> remote_streams_;
+ std::vector<StreamParams> local_streams_ RTC_GUARDED_BY(worker_thread());
+ std::vector<StreamParams> remote_streams_ RTC_GUARDED_BY(worker_thread());
+ // TODO(bugs.webrtc.org/12230): local_content_direction and
+ // remote_content_direction are set on the worker thread, but accessed on the
+ // network thread.
webrtc::RtpTransceiverDirection local_content_direction_ =
webrtc::RtpTransceiverDirection::kInactive;
webrtc::RtpTransceiverDirection remote_content_direction_ =
@@ -324,6 +368,8 @@
// Cached list of payload types, used if payload type demuxing is re-enabled.
std::set<uint8_t> payload_types_ RTC_GUARDED_BY(worker_thread());
+ // TODO(bugs.webrtc.org/12239): Modified on worker thread, accessed
+ // on network thread in RegisterRtpDemuxerSink_n (called from Init_w)
webrtc::RtpDemuxerCriteria demuxer_criteria_;
// This generator is used to generate SSRCs for local streams.
// This is needed in cases where SSRCs are not negotiated or set explicitly