Move RTP/RTCP demuxing logic from BaseChannel to RtpTransport.
BUG=webrtc:7013
Review-Url: https://codereview.webrtc.org/2890263003
Cr-Commit-Position: refs/heads/master@{#18391}
diff --git a/webrtc/pc/channel.cc b/webrtc/pc/channel.cc
index f5428a4..a08d7ae 100644
--- a/webrtc/pc/channel.cc
+++ b/webrtc/pc/channel.cc
@@ -105,15 +105,9 @@
DataMediaChannel::Error error;
};
-static const char* PacketType(bool rtcp) {
- return (!rtcp) ? "RTP" : "RTCP";
-}
-
static bool ValidPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) {
// Check the packet size. We could check the header too if needed.
- return (packet &&
- packet->size() >= (!rtcp ? kMinRtpPacketLen : kMinRtcpPacketLen) &&
- packet->size() <= kMaxRtpPacketLen);
+ return packet && IsValidRtpRtcpPacketSize(rtcp, packet->size());
}
static bool IsReceiveContentDirection(MediaContentDirection direction) {
@@ -179,6 +173,11 @@
#endif
rtp_transport_.SignalReadyToSend.connect(
this, &BaseChannel::OnTransportReadyToSend);
+ // TODO(zstein): RtpTransport::SignalPacketReceived will probably be replaced
+ // with a callback interface later so that the demuxer can select which
+ // channel to signal.
+ rtp_transport_.SignalPacketReceived.connect(this,
+ &BaseChannel::OnPacketReceived);
LOG(LS_INFO) << "Created channel for " << content_name;
}
@@ -214,6 +213,9 @@
DisconnectFromPacketTransport(rtp_transport_.rtcp_packet_transport());
}
+ rtp_transport_.SetRtpPacketTransport(nullptr);
+ rtp_transport_.SetRtcpPacketTransport(nullptr);
+
// Clear pending read packets/messages.
network_thread_->Clear(&invoker_);
network_thread_->Clear(this);
@@ -397,7 +399,6 @@
// TODO(zstein): de-dup with ConnectToPacketTransport
transport->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
- transport->SignalReadPacket.connect(this, &BaseChannel::OnPacketRead);
transport->SignalDtlsState.connect(this, &BaseChannel::OnDtlsState);
transport->SignalSentPacket.connect(this, &BaseChannel::SignalSentPacket_n);
transport->ice_transport()->SignalSelectedCandidatePairChanged.connect(
@@ -411,7 +412,6 @@
false);
transport->SignalWritableState.disconnect(this);
- transport->SignalReadPacket.disconnect(this);
transport->SignalDtlsState.disconnect(this);
transport->SignalSentPacket.disconnect(this);
transport->ice_transport()->SignalSelectedCandidatePairChanged.disconnect(
@@ -422,7 +422,6 @@
rtc::PacketTransportInternal* transport) {
RTC_DCHECK_RUN_ON(network_thread_);
transport->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
- transport->SignalReadPacket.connect(this, &BaseChannel::OnPacketRead);
transport->SignalSentPacket.connect(this, &BaseChannel::SignalSentPacket_n);
}
@@ -430,7 +429,6 @@
rtc::PacketTransportInternal* transport) {
RTC_DCHECK_RUN_ON(network_thread_);
transport->SignalWritableState.disconnect(this);
- transport->SignalReadPacket.disconnect(this);
transport->SignalSentPacket.disconnect(this);
}
@@ -576,22 +574,6 @@
UpdateWritableState_n();
}
-void BaseChannel::OnPacketRead(rtc::PacketTransportInternal* transport,
- const char* data,
- size_t len,
- const rtc::PacketTime& packet_time,
- int flags) {
- TRACE_EVENT0("webrtc", "BaseChannel::OnPacketRead");
- // OnPacketRead gets called from P2PSocket; now pass data to MediaEngine
- RTC_DCHECK(network_thread_->IsCurrent());
-
- // When using RTCP multiplexing we might get RTCP packets on the RTP
- // transport. We feed RTP traffic into the demuxer to determine if it is RTCP.
- bool rtcp = PacketIsRtcp(transport, data, len);
- rtc::CopyOnWriteBuffer packet(data, len);
- HandlePacket(rtcp, &packet, packet_time);
-}
-
void BaseChannel::OnDtlsState(DtlsTransportInternal* transport,
DtlsTransportState state) {
if (!ShouldSetupDtlsSrtp_n()) {
@@ -641,13 +623,6 @@
Bind(&MediaChannel::OnReadyToSend, media_channel_, ready));
}
-bool BaseChannel::PacketIsRtcp(const rtc::PacketTransportInternal* transport,
- const char* data,
- size_t len) {
- return (transport == rtp_transport_.rtcp_packet_transport() ||
- rtcp_mux_filter_.DemuxRtcp(data, static_cast<int>(len)));
-}
-
bool BaseChannel::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
@@ -680,7 +655,7 @@
// Protect ourselves against crazy data.
if (!ValidPacket(rtcp, packet)) {
LOG(LS_ERROR) << "Dropping outgoing " << content_name_ << " "
- << PacketType(rtcp)
+ << RtpRtcpStringLiteral(rtcp)
<< " packet: wrong size=" << packet->size();
return false;
}
@@ -772,31 +747,13 @@
return rtp_transport_.SendPacket(rtcp, packet, updated_options, flags);
}
-bool BaseChannel::WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) {
- // Protect ourselves against crazy data.
- if (!ValidPacket(rtcp, packet)) {
- LOG(LS_ERROR) << "Dropping incoming " << content_name_ << " "
- << PacketType(rtcp)
- << " packet: wrong size=" << packet->size();
- return false;
- }
- if (rtcp) {
- // Permit all (seemingly valid) RTCP packets.
- return true;
- }
- // Check whether we handle this payload.
- return bundle_filter_.DemuxPacket(packet->data(), packet->size());
+bool BaseChannel::HandlesPayloadType(int packet_type) const {
+ return rtp_transport_.HandlesPayloadType(packet_type);
}
-void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
- const rtc::PacketTime& packet_time) {
- RTC_DCHECK(network_thread_->IsCurrent());
- if (!WantsPacket(rtcp, packet)) {
- return;
- }
-
- // We are only interested in the first rtp packet because that
- // indicates the media has started flowing.
+void BaseChannel::OnPacketReceived(bool rtcp,
+ rtc::CopyOnWriteBuffer& packet,
+ const rtc::PacketTime& packet_time) {
if (!has_received_packet_ && !rtcp) {
has_received_packet_ = true;
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED);
@@ -805,8 +762,8 @@
// Unprotect the packet, if needed.
if (srtp_filter_.IsActive()) {
TRACE_EVENT0("webrtc", "SRTP Decode");
- char* data = packet->data<char>();
- int len = static_cast<int>(packet->size());
+ char* data = packet.data<char>();
+ int len = static_cast<int>(packet.size());
bool res;
if (!rtcp) {
res = srtp_filter_.UnprotectRtp(data, len, &len);
@@ -816,8 +773,8 @@
GetRtpSeqNum(data, len, &seq_num);
GetRtpSsrc(data, len, &ssrc);
LOG(LS_ERROR) << "Failed to unprotect " << content_name_
- << " RTP packet: size=" << len
- << ", seqnum=" << seq_num << ", SSRC=" << ssrc;
+ << " RTP packet: size=" << len << ", seqnum=" << seq_num
+ << ", SSRC=" << ssrc;
return;
}
} else {
@@ -831,7 +788,7 @@
}
}
- packet->SetSize(len);
+ packet.SetSize(len);
} else if (srtp_required_) {
// Our session description indicates that SRTP is required, but we got a
// packet before our SRTP filter is active. This means either that
@@ -844,20 +801,21 @@
// before sending media, to prevent weird failure modes, so it's fine
// for us to just eat packets here. This is all sidestepped if RTCP mux
// is used anyway.
- LOG(LS_WARNING) << "Can't process incoming " << PacketType(rtcp)
+ LOG(LS_WARNING) << "Can't process incoming " << RtpRtcpStringLiteral(rtcp)
<< " packet when SRTP is inactive and crypto is required";
return;
}
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, worker_thread_,
- Bind(&BaseChannel::OnPacketReceived, this, rtcp, *packet, packet_time));
+ Bind(&BaseChannel::ProcessPacket, this, rtcp, packet, packet_time));
}
-void BaseChannel::OnPacketReceived(bool rtcp,
- const rtc::CopyOnWriteBuffer& packet,
- const rtc::PacketTime& packet_time) {
+void BaseChannel::ProcessPacket(bool rtcp,
+ const rtc::CopyOnWriteBuffer& packet,
+ const rtc::PacketTime& packet_time) {
RTC_DCHECK(worker_thread_->IsCurrent());
+
// Need to copy variable because OnRtcpReceived/OnPacketReceived
// requires non-const pointer to buffer. This doesn't memcpy the actual data.
rtc::CopyOnWriteBuffer data(packet);
@@ -987,7 +945,7 @@
}
LOG(LS_INFO) << "Installing keys from DTLS-SRTP on " << content_name() << " "
- << PacketType(rtcp);
+ << RtpRtcpStringLiteral(rtcp);
int key_len;
int salt_len;
@@ -1448,6 +1406,10 @@
}
}
+void BaseChannel::AddHandledPayloadType(int payload_type) {
+ rtp_transport_.AddHandledPayloadType(payload_type);
+}
+
void BaseChannel::FlushRtcpMessages_n() {
// Flush all remaining RTCP messages. This should only be called in
// destructor.
@@ -1659,15 +1621,13 @@
media_channel()->GetActiveStreams(actives);
}
-void VoiceChannel::OnPacketRead(rtc::PacketTransportInternal* transport,
- const char* data,
- size_t len,
- const rtc::PacketTime& packet_time,
- int flags) {
- BaseChannel::OnPacketRead(transport, data, len, packet_time, flags);
+void VoiceChannel::OnPacketReceived(bool rtcp,
+ rtc::CopyOnWriteBuffer& packet,
+ const rtc::PacketTime& packet_time) {
+ BaseChannel::OnPacketReceived(rtcp, packet, packet_time);
// Set a flag when we've received an RTP packet. If we're waiting for early
// media, this will disable the timeout.
- if (!received_media_ && !PacketIsRtcp(transport, data, len)) {
+ if (!received_media_ && !rtcp) {
received_media_ = true;
}
}
@@ -1766,7 +1726,7 @@
return false;
}
for (const AudioCodec& codec : audio->codecs()) {
- bundle_filter()->AddPayloadType(codec.id);
+ AddHandledPayloadType(codec.id);
}
last_recv_params_ = recv_params;
@@ -2039,7 +1999,7 @@
return false;
}
for (const VideoCodec& codec : video->codecs()) {
- bundle_filter()->AddPayloadType(codec.id);
+ AddHandledPayloadType(codec.id);
}
last_recv_params_ = recv_params;
@@ -2234,7 +2194,7 @@
return false;
}
for (const DataCodec& codec : data->codecs()) {
- bundle_filter()->AddPayloadType(codec.id);
+ AddHandledPayloadType(codec.id);
}
last_recv_params_ = recv_params;