Use SingleThreadedTaskQueue in DirectTransport
DirectTransport has so far used its own thread, which led to a different threading-model for in the unit-tests than is used in actual WebRTC. Because of that, some critical-sections that weren't truly necessary in WebRTC could not be replaced with thread-checks, because those checks failed in unit-tests.
This CL introduces SingleThreadedTaskQueue - a TaskQueue which guarantees to run all of its tasks on the same thread (rtc::TaskQueue doesn't guarantee that on Mac) - and uses that for DirectTransport. CLs based on top of this will uncomment thread-checks which had to be commented out before, and remove unnecessary critical-sections.
Future work would probably replace the thread-checkers by more sophisticated serialized-access checks, allowing us to move from the SingleThreadedTaskQueue to a normal TaskQueue.
Related implementation notes:
* This CL has made DirectTransport::StopSending() superfluous, and so it was deleted.
BUG=webrtc:8113, webrtc:7405, webrtc:8056, webrtc:8116
Review-Url: https://codereview.webrtc.org/2998923002
Cr-Commit-Position: refs/heads/master@{#19445}
diff --git a/webrtc/call/bitrate_estimator_tests.cc b/webrtc/call/bitrate_estimator_tests.cc
index de12ef7..5d4acbb 100644
--- a/webrtc/call/bitrate_estimator_tests.cc
+++ b/webrtc/call/bitrate_estimator_tests.cc
@@ -102,51 +102,55 @@
virtual ~BitrateEstimatorTest() { EXPECT_TRUE(streams_.empty()); }
virtual void SetUp() {
- Call::Config config(event_log_.get());
- receiver_call_.reset(Call::Create(config));
- sender_call_.reset(Call::Create(config));
+ task_queue_.SendTask([this]() {
+ Call::Config config(event_log_.get());
+ receiver_call_.reset(Call::Create(config));
+ sender_call_.reset(Call::Create(config));
- send_transport_.reset(
- new test::DirectTransport(sender_call_.get(), payload_type_map_));
- send_transport_->SetReceiver(receiver_call_->Receiver());
- receive_transport_.reset(
- new test::DirectTransport(receiver_call_.get(), payload_type_map_));
- receive_transport_->SetReceiver(sender_call_->Receiver());
+ send_transport_.reset(new test::DirectTransport(
+ &task_queue_, sender_call_.get(), payload_type_map_));
+ send_transport_->SetReceiver(receiver_call_->Receiver());
+ receive_transport_.reset(new test::DirectTransport(
+ &task_queue_, receiver_call_.get(), payload_type_map_));
+ receive_transport_->SetReceiver(sender_call_->Receiver());
- video_send_config_ = VideoSendStream::Config(send_transport_.get());
- video_send_config_.rtp.ssrcs.push_back(kVideoSendSsrcs[0]);
- // Encoders will be set separately per stream.
- video_send_config_.encoder_settings.encoder = nullptr;
- video_send_config_.encoder_settings.payload_name = "FAKE";
- video_send_config_.encoder_settings.payload_type =
- kFakeVideoSendPayloadType;
- test::FillEncoderConfiguration(1, &video_encoder_config_);
+ video_send_config_ = VideoSendStream::Config(send_transport_.get());
+ video_send_config_.rtp.ssrcs.push_back(kVideoSendSsrcs[0]);
+ // Encoders will be set separately per stream.
+ video_send_config_.encoder_settings.encoder = nullptr;
+ video_send_config_.encoder_settings.payload_name = "FAKE";
+ video_send_config_.encoder_settings.payload_type =
+ kFakeVideoSendPayloadType;
+ test::FillEncoderConfiguration(1, &video_encoder_config_);
- receive_config_ = VideoReceiveStream::Config(receive_transport_.get());
- // receive_config_.decoders will be set by every stream separately.
- receive_config_.rtp.remote_ssrc = video_send_config_.rtp.ssrcs[0];
- receive_config_.rtp.local_ssrc = kReceiverLocalVideoSsrc;
- receive_config_.rtp.remb = true;
- receive_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receive_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
+ receive_config_ = VideoReceiveStream::Config(receive_transport_.get());
+ // receive_config_.decoders will be set by every stream separately.
+ receive_config_.rtp.remote_ssrc = video_send_config_.rtp.ssrcs[0];
+ receive_config_.rtp.local_ssrc = kReceiverLocalVideoSsrc;
+ receive_config_.rtp.remb = true;
+ receive_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receive_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
+ });
}
virtual void TearDown() {
- std::for_each(streams_.begin(), streams_.end(),
- std::mem_fun(&Stream::StopSending));
+ task_queue_.SendTask([this]() {
+ std::for_each(streams_.begin(), streams_.end(),
+ std::mem_fun(&Stream::StopSending));
- send_transport_->StopSending();
- receive_transport_->StopSending();
+ while (!streams_.empty()) {
+ delete streams_.back();
+ streams_.pop_back();
+ }
- while (!streams_.empty()) {
- delete streams_.back();
- streams_.pop_back();
- }
+ send_transport_.reset();
+ receive_transport_.reset();
- receiver_call_.reset();
- sender_call_.reset();
+ receiver_call_.reset();
+ sender_call_.reset();
+ });
}
protected:
@@ -241,66 +245,80 @@
"RemoteBitrateEstimatorSingleStream: Instantiating.";
TEST_F(BitrateEstimatorTest, InstantiatesTOFPerDefaultForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
TEST_F(BitrateEstimatorTest, ImmediatelySwitchToASTForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
TEST_F(BitrateEstimatorTest, SwitchesToASTForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
- video_send_config_.rtp.extensions[0] =
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
- receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions[0] =
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
+ receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
// This test is flaky. See webrtc:5790.
TEST_F(BitrateEstimatorTest, DISABLED_SwitchesToASTThenBackToTOFForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
- video_send_config_.rtp.extensions[0] =
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions[0] =
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
- video_send_config_.rtp.extensions[0] =
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId);
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- receiver_log_.PushExpectedLogLine(
- "WrappingBitrateEstimator: Switching to transmission time offset RBE.");
- streams_.push_back(new Stream(this));
- streams_[0]->StopSending();
- streams_[1]->StopSending();
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions[0] =
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId);
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ receiver_log_.PushExpectedLogLine(
+ "WrappingBitrateEstimator: Switching to transmission time offset RBE.");
+ streams_.push_back(new Stream(this));
+ streams_[0]->StopSending();
+ streams_[1]->StopSending();
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
} // namespace webrtc