Move thread handling from source tracker.
This makes it simpler to use in more contexts.
Bug: b/364184684
Change-Id: I1b08ebd24e51ba1b3f85261eed503a78cd006fd8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/361480
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Jakob Ivarsson <jakobi@webrtc.org>
Reviewed-by: Åsa Persson <asapersson@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42956}
diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc
index 44c95f3..bf2da5d 100644
--- a/audio/audio_receive_stream.cc
+++ b/audio/audio_receive_stream.cc
@@ -102,7 +102,6 @@
std::unique_ptr<voe::ChannelReceiveInterface> channel_receive)
: config_(config),
audio_state_(audio_state),
- source_tracker_(&env.clock()),
channel_receive_(std::move(channel_receive)) {
RTC_LOG(LS_INFO) << "AudioReceiveStreamImpl: " << config.rtp.remote_ssrc;
RTC_DCHECK(config.decoder_factory);
@@ -114,11 +113,6 @@
// Configure bandwidth estimation.
channel_receive_->RegisterReceiverCongestionControlObjects(packet_router);
- // When output is muted, ChannelReceive will directly notify the source
- // tracker of "delivered" frames, so RtpReceiver information will continue to
- // be updated.
- channel_receive_->SetSourceTracker(&source_tracker_);
-
// Complete configuration.
// TODO(solenberg): Config NACK history window (which is a packet count),
// using the actual packet size for the configured codec.
@@ -378,19 +372,13 @@
std::vector<RtpSource> AudioReceiveStreamImpl::GetSources() const {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
- return source_tracker_.GetSources();
+ return channel_receive_->GetSources();
}
AudioMixer::Source::AudioFrameInfo
AudioReceiveStreamImpl::GetAudioFrameWithInfo(int sample_rate_hz,
AudioFrame* audio_frame) {
- AudioMixer::Source::AudioFrameInfo audio_frame_info =
- channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
- if (audio_frame_info != AudioMixer::Source::AudioFrameInfo::kError &&
- !audio_frame->packet_infos_.empty()) {
- source_tracker_.OnFrameDelivered(audio_frame->packet_infos_);
- }
- return audio_frame_info;
+ return channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
}
int AudioReceiveStreamImpl::Ssrc() const {
diff --git a/audio/audio_receive_stream.h b/audio/audio_receive_stream.h
index 4eac8a7..cb1450b 100644
--- a/audio/audio_receive_stream.h
+++ b/audio/audio_receive_stream.h
@@ -25,7 +25,6 @@
#include "audio/audio_state.h"
#include "call/audio_receive_stream.h"
#include "call/syncable.h"
-#include "modules/rtp_rtcp/source/source_tracker.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {
@@ -156,7 +155,6 @@
SequenceChecker::kDetached};
webrtc::AudioReceiveStreamInterface::Config config_;
rtc::scoped_refptr<webrtc::AudioState> audio_state_;
- SourceTracker source_tracker_;
const std::unique_ptr<voe::ChannelReceiveInterface> channel_receive_;
AudioSendStream* associated_send_stream_
RTC_GUARDED_BY(packet_sequence_checker_) = nullptr;
diff --git a/audio/audio_receive_stream_unittest.cc b/audio/audio_receive_stream_unittest.cc
index a263ff9..0afda40 100644
--- a/audio/audio_receive_stream_unittest.cc
+++ b/audio/audio_receive_stream_unittest.cc
@@ -132,7 +132,6 @@
.WillRepeatedly(Invoke([](const std::map<int, SdpAudioFormat>& codecs) {
EXPECT_THAT(codecs, ::testing::IsEmpty());
}));
- EXPECT_CALL(*channel_receive_, SetSourceTracker(_));
EXPECT_CALL(*channel_receive_, GetLocalSsrc())
.WillRepeatedly(Return(kLocalSsrc));
diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc
index 7fcdf66..86c8cfb 100644
--- a/audio/channel_receive.cc
+++ b/audio/channel_receive.cc
@@ -171,7 +171,7 @@
int PreferredSampleRate() const override;
- void SetSourceTracker(SourceTracker* source_tracker) override;
+ std::vector<RtpSource> GetSources() const override;
// Associate to a send channel.
// Used for obtaining RTT for a receive-only channel.
@@ -240,7 +240,7 @@
std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
const uint32_t remote_ssrc_;
- SourceTracker* source_tracker_ = nullptr;
+ SourceTracker source_tracker_ RTC_GUARDED_BY(&worker_thread_checker_);
// Info for GetSyncInfo is updated on network or worker thread, and queried on
// the worker thread.
@@ -325,20 +325,18 @@
// Avoid inserting into NetEQ when we are not playing. Count the
// packet as discarded.
- // If we have a source_tracker_, tell it that the frame has been
- // "delivered". Normally, this happens in AudioReceiveStreamInterface when
- // audio frames are pulled out, but when playout is muted, nothing is
- // pulling frames. The downside of this approach is that frames delivered
- // this way won't be delayed for playout, and therefore will be
- // unsynchronized with (a) audio delay when playing and (b) any audio/video
- // synchronization. But the alternative is that muting playout also stops
- // the SourceTracker from updating RtpSource information.
- if (source_tracker_) {
- RtpPacketInfos::vector_type packet_vector = {
- RtpPacketInfo(rtpHeader, receive_time)};
- source_tracker_->OnFrameDelivered(RtpPacketInfos(packet_vector));
- }
-
+ // Tell source_tracker_ that the frame has been "delivered". Normally, this
+ // happens in AudioReceiveStreamInterface when audio frames are pulled out,
+ // but when playout is muted, nothing is pulling frames. The downside of
+ // this approach is that frames delivered this way won't be delayed for
+ // playout, and therefore will be unsynchronized with (a) audio delay when
+ // playing and (b) any audio/video synchronization. But the alternative is
+ // that muting playout also stops the SourceTracker from updating RtpSource
+ // information.
+ RtpPacketInfos::vector_type packet_vector = {
+ RtpPacketInfo(rtpHeader, receive_time)};
+ source_tracker_.OnFrameDelivered(RtpPacketInfos(packet_vector),
+ env_.clock().CurrentTime());
return;
}
@@ -482,7 +480,16 @@
}
packet_infos.push_back(std::move(new_packet_info));
}
- audio_frame->packet_infos_ = RtpPacketInfos(packet_infos);
+ audio_frame->packet_infos_ = RtpPacketInfos(std::move(packet_infos));
+ if (!audio_frame->packet_infos_.empty()) {
+ RtpPacketInfos infos_copy = audio_frame->packet_infos_;
+ Timestamp delivery_time = env_.clock().CurrentTime();
+ worker_thread_->PostTask(
+ SafeTask(worker_safety_.flag(), [this, infos_copy, delivery_time]() {
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+ source_tracker_.OnFrameDelivered(infos_copy, delivery_time);
+ }));
+ }
++audio_frame_interval_count_;
if (audio_frame_interval_count_ >= kHistogramReportingInterval) {
@@ -514,10 +521,6 @@
acm_receiver_.last_output_sample_rate_hz());
}
-void ChannelReceive::SetSourceTracker(SourceTracker* source_tracker) {
- source_tracker_ = source_tracker;
-}
-
ChannelReceive::ChannelReceive(
const Environment& env,
NetEqFactory* neteq_factory,
@@ -538,6 +541,7 @@
worker_thread_(TaskQueueBase::Current()),
rtp_receive_statistics_(ReceiveStatistics::Create(&env_.clock())),
remote_ssrc_(remote_ssrc),
+ source_tracker_(&env_.clock()),
acm_receiver_(env_,
AcmConfig(neteq_factory,
decoder_factory,
@@ -1102,6 +1106,11 @@
: acm_receiver_.last_output_sample_rate_hz();
}
+std::vector<RtpSource> ChannelReceive::GetSources() const {
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+ return source_tracker_.GetSources();
+}
+
} // namespace
std::unique_ptr<ChannelReceiveInterface> CreateChannelReceive(
diff --git a/audio/channel_receive.h b/audio/channel_receive.h
index 0a11573..4e2048d 100644
--- a/audio/channel_receive.h
+++ b/audio/channel_receive.h
@@ -148,9 +148,7 @@
virtual int PreferredSampleRate() const = 0;
- // Sets the source tracker to notify about "delivered" packets when output is
- // muted.
- virtual void SetSourceTracker(SourceTracker* source_tracker) = 0;
+ virtual std::vector<RtpSource> GetSources() const = 0;
// Associate to a send channel.
// Used for obtaining RTT for a receive-only channel.
diff --git a/audio/mock_voe_channel_proxy.h b/audio/mock_voe_channel_proxy.h
index 1a62f02..fc1cecb 100644
--- a/audio/mock_voe_channel_proxy.h
+++ b/audio/mock_voe_channel_proxy.h
@@ -62,7 +62,7 @@
(int sample_rate_hz, AudioFrame*),
(override));
MOCK_METHOD(int, PreferredSampleRate, (), (const, override));
- MOCK_METHOD(void, SetSourceTracker, (SourceTracker*), (override));
+ MOCK_METHOD(std::vector<RtpSource>, GetSources, (), (const, override));
MOCK_METHOD(void,
SetAssociatedSendChannel,
(const voe::ChannelSendInterface*),
diff --git a/modules/rtp_rtcp/source/source_tracker.cc b/modules/rtp_rtcp/source/source_tracker.cc
index cd881f1..4d1f406 100644
--- a/modules/rtp_rtcp/source/source_tracker.cc
+++ b/modules/rtp_rtcp/source/source_tracker.cc
@@ -17,42 +17,26 @@
namespace webrtc {
-SourceTracker::SourceTracker(Clock* clock)
- : worker_thread_(TaskQueueBase::Current()), clock_(clock) {
- RTC_DCHECK(worker_thread_);
+SourceTracker::SourceTracker(Clock* clock) : clock_(clock) {
RTC_DCHECK(clock_);
}
-void SourceTracker::OnFrameDelivered(RtpPacketInfos packet_infos) {
+void SourceTracker::OnFrameDelivered(const RtpPacketInfos& packet_infos,
+ Timestamp delivery_time) {
+ TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
if (packet_infos.empty()) {
return;
}
-
- Timestamp now = clock_->CurrentTime();
- if (worker_thread_->IsCurrent()) {
- RTC_DCHECK_RUN_ON(worker_thread_);
- OnFrameDeliveredInternal(now, packet_infos);
- } else {
- worker_thread_->PostTask(
- SafeTask(worker_safety_.flag(),
- [this, packet_infos = std::move(packet_infos), now]() {
- RTC_DCHECK_RUN_ON(worker_thread_);
- OnFrameDeliveredInternal(now, packet_infos);
- }));
+ if (delivery_time.IsInfinite()) {
+ delivery_time = clock_->CurrentTime();
}
-}
-
-void SourceTracker::OnFrameDeliveredInternal(
- Timestamp now,
- const RtpPacketInfos& packet_infos) {
- TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
for (const RtpPacketInfo& packet_info : packet_infos) {
for (uint32_t csrc : packet_info.csrcs()) {
SourceKey key(RtpSourceType::CSRC, csrc);
SourceEntry& entry = UpdateEntry(key);
- entry.timestamp = now;
+ entry.timestamp = delivery_time;
entry.audio_level = packet_info.audio_level();
entry.absolute_capture_time = packet_info.absolute_capture_time();
entry.local_capture_clock_offset =
@@ -63,19 +47,17 @@
SourceKey key(RtpSourceType::SSRC, packet_info.ssrc());
SourceEntry& entry = UpdateEntry(key);
- entry.timestamp = now;
+ entry.timestamp = delivery_time;
entry.audio_level = packet_info.audio_level();
entry.absolute_capture_time = packet_info.absolute_capture_time();
entry.local_capture_clock_offset = packet_info.local_capture_clock_offset();
entry.rtp_timestamp = packet_info.rtp_timestamp();
}
- PruneEntries(now);
+ PruneEntries(delivery_time);
}
std::vector<RtpSource> SourceTracker::GetSources() const {
- RTC_DCHECK_RUN_ON(worker_thread_);
-
PruneEntries(clock_->CurrentTime());
std::vector<RtpSource> sources;
diff --git a/modules/rtp_rtcp/source/source_tracker.h b/modules/rtp_rtcp/source/source_tracker.h
index 9d39599..3be339f 100644
--- a/modules/rtp_rtcp/source/source_tracker.h
+++ b/modules/rtp_rtcp/source/source_tracker.h
@@ -19,8 +19,6 @@
#include <vector>
#include "api/rtp_packet_infos.h"
-#include "api/task_queue/pending_task_safety_flag.h"
-#include "api/task_queue/task_queue_base.h"
#include "api/transport/rtp/rtp_source.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@@ -34,6 +32,7 @@
// - https://w3c.github.io/webrtc-pc/#dom-rtcrtpcontributingsource
// - https://w3c.github.io/webrtc-pc/#dom-rtcrtpsynchronizationsource
//
+// This class is thread unsafe.
class SourceTracker {
public:
// Amount of time before the entry associated with an update is removed. See:
@@ -49,7 +48,8 @@
// Updates the source entries when a frame is delivered to the
// RTCRtpReceiver's MediaStreamTrack.
- void OnFrameDelivered(RtpPacketInfos packet_infos);
+ void OnFrameDelivered(const RtpPacketInfos& packet_infos,
+ Timestamp delivery_time = Timestamp::MinusInfinity());
// Returns an `RtpSource` for each unique SSRC and CSRC identifier updated in
// the last `kTimeoutMs` milliseconds. Entries appear in reverse chronological
@@ -116,27 +116,21 @@
SourceKeyHasher,
SourceKeyComparator>;
- void OnFrameDeliveredInternal(Timestamp now,
- const RtpPacketInfos& packet_infos)
- RTC_RUN_ON(worker_thread_);
-
// Updates an entry by creating it (if it didn't previously exist) and moving
// it to the front of the list. Returns a reference to the entry.
- SourceEntry& UpdateEntry(const SourceKey& key) RTC_RUN_ON(worker_thread_);
+ SourceEntry& UpdateEntry(const SourceKey& key);
// Removes entries that have timed out. Marked as "const" so that we can do
// pruning in getters.
- void PruneEntries(Timestamp now) const RTC_RUN_ON(worker_thread_);
+ void PruneEntries(Timestamp now) const;
- TaskQueueBase* const worker_thread_;
Clock* const clock_;
// Entries are stored in reverse chronological order (i.e. with the most
// recently updated entries appearing first). Mutability is needed for timeout
// pruning in const functions.
- mutable SourceList list_ RTC_GUARDED_BY(worker_thread_);
- mutable SourceMap map_ RTC_GUARDED_BY(worker_thread_);
- ScopedTaskSafety worker_safety_;
+ mutable SourceList list_;
+ mutable SourceMap map_;
};
} // namespace webrtc
diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc
index c6a20ef..4372fda 100644
--- a/video/video_receive_stream2.cc
+++ b/video/video_receive_stream2.cc
@@ -625,11 +625,11 @@
}
void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) {
- source_tracker_.OnFrameDelivered(video_frame.packet_infos());
config_.renderer->OnFrame(video_frame);
- // TODO(bugs.webrtc.org/10739): we should set local capture clock offset for
- // `video_frame.packet_infos`. But VideoFrame is const qualified here.
+ // TODO: bugs.webrtc.org/42220804 - we should set local capture clock offset
+ // for `packet_infos`.
+ RtpPacketInfos packet_infos = video_frame.packet_infos();
// For frame delay metrics, calculated in `OnRenderedFrame`, to better reflect
// user experience measurements must be done as close as possible to frame
@@ -640,7 +640,7 @@
// rendered" callback from the renderer.
VideoFrameMetaData frame_meta(video_frame, env_.clock().CurrentTime());
call_->worker_thread()->PostTask(
- SafeTask(task_safety_.flag(), [frame_meta, this]() {
+ SafeTask(task_safety_.flag(), [frame_meta, packet_infos, this]() {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
int64_t video_playout_ntp_ms;
int64_t sync_offset_ms;
@@ -652,6 +652,8 @@
estimated_freq_khz);
}
stats_proxy_.OnRenderedFrame(frame_meta);
+ source_tracker_.OnFrameDelivered(packet_infos,
+ frame_meta.decode_timestamp);
}));
webrtc::MutexLock lock(&pending_resolution_mutex_);
@@ -1047,6 +1049,7 @@
}
std::vector<webrtc::RtpSource> VideoReceiveStream2::GetSources() const {
+ RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
return source_tracker_.GetSources();
}
diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h
index f161f63..2c7eb4c 100644
--- a/video/video_receive_stream2.h
+++ b/video/video_receive_stream2.h
@@ -264,7 +264,7 @@
bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
bool decoder_stopped_ RTC_GUARDED_BY(decode_sequence_checker_) = true;
- SourceTracker source_tracker_;
+ SourceTracker source_tracker_ RTC_GUARDED_BY(worker_sequence_checker_);
ReceiveStatisticsProxy stats_proxy_;
// Shared by media and rtx stream receivers, since the latter has no RtpRtcp
// module of its own.