blob: 104e320398f571bb5f91041aee557125862ffff8 [file] [log] [blame]
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <memory>
#include <queue>
#include <string>
#include "media/sctp/sctp_transport_internal.h"
#include "media/sctp/usrsctp_transport.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/logging.h"
#include "rtc_base/random.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
namespace {
static constexpr int kDefaultTimeout = 10000; // 10 seconds.
static constexpr int kTransport1Port = 15001;
static constexpr int kTransport2Port = 25002;
static constexpr int kLogPerMessagesCount = 100;
/**
* An simple packet transport implementation which can be
* configured to simulate uniform random packet loss and
* configurable random packet delay and reordering.
*/
class SimulatedPacketTransport final : public rtc::PacketTransportInternal {
public:
SimulatedPacketTransport(std::string name,
rtc::Thread* transport_thread,
uint8_t packet_loss_percents,
uint16_t avg_send_delay_millis)
: transport_name_(name),
transport_thread_(transport_thread),
packet_loss_percents_(packet_loss_percents),
avg_send_delay_millis_(avg_send_delay_millis),
random_(42) {
RTC_DCHECK(transport_thread_);
RTC_DCHECK_LE(packet_loss_percents_, 100);
RTC_DCHECK_RUN_ON(transport_thread_);
}
~SimulatedPacketTransport() override {
RTC_DCHECK_RUN_ON(transport_thread_);
destination_ = nullptr;
SignalWritableState(this);
}
const std::string& transport_name() const override { return transport_name_; }
bool writable() const override { return destination_ != nullptr; }
bool receiving() const override { return true; }
int SendPacket(const char* data,
size_t len,
const rtc::PacketOptions& options,
int flags = 0) {
RTC_DCHECK_RUN_ON(transport_thread_);
auto destination = destination_.load();
if (destination == nullptr) {
return -1;
}
if (random_.Rand(100) < packet_loss_percents_) {
// silent packet loss
return 0;
}
rtc::CopyOnWriteBuffer buffer(data, len);
auto send_task = ToQueuedTask(
destination->task_safety_.flag(),
[destination, flags, buffer = std::move(buffer)] {
destination->SignalReadPacket(
destination, reinterpret_cast<const char*>(buffer.data()),
buffer.size(), rtc::Time(), flags);
});
// Introduce random send delay in range [0 .. 2 * avg_send_delay_millis_]
// millis, which will also work as random packet reordering mechanism.
uint16_t actual_send_delay = avg_send_delay_millis_;
int16_t reorder_delay =
avg_send_delay_millis_ *
std::min(1.0, std::max(-1.0, random_.Gaussian(0, 0.5)));
actual_send_delay += reorder_delay;
if (actual_send_delay > 0) {
destination->transport_thread_->PostDelayedTask(std::move(send_task),
actual_send_delay);
} else {
destination->transport_thread_->PostTask(std::move(send_task));
}
return 0;
}
int SetOption(rtc::Socket::Option opt, int value) override { return 0; }
bool GetOption(rtc::Socket::Option opt, int* value) override { return false; }
int GetError() override { return 0; }
absl::optional<rtc::NetworkRoute> network_route() const override {
return absl::nullopt;
}
void SetDestination(SimulatedPacketTransport* destination) {
RTC_DCHECK_RUN_ON(transport_thread_);
if (destination == this) {
return;
}
destination_ = destination;
SignalWritableState(this);
}
private:
const std::string transport_name_;
rtc::Thread* const transport_thread_;
const uint8_t packet_loss_percents_;
const uint16_t avg_send_delay_millis_;
std::atomic<SimulatedPacketTransport*> destination_ ATOMIC_VAR_INIT(nullptr);
webrtc::Random random_;
webrtc::ScopedTaskSafety task_safety_;
RTC_DISALLOW_COPY_AND_ASSIGN(SimulatedPacketTransport);
};
/**
* A helper class to send specified number of messages over UsrsctpTransport
* with SCTP reliability settings provided by user. The reliability settings are
* specified by passing a template instance of SendDataParams. The sid will be
* assigned by sender itself and will be assigned from range
* [cricket::kMinSctpSid; cricket::kMaxSctpSid]. The wide range of sids are used
* to possibly trigger more execution paths inside usrsctp.
*/
class SctpDataSender final {
public:
SctpDataSender(rtc::Thread* thread,
cricket::UsrsctpTransport* transport,
uint64_t target_messages_count,
webrtc::SendDataParams send_params,
uint32_t sender_id)
: thread_(thread),
transport_(transport),
target_messages_count_(target_messages_count),
send_params_(send_params),
sender_id_(sender_id) {
RTC_DCHECK(thread_);
RTC_DCHECK(transport_);
}
void Start() {
thread_->PostTask(ToQueuedTask(task_safety_.flag(), [this] {
if (started_) {
RTC_LOG(LS_INFO) << sender_id_ << " sender is already started";
return;
}
started_ = true;
SendNextMessage();
}));
}
uint64_t BytesSentCount() const { return num_bytes_sent_; }
uint64_t MessagesSentCount() const { return num_messages_sent_; }
absl::optional<std::string> GetLastError() {
absl::optional<std::string> result = absl::nullopt;
thread_->Invoke<void>(RTC_FROM_HERE,
[this, &result] { result = last_error_; });
return result;
}
bool WaitForCompletion(int give_up_after_ms) {
return sent_target_messages_count_.Wait(give_up_after_ms, kDefaultTimeout);
}
private:
void SendNextMessage() {
RTC_DCHECK_RUN_ON(thread_);
if (!started_ || num_messages_sent_ >= target_messages_count_) {
sent_target_messages_count_.Set();
return;
}
if (num_messages_sent_ % kLogPerMessagesCount == 0) {
RTC_LOG(LS_INFO) << sender_id_ << " sender will try send message "
<< (num_messages_sent_ + 1) << " out of "
<< target_messages_count_;
}
webrtc::SendDataParams params(send_params_);
int sid =
cricket::kMinSctpSid + (num_messages_sent_ % cricket::kMaxSctpStreams);
cricket::SendDataResult result;
transport_->SendData(sid, params, payload_, &result);
switch (result) {
case cricket::SDR_BLOCK:
// retry after timeout
thread_->PostDelayedTask(
ToQueuedTask(task_safety_.flag(), [this] { SendNextMessage(); }),
500);
break;
case cricket::SDR_SUCCESS:
// send next
num_bytes_sent_ += payload_.size();
++num_messages_sent_;
thread_->PostTask(
ToQueuedTask(task_safety_.flag(), [this] { SendNextMessage(); }));
break;
case cricket::SDR_ERROR:
// give up
last_error_ = "UsrsctpTransport::SendData error returned";
sent_target_messages_count_.Set();
break;
}
}
rtc::Thread* const thread_;
cricket::UsrsctpTransport* const transport_;
const uint64_t target_messages_count_;
const webrtc::SendDataParams send_params_;
const uint32_t sender_id_;
rtc::CopyOnWriteBuffer payload_{std::string(1400, '.').c_str(), 1400};
std::atomic<bool> started_ ATOMIC_VAR_INIT(false);
std::atomic<uint64_t> num_messages_sent_ ATOMIC_VAR_INIT(0);
rtc::Event sent_target_messages_count_{true, false};
std::atomic<uint64_t> num_bytes_sent_ ATOMIC_VAR_INIT(0);
absl::optional<std::string> last_error_;
webrtc::ScopedTaskSafetyDetached task_safety_;
RTC_DISALLOW_COPY_AND_ASSIGN(SctpDataSender);
};
/**
* A helper class which counts number of received messages
* and bytes over UsrsctpTransport. Also allow waiting until
* specified number of messages received.
*/
class SctpDataReceiver final : public sigslot::has_slots<> {
public:
explicit SctpDataReceiver(uint32_t receiver_id,
uint64_t target_messages_count)
: receiver_id_(receiver_id),
target_messages_count_(target_messages_count) {}
void OnDataReceived(const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& data) {
num_bytes_received_ += data.size();
if (++num_messages_received_ == target_messages_count_) {
received_target_messages_count_.Set();
}
if (num_messages_received_ % kLogPerMessagesCount == 0) {
RTC_LOG(INFO) << receiver_id_ << " receiver got "
<< num_messages_received_ << " messages";
}
}
uint64_t MessagesReceivedCount() const { return num_messages_received_; }
uint64_t BytesReceivedCount() const { return num_bytes_received_; }
bool WaitForMessagesReceived(int timeout_millis) {
return received_target_messages_count_.Wait(timeout_millis);
}
private:
std::atomic<uint64_t> num_messages_received_ ATOMIC_VAR_INIT(0);
std::atomic<uint64_t> num_bytes_received_ ATOMIC_VAR_INIT(0);
rtc::Event received_target_messages_count_{true, false};
const uint32_t receiver_id_;
const uint64_t target_messages_count_;
RTC_DISALLOW_COPY_AND_ASSIGN(SctpDataReceiver);
};
/**
* Simple class to manage set of threads.
*/
class ThreadPool final {
public:
explicit ThreadPool(size_t threads_count) : random_(42) {
RTC_DCHECK(threads_count > 0);
threads_.reserve(threads_count);
for (size_t i = 0; i < threads_count; i++) {
auto thread = rtc::Thread::Create();
thread->SetName("Thread #" + rtc::ToString(i + 1) + " from Pool", this);
thread->Start();
threads_.emplace_back(std::move(thread));
}
}
rtc::Thread* GetRandomThread() {
return threads_[random_.Rand(0U, threads_.size() - 1)].get();
}
private:
webrtc::Random random_;
std::vector<std::unique_ptr<rtc::Thread>> threads_;
RTC_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};
/**
* Represents single ping-pong test over UsrsctpTransport.
* User can specify target number of message for bidirectional
* send, underlying transport packets loss and average packet delay
* and SCTP delivery settings.
*/
class SctpPingPong final {
public:
SctpPingPong(uint32_t id,
uint16_t port1,
uint16_t port2,
rtc::Thread* transport_thread1,
rtc::Thread* transport_thread2,
uint32_t messages_count,
uint8_t packet_loss_percents,
uint16_t avg_send_delay_millis,
webrtc::SendDataParams send_params)
: id_(id),
port1_(port1),
port2_(port2),
transport_thread1_(transport_thread1),
transport_thread2_(transport_thread2),
messages_count_(messages_count),
packet_loss_percents_(packet_loss_percents),
avg_send_delay_millis_(avg_send_delay_millis),
send_params_(send_params) {
RTC_DCHECK(transport_thread1_ != nullptr);
RTC_DCHECK(transport_thread2_ != nullptr);
}
virtual ~SctpPingPong() {
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
data_sender1_.reset();
sctp_transport1_->SetDtlsTransport(nullptr);
packet_transport1_->SetDestination(nullptr);
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
data_sender2_.reset();
sctp_transport2_->SetDtlsTransport(nullptr);
packet_transport2_->SetDestination(nullptr);
});
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
sctp_transport1_.reset();
data_receiver1_.reset();
packet_transport1_.reset();
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
sctp_transport2_.reset();
data_receiver2_.reset();
packet_transport2_.reset();
});
}
bool Start() {
CreateTwoConnectedSctpTransportsWithAllStreams();
{
webrtc::MutexLock lock(&lock_);
if (!errors_list_.empty()) {
return false;
}
}
data_sender1_.reset(new SctpDataSender(transport_thread1_,
sctp_transport1_.get(),
messages_count_, send_params_, id_));
data_sender2_.reset(new SctpDataSender(transport_thread2_,
sctp_transport2_.get(),
messages_count_, send_params_, id_));
data_sender1_->Start();
data_sender2_->Start();
return true;
}
std::vector<std::string> GetErrorsList() const {
std::vector<std::string> result;
{
webrtc::MutexLock lock(&lock_);
result = errors_list_;
}
return result;
}
void WaitForCompletion(int32_t timeout_millis) {
if (data_sender1_ == nullptr) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 is not created");
return;
}
if (data_sender2_ == nullptr) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 is not created");
return;
}
if (!data_sender1_->WaitForCompletion(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 failed to complete within " +
rtc::ToString(timeout_millis) + " millis");
return;
}
auto sender1_error = data_sender1_->GetLastError();
if (sender1_error.has_value()) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 error: " + sender1_error.value());
return;
}
if (!data_sender2_->WaitForCompletion(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 failed to complete within " +
rtc::ToString(timeout_millis) + " millis");
return;
}
auto sender2_error = data_sender2_->GetLastError();
if (sender2_error.has_value()) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 error: " + sender1_error.value());
return;
}
if ((data_sender1_->MessagesSentCount() != messages_count_)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 1 sent only " +
rtc::ToString(data_sender1_->MessagesSentCount()) +
" out of " + rtc::ToString(messages_count_));
return;
}
if ((data_sender2_->MessagesSentCount() != messages_count_)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sender 2 sent only " +
rtc::ToString(data_sender2_->MessagesSentCount()) +
" out of " + rtc::ToString(messages_count_));
return;
}
if (!data_receiver1_->WaitForMessagesReceived(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", receiver 1 did not complete within " +
rtc::ToString(messages_count_));
return;
}
if (!data_receiver2_->WaitForMessagesReceived(timeout_millis)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", receiver 2 did not complete within " +
rtc::ToString(messages_count_));
return;
}
if (data_receiver1_->BytesReceivedCount() !=
data_sender2_->BytesSentCount()) {
ReportError(
"SctpPingPong id = " + rtc::ToString(id_) + ", receiver 1 received " +
rtc::ToString(data_receiver1_->BytesReceivedCount()) +
" bytes, but sender 2 send " +
rtc::ToString(rtc::ToString(data_sender2_->BytesSentCount())));
return;
}
if (data_receiver2_->BytesReceivedCount() !=
data_sender1_->BytesSentCount()) {
ReportError(
"SctpPingPong id = " + rtc::ToString(id_) + ", receiver 2 received " +
rtc::ToString(data_receiver2_->BytesReceivedCount()) +
" bytes, but sender 1 send " +
rtc::ToString(rtc::ToString(data_sender1_->BytesSentCount())));
return;
}
RTC_LOG(LS_INFO) << "SctpPingPong id = " << id_ << " is done";
}
private:
void CreateTwoConnectedSctpTransportsWithAllStreams() {
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport1_.reset(new SimulatedPacketTransport(
"SctpPingPong id = " + rtc::ToString(id_) + ", packet transport 1",
transport_thread1_, packet_loss_percents_, avg_send_delay_millis_));
data_receiver1_.reset(new SctpDataReceiver(id_, messages_count_));
sctp_transport1_.reset(new cricket::UsrsctpTransport(
transport_thread1_, packet_transport1_.get()));
sctp_transport1_->set_debug_name_for_testing("sctp transport 1");
sctp_transport1_->SignalDataReceived.connect(
data_receiver1_.get(), &SctpDataReceiver::OnDataReceived);
for (uint32_t i = cricket::kMinSctpSid; i <= cricket::kMaxSctpSid; i++) {
if (!sctp_transport1_->OpenStream(i)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sctp transport 1 stream " + rtc::ToString(i) +
" failed to open");
break;
}
}
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport2_.reset(new SimulatedPacketTransport(
"SctpPingPong id = " + rtc::ToString(id_) + "packet transport 2",
transport_thread2_, packet_loss_percents_, avg_send_delay_millis_));
data_receiver2_.reset(new SctpDataReceiver(id_, messages_count_));
sctp_transport2_.reset(new cricket::UsrsctpTransport(
transport_thread2_, packet_transport2_.get()));
sctp_transport2_->set_debug_name_for_testing("sctp transport 2");
sctp_transport2_->SignalDataReceived.connect(
data_receiver2_.get(), &SctpDataReceiver::OnDataReceived);
for (uint32_t i = cricket::kMinSctpSid; i <= cricket::kMaxSctpSid; i++) {
if (!sctp_transport2_->OpenStream(i)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", sctp transport 2 stream " + rtc::ToString(i) +
" failed to open");
break;
}
}
});
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport1_->SetDestination(packet_transport2_.get());
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
packet_transport2_->SetDestination(packet_transport1_.get());
});
transport_thread1_->Invoke<void>(RTC_FROM_HERE, [this] {
if (!sctp_transport1_->Start(port1_, port2_,
cricket::kSctpSendBufferSize)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", failed to start sctp transport 1");
}
});
transport_thread2_->Invoke<void>(RTC_FROM_HERE, [this] {
if (!sctp_transport2_->Start(port2_, port1_,
cricket::kSctpSendBufferSize)) {
ReportError("SctpPingPong id = " + rtc::ToString(id_) +
", failed to start sctp transport 2");
}
});
}
void ReportError(std::string error) {
webrtc::MutexLock lock(&lock_);
errors_list_.push_back(std::move(error));
}
std::unique_ptr<SimulatedPacketTransport> packet_transport1_;
std::unique_ptr<SimulatedPacketTransport> packet_transport2_;
std::unique_ptr<SctpDataReceiver> data_receiver1_;
std::unique_ptr<SctpDataReceiver> data_receiver2_;
std::unique_ptr<cricket::UsrsctpTransport> sctp_transport1_;
std::unique_ptr<cricket::UsrsctpTransport> sctp_transport2_;
std::unique_ptr<SctpDataSender> data_sender1_;
std::unique_ptr<SctpDataSender> data_sender2_;
mutable webrtc::Mutex lock_;
std::vector<std::string> errors_list_ RTC_GUARDED_BY(lock_);
const uint32_t id_;
const uint16_t port1_;
const uint16_t port2_;
rtc::Thread* const transport_thread1_;
rtc::Thread* const transport_thread2_;
const uint32_t messages_count_;
const uint8_t packet_loss_percents_;
const uint16_t avg_send_delay_millis_;
const webrtc::SendDataParams send_params_;
RTC_DISALLOW_COPY_AND_ASSIGN(SctpPingPong);
};
/**
* Helper function to calculate max number of milliseconds
* allowed for test to run based on test configuration.
*/
constexpr int32_t GetExecutionTimeLimitInMillis(uint32_t total_messages,
uint8_t packet_loss_percents) {
return std::min<int64_t>(
std::numeric_limits<int32_t>::max(),
std::max<int64_t>(
1LL * total_messages * 100 *
std::max(1, packet_loss_percents * packet_loss_percents),
kDefaultTimeout));
}
} // namespace
namespace cricket {
/**
* The set of tests intended to check usrsctp reliability on
* stress conditions: multiple sockets, concurrent access,
* lossy network link. It was observed in the past that
* usrsctp might misbehave in concurrent environment
* under load on lossy networks: deadlocks and memory corruption
* issues might happen in non-basic usage scenarios.
* It's recommended to run this test whenever usrsctp version
* used is updated to verify it properly works in stress
* conditions under higher than usual load.
* It is also recommended to enable ASAN when these tests
* are executed, so whenever memory bug is happen inside usrsctp,
* it will be easier to understand what went wrong with ASAN
* provided diagnostics information.
* The tests cases currently disabled by default due to
* long execution time and due to unresolved issue inside
* `usrsctp` library detected by try-bots with ThreadSanitizer.
*/
class UsrSctpReliabilityTest : public ::testing::Test {};
/**
* A simple test which send multiple messages over reliable
* connection, usefull to verify test infrastructure works.
* Execution time is less than 1 second.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverReliableConnection) {
auto thread1 = rtc::Thread::Create();
auto thread2 = rtc::Thread::Create();
thread1->Start();
thread2->Start();
constexpr uint8_t packet_loss_percents = 0;
constexpr uint16_t avg_send_delay_millis = 10;
constexpr uint32_t messages_count = 100;
constexpr int32_t wait_timeout =
GetExecutionTimeLimitInMillis(messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
webrtc::SendDataParams send_params;
send_params.ordered = true;
SctpPingPong test(1, kTransport1Port, kTransport2Port, thread1.get(),
thread2.get(), messages_count, packet_loss_percents,
avg_send_delay_millis, send_params);
EXPECT_TRUE(test.Start()) << rtc::join(test.GetErrorsList(), ';');
test.WaitForCompletion(wait_timeout);
auto errors_list = test.GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
}
/**
* A test to verify that multiple messages can be reliably delivered
* over lossy network when usrsctp configured to guarantee reliably
* and in order delivery.
* The test case is disabled by default because it takes
* long time to run.
* Execution time is about 2.5 minutes.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverLossyConnectionReliableAndInOrder) {
auto thread1 = rtc::Thread::Create();
auto thread2 = rtc::Thread::Create();
thread1->Start();
thread2->Start();
constexpr uint8_t packet_loss_percents = 5;
constexpr uint16_t avg_send_delay_millis = 16;
constexpr uint32_t messages_count = 10000;
constexpr int32_t wait_timeout =
GetExecutionTimeLimitInMillis(messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
webrtc::SendDataParams send_params;
send_params.ordered = true;
SctpPingPong test(1, kTransport1Port, kTransport2Port, thread1.get(),
thread2.get(), messages_count, packet_loss_percents,
avg_send_delay_millis, send_params);
EXPECT_TRUE(test.Start()) << rtc::join(test.GetErrorsList(), ';');
test.WaitForCompletion(wait_timeout);
auto errors_list = test.GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
}
/**
* A test to verify that multiple messages can be reliably delivered
* over lossy network when usrsctp configured to retransmit lost
* packets.
* The test case is disabled by default because it takes
* long time to run.
* Execution time is about 2.5 minutes.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverLossyConnectionWithRetries) {
auto thread1 = rtc::Thread::Create();
auto thread2 = rtc::Thread::Create();
thread1->Start();
thread2->Start();
constexpr uint8_t packet_loss_percents = 5;
constexpr uint16_t avg_send_delay_millis = 16;
constexpr uint32_t messages_count = 10000;
constexpr int32_t wait_timeout =
GetExecutionTimeLimitInMillis(messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
webrtc::SendDataParams send_params;
send_params.ordered = false;
send_params.max_rtx_count = std::numeric_limits<uint16_t>::max();
send_params.max_rtx_ms = std::numeric_limits<uint16_t>::max();
SctpPingPong test(1, kTransport1Port, kTransport2Port, thread1.get(),
thread2.get(), messages_count, packet_loss_percents,
avg_send_delay_millis, send_params);
EXPECT_TRUE(test.Start()) << rtc::join(test.GetErrorsList(), ';');
test.WaitForCompletion(wait_timeout);
auto errors_list = test.GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
}
/**
* This is kind of reliability stress-test of usrsctp to verify
* that all messages are delivered when multiple usrsctp
* sockets used concurrently and underlying transport is lossy.
*
* It was observed in the past that in stress condtions usrsctp
* might encounter deadlock and memory corruption bugs:
* https://github.com/sctplab/usrsctp/issues/325
*
* It is recoomended to run this test whenever usrsctp version
* used by WebRTC is updated.
*
* The test case is disabled by default because it takes
* long time to run.
* Execution time of this test is about 1-2 hours.
*/
TEST_F(UsrSctpReliabilityTest,
DISABLED_AllMessagesAreDeliveredOverLossyConnectionConcurrentTests) {
ThreadPool pool(16);
webrtc::SendDataParams send_params;
send_params.ordered = true;
constexpr uint32_t base_sctp_port = 5000;
// The constants value below were experimentally chosen
// to have reasonable execution time and to reproduce
// particular deadlock issue inside usrsctp:
// https://github.com/sctplab/usrsctp/issues/325
// The constants values may be adjusted next time
// some other issue inside usrsctp need to be debugged.
constexpr uint32_t messages_count = 200;
constexpr uint8_t packet_loss_percents = 5;
constexpr uint16_t avg_send_delay_millis = 0;
constexpr uint32_t parallel_ping_pongs = 16 * 1024;
constexpr uint32_t total_ping_pong_tests = 16 * parallel_ping_pongs;
constexpr int32_t wait_timeout = GetExecutionTimeLimitInMillis(
total_ping_pong_tests * messages_count, packet_loss_percents);
static_assert(wait_timeout > 0,
"Timeout computation must produce positive value");
std::queue<std::unique_ptr<SctpPingPong>> tests;
for (uint32_t i = 0; i < total_ping_pong_tests; i++) {
uint32_t port1 =
base_sctp_port + (2 * i) % (UINT16_MAX - base_sctp_port - 1);
auto test = std::make_unique<SctpPingPong>(
i, port1, port1 + 1, pool.GetRandomThread(), pool.GetRandomThread(),
messages_count, packet_loss_percents, avg_send_delay_millis,
send_params);
EXPECT_TRUE(test->Start()) << rtc::join(test->GetErrorsList(), ';');
tests.emplace(std::move(test));
while (tests.size() >= parallel_ping_pongs) {
auto& oldest_test = tests.front();
oldest_test->WaitForCompletion(wait_timeout);
auto errors_list = oldest_test->GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
tests.pop();
}
}
while (!tests.empty()) {
auto& oldest_test = tests.front();
oldest_test->WaitForCompletion(wait_timeout);
auto errors_list = oldest_test->GetErrorsList();
EXPECT_TRUE(errors_list.empty()) << rtc::join(errors_list, ';');
tests.pop();
}
}
} // namespace cricket