Migrate pc/ to the absl::AnyInvocable-based TaskQueueBase interface
Bug: webrtc:14245
Change-Id: I9043aa507421a93f0d7ba7406e237f727999b696
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268121
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37478}
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 632a072..866f213 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -78,7 +78,6 @@
"../api:sequence_checker",
"../api/crypto:options",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/units:timestamp",
"../call:rtp_interfaces",
"../call:rtp_receiver",
@@ -798,7 +797,6 @@
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/transport:bitrate_settings",
"../api/transport:datagram_transport_interface",
"../api/transport:enums",
@@ -868,7 +866,6 @@
"../api:priority",
"../api:rtc_error",
"../api:scoped_refptr",
- "../api/task_queue:to_queued_task",
"../api/transport:datagram_transport_interface",
"../media:rtc_data_sctp_transport_internal",
"../media:rtc_media_base",
@@ -883,7 +880,10 @@
"../rtc_base/system:unused",
"../rtc_base/third_party/sigslot:sigslot",
]
- absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+ absl_deps = [
+ "//third_party/abseil-cpp/absl/cleanup",
+ "//third_party/abseil-cpp/absl/types:optional",
+ ]
}
rtc_library("data_channel_utils") {
@@ -916,7 +916,6 @@
"../api:scoped_refptr",
"../api:sequence_checker",
"../api/neteq:neteq_api",
- "../api/task_queue:to_queued_task",
"../api/transport:field_trial_based_config",
"../api/transport:sctp_transport_factory_interface",
"../media:rtc_data_sctp_transport_factory",
@@ -948,7 +947,6 @@
"../api:rtc_error",
"../api:scoped_refptr",
"../api:sequence_checker",
- "../api/task_queue:to_queued_task",
"../api/transport:datagram_transport_interface",
"../media:rtc_media_base",
"../rtc_base",
@@ -1053,6 +1051,7 @@
"../rtc_base/third_party/sigslot:sigslot",
]
absl_deps = [
+ "//third_party/abseil-cpp/absl/functional:bind_front",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
@@ -1213,7 +1212,6 @@
"../api/crypto:options",
"../api/rtc_event_log",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/transport:bitrate_settings",
"../api/transport:datagram_transport_interface",
"../api/transport:enums",
@@ -1577,7 +1575,6 @@
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/video:video_bitrate_allocator_factory",
"../media:rtc_media_base",
"../rtc_base:checks",
@@ -1709,7 +1706,6 @@
"../api:sequence_checker",
"../api/crypto:frame_decryptor_interface",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/transport/rtp:rtp_source",
"../media:rtc_media_base",
"../rtc_base",
@@ -1950,15 +1946,14 @@
"../api:libjingle_peerconnection_api",
"../api:scoped_refptr",
"../api:sequence_checker",
+ "../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
+ "../api/units:time_delta",
"../rtc_base:checks",
"../rtc_base:location",
"../rtc_base:logging",
"../rtc_base:macromagic",
"../rtc_base:refcount",
- "../rtc_base:rtc_base",
- "../rtc_base:threading",
"../rtc_base/third_party/sigslot",
]
absl_deps = [
@@ -2098,7 +2093,6 @@
"../api:sequence_checker",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue",
- "../api/task_queue:to_queued_task",
"../api/transport:datagram_transport_interface",
"../api/transport:enums",
"../api/video:builtin_video_bitrate_allocator_factory",
@@ -2135,7 +2129,10 @@
"../test:scoped_key_value_config",
"../test:test_main",
"../test:test_support",
+ ]
+ absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
@@ -2566,7 +2563,6 @@
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/transport:field_trial_based_config",
"../api/transport/rtp:rtp_source",
"../api/units:time_delta",
diff --git a/pc/audio_rtp_receiver.cc b/pc/audio_rtp_receiver.cc
index be2d81e..6ed163a 100644
--- a/pc/audio_rtp_receiver.cc
+++ b/pc/audio_rtp_receiver.cc
@@ -17,7 +17,6 @@
#include <vector>
#include "api/sequence_checker.h"
-#include "api/task_queue/to_queued_task.h"
#include "pc/audio_track.h"
#include "pc/media_stream_track_proxy.h"
#include "rtc_base/checks.h"
@@ -78,11 +77,10 @@
if (cached_track_enabled_ == enabled)
return;
cached_track_enabled_ = enabled;
- worker_thread_->PostTask(
- ToQueuedTask(worker_thread_safety_, [this, enabled]() {
- RTC_DCHECK_RUN_ON(worker_thread_);
- Reconfigure(enabled);
- }));
+ worker_thread_->PostTask(SafeTask(worker_thread_safety_, [this, enabled]() {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ Reconfigure(enabled);
+ }));
}
// RTC_RUN_ON(worker_thread_)
diff --git a/pc/channel.cc b/pc/channel.cc
index 01cd432..0777314 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -20,7 +20,6 @@
#include "api/rtp_parameters.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
-#include "api/task_queue/to_queued_task.h"
#include "api/units/timestamp.h"
#include "media/base/codec.h"
#include "media/base/rid_description.h"
@@ -43,7 +42,6 @@
using ::rtc::UniqueRandomIdGenerator;
using ::webrtc::PendingTaskSafetyFlag;
using ::webrtc::SdpType;
-using ::webrtc::ToQueuedTask;
// Finds a stream based on target's Primary SSRC or RIDs.
// This struct is used in BaseChannel::UpdateLocalStreams_w.
@@ -197,7 +195,7 @@
if (rtp_transport_) {
DisconnectFromRtpTransport_n();
// Clear the cached header extensions on the worker.
- worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
+ worker_thread_->PostTask(SafeTask(alive_, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
rtp_header_extensions_.clear();
}));
@@ -237,7 +235,7 @@
enabled_s_ = enable;
- worker_thread_->PostTask(ToQueuedTask(alive_, [this, enable] {
+ worker_thread_->PostTask(SafeTask(alive_, [this, enable] {
RTC_DCHECK_RUN_ON(worker_thread());
// Sanity check to make sure that enabled_ and enabled_s_
// stay in sync.
@@ -552,7 +550,7 @@
// We only have to do this PostTask once, when first transitioning to
// writable.
if (!was_ever_writable_n_) {
- worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
+ worker_thread_->PostTask(SafeTask(alive_, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
was_ever_writable_ = true;
UpdateMediaSendRecvState_w();
diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc
index 4e718d3..40beff8 100644
--- a/pc/channel_unittest.cc
+++ b/pc/channel_unittest.cc
@@ -16,11 +16,11 @@
#include <string>
#include <type_traits>
+#include "absl/functional/any_invocable.h"
#include "api/array_view.h"
#include "api/audio_options.h"
#include "api/rtp_parameters.h"
#include "api/task_queue/pending_task_safety_flag.h"
-#include "api/task_queue/to_queued_task.h"
#include "media/base/codec.h"
#include "media/base/fake_media_engine.h"
#include "media/base/fake_rtp.h"
@@ -419,7 +419,7 @@
}
void SendRtp(typename T::MediaChannel* media_channel, rtc::Buffer data) {
- network_thread_->PostTask(webrtc::ToQueuedTask(
+ network_thread_->PostTask(webrtc::SafeTask(
network_thread_safety_, [media_channel, data = std::move(data)]() {
media_channel->SendRtp(data.data(), data.size(),
rtc::PacketOptions());
@@ -503,11 +503,10 @@
// destroyed before this object goes out of scope.
class ScopedCallThread {
public:
- template <class FunctorT>
- explicit ScopedCallThread(FunctorT&& functor)
+ explicit ScopedCallThread(absl::AnyInvocable<void() &&> functor)
: thread_(rtc::Thread::Create()) {
thread_->Start();
- thread_->PostTask(std::forward<FunctorT>(functor));
+ thread_->PostTask(std::move(functor));
}
~ScopedCallThread() { thread_->Stop(); }
diff --git a/pc/connection_context.cc b/pc/connection_context.cc
index de0597b..13c598a 100644
--- a/pc/connection_context.cc
+++ b/pc/connection_context.cc
@@ -14,7 +14,6 @@
#include <utility>
#include <vector>
-#include "api/task_queue/to_queued_task.h"
#include "api/transport/field_trial_based_config.h"
#include "media/base/media_engine.h"
#include "media/sctp/sctp_transport_factory.h"
@@ -120,7 +119,7 @@
// network_thread_->IsCurrent() == true means signaling_thread_ is
// network_thread_. In this case, no further action is required as
// signaling_thread_ can already invoke network_thread_.
- network_thread_->PostTask(ToQueuedTask(
+ network_thread_->PostTask(
[thread = network_thread_, worker_thread = worker_thread_.get()] {
thread->DisallowBlockingCalls();
thread->DisallowAllInvokes();
@@ -128,7 +127,7 @@
// In this case, worker_thread_ == network_thread_
thread->AllowInvokesToThread(thread);
}
- }));
+ });
}
rtc::InitRandom(rtc::Time32());
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index b1461cd..b655b530 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -14,7 +14,6 @@
#include "api/peer_connection_interface.h"
#include "api/rtc_error.h"
-#include "api/task_queue/to_queued_task.h"
#include "pc/peer_connection_internal.h"
#include "pc/sctp_utils.h"
#include "rtc_base/location.h"
@@ -114,7 +113,7 @@
params.sid = channel_id;
params.type = type;
signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] {
+ [self = weak_factory_.GetWeakPtr(), params, buffer] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
// TODO(bugs.webrtc.org/11547): The data being received should be
@@ -129,53 +128,49 @@
self->SignalDataChannelTransportReceivedData_s(params, buffer);
}
}
- }));
+ });
}
void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
- if (self) {
- RTC_DCHECK_RUN_ON(self->signaling_thread());
- self->SignalDataChannelTransportChannelClosing_s(channel_id);
- }
- }));
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->SignalDataChannelTransportChannelClosing_s(channel_id);
+ }
+ });
}
void DataChannelController::OnChannelClosed(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
- if (self) {
- RTC_DCHECK_RUN_ON(self->signaling_thread());
- self->SignalDataChannelTransportChannelClosed_s(channel_id);
- }
- }));
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->SignalDataChannelTransportChannelClosed_s(channel_id);
+ }
+ });
}
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
- if (self) {
- RTC_DCHECK_RUN_ON(self->signaling_thread());
- self->data_channel_transport_ready_to_send_ = true;
- self->SignalDataChannelTransportWritable_s(
- self->data_channel_transport_ready_to_send_);
- }
- }));
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->data_channel_transport_ready_to_send_ = true;
+ self->SignalDataChannelTransportWritable_s(
+ self->data_channel_transport_ready_to_send_);
+ }
+ });
}
void DataChannelController::OnTransportClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr(), error] {
- if (self) {
- RTC_DCHECK_RUN_ON(self->signaling_thread());
- self->OnTransportChannelClosed(error);
- }
- }));
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->OnTransportChannelClosed(error);
+ }
+ });
}
void DataChannelController::SetupDataChannelTransport_n() {
@@ -345,13 +340,12 @@
// we can't free it directly here; we need to free it asynchronously.
sctp_data_channels_to_free_.push_back(*it);
sctp_data_channels_.erase(it);
- signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
- if (self) {
- RTC_DCHECK_RUN_ON(self->signaling_thread());
- self->sctp_data_channels_to_free_.clear();
- }
- }));
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ self->sctp_data_channels_to_free_.clear();
+ }
+ });
return;
}
}
@@ -413,15 +407,14 @@
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(
- ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
- if (self) {
- RTC_DCHECK_RUN_ON(self->signaling_thread());
- for (const auto& channel : self->sctp_data_channels_) {
- channel->OnTransportChannelCreated();
- }
- }
- }));
+ signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
+ if (self) {
+ RTC_DCHECK_RUN_ON(self->signaling_thread());
+ for (const auto& channel : self->sctp_data_channels_) {
+ channel->OnTransportChannelCreated();
+ }
+ }
+ });
}
rtc::Thread* DataChannelController::network_thread() const {
diff --git a/pc/dtmf_sender.cc b/pc/dtmf_sender.cc
index 40d9e38..91d642c 100644
--- a/pc/dtmf_sender.cc
+++ b/pc/dtmf_sender.cc
@@ -13,10 +13,11 @@
#include <ctype.h>
#include <string.h>
-#include "api/task_queue/to_queued_task.h"
+#include "api/task_queue/pending_task_safety_flag.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
-#include "rtc_base/thread.h"
namespace webrtc {
@@ -57,7 +58,7 @@
}
rtc::scoped_refptr<DtmfSender> DtmfSender::Create(
- rtc::Thread* signaling_thread,
+ TaskQueueBase* signaling_thread,
DtmfProviderInterface* provider) {
if (!signaling_thread) {
return nullptr;
@@ -65,7 +66,7 @@
return rtc::make_ref_counted<DtmfSender>(signaling_thread, provider);
}
-DtmfSender::DtmfSender(rtc::Thread* signaling_thread,
+DtmfSender::DtmfSender(TaskQueueBase* signaling_thread,
DtmfProviderInterface* provider)
: observer_(nullptr),
signaling_thread_(signaling_thread),
@@ -165,12 +166,12 @@
void DtmfSender::QueueInsertDtmf(const rtc::Location& posted_from,
uint32_t delay_ms) {
signaling_thread_->PostDelayedHighPrecisionTask(
- ToQueuedTask(safety_flag_,
- [this] {
- RTC_DCHECK_RUN_ON(signaling_thread_);
- DoInsertDtmf();
- }),
- delay_ms);
+ SafeTask(safety_flag_,
+ [this] {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ DoInsertDtmf();
+ }),
+ TimeDelta::Millis(delay_ms));
}
void DtmfSender::DoInsertDtmf() {
diff --git a/pc/dtmf_sender.h b/pc/dtmf_sender.h
index 06cd3a2..eb3bf5f 100644
--- a/pc/dtmf_sender.h
+++ b/pc/dtmf_sender.h
@@ -19,11 +19,11 @@
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
+#include "api/task_queue/task_queue_base.h"
#include "pc/proxy.h"
#include "rtc_base/location.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
-#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
// DtmfSender is the native implementation of the RTCDTMFSender defined by
@@ -53,7 +53,7 @@
class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> {
public:
- static rtc::scoped_refptr<DtmfSender> Create(rtc::Thread* signaling_thread,
+ static rtc::scoped_refptr<DtmfSender> Create(TaskQueueBase* signaling_thread,
DtmfProviderInterface* provider);
// Implements DtmfSenderInterface.
@@ -70,7 +70,7 @@
int comma_delay() const override;
protected:
- DtmfSender(rtc::Thread* signaling_thread, DtmfProviderInterface* provider);
+ DtmfSender(TaskQueueBase* signaling_thread, DtmfProviderInterface* provider);
virtual ~DtmfSender();
DtmfSender(const DtmfSender&) = delete;
@@ -90,7 +90,7 @@
void StopSending() RTC_RUN_ON(signaling_thread_);
DtmfSenderObserverInterface* observer_ RTC_GUARDED_BY(signaling_thread_);
- rtc::Thread* signaling_thread_;
+ TaskQueueBase* const signaling_thread_;
DtmfProviderInterface* provider_ RTC_GUARDED_BY(signaling_thread_);
std::string tones_ RTC_GUARDED_BY(signaling_thread_);
int duration_ RTC_GUARDED_BY(signaling_thread_);
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 9251ba9..f2727a4 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -25,7 +25,6 @@
#include "api/jsep_ice_candidate.h"
#include "api/rtp_parameters.h"
#include "api/rtp_transceiver_direction.h"
-#include "api/task_queue/to_queued_task.h"
#include "api/uma_metrics.h"
#include "api/video/video_codec_constants.h"
#include "call/audio_state.h"
@@ -726,7 +725,7 @@
ReportTransportStats();
}
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
+ SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
}));
@@ -735,7 +734,7 @@
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
+ SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
}));
@@ -744,7 +743,7 @@
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
+ SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
}));
@@ -753,7 +752,7 @@
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
+ SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
}));
@@ -763,17 +762,17 @@
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(),
- [this, t = transport, c = candidates]() {
- RTC_DCHECK_RUN_ON(signaling_thread());
- OnTransportControllerCandidatesGathered(t, c);
- }));
+ SafeTask(signaling_thread_safety_.flag(),
+ [this, t = transport, c = candidates]() {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ OnTransportControllerCandidatesGathered(t, c);
+ }));
});
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(ToQueuedTask(
- signaling_thread_safety_.flag(), [this, event = event]() {
+ signaling_thread()->PostTask(
+ SafeTask(signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
}));
@@ -782,7 +781,7 @@
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() {
+ SafeTask(signaling_thread_safety_.flag(), [this, c = c]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
}));
@@ -790,8 +789,8 @@
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(ToQueuedTask(
- signaling_thread_safety_.flag(), [this, event = event]() {
+ signaling_thread()->PostTask(
+ SafeTask(signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
}));
@@ -2497,11 +2496,11 @@
transport_controller_->GetDtlsTransport(mid);
if (dtls_transport) {
signaling_thread()->PostTask(
- ToQueuedTask(signaling_thread_safety_.flag(),
- [this, name = dtls_transport->transport_name()] {
- RTC_DCHECK_RUN_ON(signaling_thread());
- sctp_transport_name_s_ = std::move(name);
- }));
+ SafeTask(signaling_thread_safety_.flag(),
+ [this, name = dtls_transport->transport_name()] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ sctp_transport_name_s_ = std::move(name);
+ }));
}
// Note: setting the data sink and checking initial state must be done last,
@@ -2662,14 +2661,14 @@
const cricket::Candidate& candidate) {
RTC_DCHECK_RUN_ON(signaling_thread());
- network_thread()->PostTask(ToQueuedTask(
+ network_thread()->PostTask(SafeTask(
network_thread_safety_, [this, mid = mid, candidate = candidate] {
RTC_DCHECK_RUN_ON(network_thread());
std::vector<cricket::Candidate> candidates = {candidate};
RTCError error =
transport_controller_->AddRemoteCandidates(mid, candidates);
if (error.ok()) {
- signaling_thread()->PostTask(ToQueuedTask(
+ signaling_thread()->PostTask(SafeTask(
signaling_thread_safety_.flag(),
[this, candidate = std::move(candidate)] {
ReportRemoteIceCandidateAdded(candidate);
@@ -2916,7 +2915,7 @@
if (mid == sctp_mid_n_) {
data_channel_controller_.OnTransportChanged(data_channel_transport);
if (dtls_transport) {
- signaling_thread()->PostTask(ToQueuedTask(
+ signaling_thread()->PostTask(SafeTask(
signaling_thread_safety_.flag(),
[this,
name = std::string(dtls_transport->internal()->transport_name())] {
@@ -2942,7 +2941,7 @@
if (!sctp_mid_s_)
return;
- network_thread()->PostTask(ToQueuedTask(
+ network_thread()->PostTask(SafeTask(
network_thread_safety_,
[this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] {
rtc::scoped_refptr<SctpTransport> sctp_transport =
diff --git a/pc/proxy.h b/pc/proxy.h
index 2f3865d..89a7f51 100644
--- a/pc/proxy.h
+++ b/pc/proxy.h
@@ -123,7 +123,7 @@
};
template <typename C, typename R, typename... Args>
-class MethodCall : public QueuedTask {
+class MethodCall {
public:
typedef R (C::*Method)(Args...);
MethodCall(C* c, Method m, Args&&... args)
@@ -135,19 +135,16 @@
if (t->IsCurrent()) {
Invoke(std::index_sequence_for<Args...>());
} else {
- t->PostTask(std::unique_ptr<QueuedTask>(this));
+ t->PostTask([this] {
+ Invoke(std::index_sequence_for<Args...>());
+ event_.Set();
+ });
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}
private:
- bool Run() override {
- Invoke(std::index_sequence_for<Args...>());
- event_.Set();
- return false;
- }
-
template <size_t... Is>
void Invoke(std::index_sequence<Is...>) {
r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);
@@ -161,7 +158,7 @@
};
template <typename C, typename R, typename... Args>
-class ConstMethodCall : public QueuedTask {
+class ConstMethodCall {
public:
typedef R (C::*Method)(Args...) const;
ConstMethodCall(const C* c, Method m, Args&&... args)
@@ -173,19 +170,16 @@
if (t->IsCurrent()) {
Invoke(std::index_sequence_for<Args...>());
} else {
- t->PostTask(std::unique_ptr<QueuedTask>(this));
+ t->PostTask([this] {
+ Invoke(std::index_sequence_for<Args...>());
+ event_.Set();
+ });
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}
private:
- bool Run() override {
- Invoke(std::index_sequence_for<Args...>());
- event_.Set();
- return false;
- }
-
template <size_t... Is>
void Invoke(std::index_sequence<Is...>) {
r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);
diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc
index f54198d..478f714 100644
--- a/pc/rtc_stats_collector.cc
+++ b/pc/rtc_stats_collector.cc
@@ -21,6 +21,7 @@
#include <utility>
#include <vector>
+#include "absl/functional/bind_front.h"
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "api/candidate.h"
@@ -1304,33 +1305,10 @@
// We have a fresh cached report to deliver. Deliver asynchronously, since
// the caller may not be expecting a synchronous callback, and it avoids
// reentrancy problems.
- std::vector<RequestInfo> requests;
- requests.swap(requests_);
-
- // Task subclass to take ownership of the requests.
- // TODO(nisse): Delete when we can use C++14, and do lambda capture with
- // std::move.
- class DeliveryTask : public QueuedTask {
- public:
- DeliveryTask(rtc::scoped_refptr<RTCStatsCollector> collector,
- rtc::scoped_refptr<const RTCStatsReport> cached_report,
- std::vector<RequestInfo> requests)
- : collector_(collector),
- cached_report_(cached_report),
- requests_(std::move(requests)) {}
- bool Run() override {
- collector_->DeliverCachedReport(cached_report_, std::move(requests_));
- return true;
- }
-
- private:
- rtc::scoped_refptr<RTCStatsCollector> collector_;
- rtc::scoped_refptr<const RTCStatsReport> cached_report_;
- std::vector<RequestInfo> requests_;
- };
- signaling_thread_->PostTask(std::make_unique<DeliveryTask>(
- rtc::scoped_refptr<RTCStatsCollector>(this), cached_report_,
- std::move(requests)));
+ signaling_thread_->PostTask(
+ absl::bind_front(&RTCStatsCollector::DeliverCachedReport,
+ rtc::scoped_refptr<RTCStatsCollector>(this),
+ cached_report_, std::move(requests_)));
} else if (!num_pending_partial_reports_) {
// Only start gathering stats if we're not already gathering stats. In the
// case of already gathering stats, `callback_` will be invoked when there
diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc
index 387d344..44a96d4 100644
--- a/pc/rtp_transceiver.cc
+++ b/pc/rtp_transceiver.cc
@@ -21,7 +21,6 @@
#include "api/peer_connection_interface.h"
#include "api/rtp_parameters.h"
#include "api/sequence_checker.h"
-#include "api/task_queue/to_queued_task.h"
#include "media/base/codec.h"
#include "media/base/media_constants.h"
#include "media/base/media_engine.h"
@@ -287,8 +286,8 @@
channel_->SetRtpTransport(transport_lookup(channel_->mid()));
channel_->SetFirstPacketReceivedCallback(
[thread = thread_, flag = signaling_thread_safety_, this]() mutable {
- thread->PostTask(ToQueuedTask(std::move(flag),
- [this]() { OnFirstPacketReceived(); }));
+ thread->PostTask(
+ SafeTask(std::move(flag), [this]() { OnFirstPacketReceived(); }));
});
});
PushNewMediaChannelAndDeleteChannel(nullptr);
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index 08882c9..a5e0d76 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -15,7 +15,7 @@
#include <string>
#include <utility>
-#include "api/task_queue/to_queued_task.h"
+#include "absl/cleanup/cleanup.h"
#include "media/sctp/sctp_transport_internal.h"
#include "pc/proxy.h"
#include "pc/sctp_utils.h"
@@ -221,13 +221,12 @@
RTC_DCHECK(!controller_detached_);
if (controller_->ReadyToSendData()) {
AddRef();
- rtc::Thread::Current()->PostTask(ToQueuedTask(
- [this] {
- RTC_DCHECK_RUN_ON(signaling_thread_);
- if (state_ != kClosed)
- OnTransportReady(true);
- },
- [this] { Release(); }));
+ absl::Cleanup release = [this] { Release(); };
+ rtc::Thread::Current()->PostTask([this, release = std::move(release)] {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ if (state_ != kClosed)
+ OnTransportReady(true);
+ });
}
return true;
diff --git a/pc/test/integration_test_helpers.cc b/pc/test/integration_test_helpers.cc
index 3f07f36..9bfd9fd 100644
--- a/pc/test/integration_test_helpers.cc
+++ b/pc/test/integration_test_helpers.cc
@@ -64,8 +64,7 @@
tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] {
MutexLock lock(&mutex_);
for (auto* listener : listeners_) {
- listener->OnTickTaskQueue()->PostTask(
- ToQueuedTask([listener] { listener->OnTick(); }));
+ listener->OnTickTaskQueue()->PostTask([listener] { listener->OnTick(); });
}
return tick_period_;
});
diff --git a/pc/test/integration_test_helpers.h b/pc/test/integration_test_helpers.h
index 3b53ece..f68b96e 100644
--- a/pc/test/integration_test_helpers.h
+++ b/pc/test/integration_test_helpers.h
@@ -54,9 +54,9 @@
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_factory.h"
-#include "api/task_queue/to_queued_task.h"
#include "api/transport/field_trial_based_config.h"
#include "api/uma_metrics.h"
+#include "api/units/time_delta.h"
#include "api/video/video_rotation.h"
#include "api/video_codecs/sdp_video_format.h"
#include "api/video_codecs/video_decoder_factory.h"
@@ -1007,11 +1007,11 @@
RelaySdpMessageIfReceiverExists(type, msg);
} else {
rtc::Thread::Current()->PostDelayedTask(
- ToQueuedTask(task_safety_.flag(),
- [this, type, msg] {
- RelaySdpMessageIfReceiverExists(type, msg);
- }),
- signaling_delay_ms_);
+ SafeTask(task_safety_.flag(),
+ [this, type, msg] {
+ RelaySdpMessageIfReceiverExists(type, msg);
+ }),
+ TimeDelta::Millis(signaling_delay_ms_));
}
}
@@ -1030,12 +1030,12 @@
RelayIceMessageIfReceiverExists(sdp_mid, sdp_mline_index, msg);
} else {
rtc::Thread::Current()->PostDelayedTask(
- ToQueuedTask(task_safety_.flag(),
- [this, sdp_mid, sdp_mline_index, msg] {
- RelayIceMessageIfReceiverExists(sdp_mid,
- sdp_mline_index, msg);
- }),
- signaling_delay_ms_);
+ SafeTask(task_safety_.flag(),
+ [this, sdp_mid, sdp_mline_index, msg] {
+ RelayIceMessageIfReceiverExists(sdp_mid, sdp_mline_index,
+ msg);
+ }),
+ TimeDelta::Millis(signaling_delay_ms_));
}
}