Add SendMany method to dcsctp socket
Bug: webrtc:15724
Change-Id: Ib1689cd46395e2315803714ef50c009580fd71bb
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/331021
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41397}
diff --git a/net/dcsctp/public/dcsctp_socket.h b/net/dcsctp/public/dcsctp_socket.h
index d0a81ea..9989ae8 100644
--- a/net/dcsctp/public/dcsctp_socket.h
+++ b/net/dcsctp/public/dcsctp_socket.h
@@ -13,6 +13,7 @@
#include <cstdint>
#include <memory>
#include <utility>
+#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@@ -577,6 +578,16 @@
virtual SendStatus Send(DcSctpMessage message,
const SendOptions& send_options) = 0;
+ // Sends the messages `messages` using the provided send options.
+ // Sending a message is an asynchronous operation, and the `OnError` callback
+ // may be invoked to indicate any errors in sending the message.
+ //
+ // This has identical semantics to Send, except that it may coalesce many
+ // messages into a single SCTP packet if they would fit.
+ virtual std::vector<SendStatus> SendMany(
+ rtc::ArrayView<DcSctpMessage> messages,
+ const SendOptions& send_options) = 0;
+
// Resetting streams is an asynchronous operation and the results will
// be notified using `DcSctpSocketCallbacks::OnStreamsResetDone()` on success
// and `DcSctpSocketCallbacks::OnStreamsResetFailed()` on failure. Note that
diff --git a/net/dcsctp/public/mock_dcsctp_socket.h b/net/dcsctp/public/mock_dcsctp_socket.h
index 0fd572b..c71c3ae 100644
--- a/net/dcsctp/public/mock_dcsctp_socket.h
+++ b/net/dcsctp/public/mock_dcsctp_socket.h
@@ -10,6 +10,8 @@
#ifndef NET_DCSCTP_PUBLIC_MOCK_DCSCTP_SOCKET_H_
#define NET_DCSCTP_PUBLIC_MOCK_DCSCTP_SOCKET_H_
+#include <vector>
+
#include "net/dcsctp/public/dcsctp_socket.h"
#include "test/gmock.h"
@@ -56,6 +58,12 @@
(DcSctpMessage message, const SendOptions& send_options),
(override));
+ MOCK_METHOD(std::vector<SendStatus>,
+ SendMany,
+ (rtc::ArrayView<DcSctpMessage> messages,
+ const SendOptions& send_options),
+ (override));
+
MOCK_METHOD(ResetStreamsStatus,
ResetStreams,
(rtc::ArrayView<const StreamID> outgoing_streams),
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 5fc9bf5..bdf79e4 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -475,8 +475,43 @@
const SendOptions& send_options) {
RTC_DCHECK_RUN_ON(&thread_checker_);
CallbackDeferrer::ScopedDeferrer deferrer(callbacks_);
- LifecycleId lifecycle_id = send_options.lifecycle_id;
+ SendStatus send_status = InternalSend(message, send_options);
+ if (send_status != SendStatus::kSuccess)
+ return send_status;
+ Timestamp now = callbacks_.Now();
+ ++metrics_.tx_messages_count;
+ send_queue_.Add(now, std::move(message), send_options);
+ if (tcb_ != nullptr)
+ tcb_->SendBufferedPackets(now);
+ RTC_DCHECK(IsConsistent());
+ return SendStatus::kSuccess;
+}
+std::vector<SendStatus> DcSctpSocket::SendMany(
+ rtc::ArrayView<DcSctpMessage> messages,
+ const SendOptions& send_options) {
+ RTC_DCHECK_RUN_ON(&thread_checker_);
+ CallbackDeferrer::ScopedDeferrer deferrer(callbacks_);
+ Timestamp now = callbacks_.Now();
+ std::vector<SendStatus> send_statuses;
+ send_statuses.reserve(messages.size());
+ for (DcSctpMessage& message : messages) {
+ SendStatus send_status = InternalSend(message, send_options);
+ send_statuses.push_back(send_status);
+ if (send_status != SendStatus::kSuccess)
+ continue;
+ ++metrics_.tx_messages_count;
+ send_queue_.Add(now, std::move(message), send_options);
+ }
+ if (tcb_ != nullptr)
+ tcb_->SendBufferedPackets(now);
+ RTC_DCHECK(IsConsistent());
+ return send_statuses;
+}
+
+SendStatus DcSctpSocket::InternalSend(const DcSctpMessage& message,
+ const SendOptions& send_options) {
+ LifecycleId lifecycle_id = send_options.lifecycle_id;
if (message.payload().empty()) {
if (lifecycle_id.IsSet()) {
callbacks_.OnLifecycleEnd(lifecycle_id);
@@ -514,15 +549,6 @@
"Unable to send message as the send queue is full");
return SendStatus::kErrorResourceExhaustion;
}
-
- Timestamp now = callbacks_.Now();
- ++metrics_.tx_messages_count;
- send_queue_.Add(now, std::move(message), send_options);
- if (tcb_ != nullptr) {
- tcb_->SendBufferedPackets(now);
- }
-
- RTC_DCHECK(IsConsistent());
return SendStatus::kSuccess;
}
diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h
index c59d6ca..2712d70 100644
--- a/net/dcsctp/socket/dcsctp_socket.h
+++ b/net/dcsctp/socket/dcsctp_socket.h
@@ -14,6 +14,7 @@
#include <memory>
#include <string>
#include <utility>
+#include <vector>
#include "absl/strings/string_view.h"
#include "api/array_view.h"
@@ -91,6 +92,8 @@
void Close() override;
SendStatus Send(DcSctpMessage message,
const SendOptions& send_options) override;
+ std::vector<SendStatus> SendMany(rtc::ArrayView<DcSctpMessage> messages,
+ const SendOptions& send_options) override;
ResetStreamsStatus ResetStreams(
rtc::ArrayView<const StreamID> outgoing_streams) override;
SocketState state() const override;
@@ -165,6 +168,9 @@
void MaybeSendShutdownOnPacketReceived(const SctpPacket& packet);
// If there are streams pending to be reset, send a request to reset them.
void MaybeSendResetStreamsRequest();
+ // Performs internal processing shared between Send and SendMany.
+ SendStatus InternalSend(const DcSctpMessage& message,
+ const SendOptions& send_options);
// Sends a INIT chunk.
void SendInit();
// Sends a SHUTDOWN chunk.
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index bb080d6..413516b 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -66,6 +66,7 @@
using ::testing::_;
using ::testing::AllOf;
using ::testing::ElementsAre;
+using ::testing::ElementsAreArray;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::IsEmpty;
@@ -1561,6 +1562,33 @@
EXPECT_EQ(a.socket.options().max_message_size, 42u);
}
+TEST_P(DcSctpSocketParametrizedTest, SendManyMessages) {
+ SocketUnderTest a("A");
+ auto z = std::make_unique<SocketUnderTest>("Z");
+
+ ConnectSockets(a, *z);
+ z = MaybeHandoverSocket(std::move(z));
+
+ static constexpr int kIterations = 100;
+ std::vector<DcSctpMessage> messages;
+ std::vector<SendStatus> statuses;
+ for (int i = 0; i < kIterations; ++i) {
+ messages.push_back(DcSctpMessage(StreamID(1), PPID(53), {1, 2}));
+ statuses.push_back(SendStatus::kSuccess);
+ }
+ EXPECT_THAT(a.socket.SendMany(messages, {}), ElementsAreArray(statuses));
+
+ ExchangeMessages(a, *z);
+
+ for (int i = 0; i < kIterations; ++i) {
+ EXPECT_TRUE(z->cb.ConsumeReceivedMessage().has_value());
+ }
+
+ EXPECT_FALSE(z->cb.ConsumeReceivedMessage().has_value());
+
+ MaybeHandoverSocketAndSendMessage(a, std::move(z));
+}
+
TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) {
SocketUnderTest a("A");
auto z = std::make_unique<SocketUnderTest>("Z");