blob: a17008fb3fff6550cbc289008787213310427edb [file] [log] [blame]
/*
 *  Copyright 2020 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
#include "pc/sctp_data_channel.h"

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>

#include "media/sctp/sctp_transport_internal.h"
#include "pc/proxy.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/system/unused.h"
#include "rtc_base/thread.h"
24
25namespace webrtc {
26
27namespace {
28
// Upper bound, in bytes, on received data that may sit in the receive queue.
// OnDataReceived() compares the queued byte count plus the incoming payload
// size against this limit. The value never changes, so it is a compile-time
// constant rather than a mutable global.
static constexpr size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;

// Counter backing GenerateUniqueId(). Atomic, so concurrent increments do not
// race.
static std::atomic<int> g_unique_id{0};

// Returns the next value of the file-local counter. The first call returns 1
// and each subsequent call returns a value one greater than the previous one.
int GenerateUniqueId() {
  return ++g_unique_id;
}
36
// Define proxy for DataChannelInterface.
//
// NOTE(review): the macros come from pc/proxy.h, which is not visible here.
// Judging by the comments elsewhere in this file, BYPASS_* entries are invoked
// directly on the calling thread without a thread hop, while
// PROXY_SECONDARY_* entries are presumably marshalled to the secondary thread
// handed to DataChannelProxy::Create() (the network thread in CreateProxy()).
// Confirm against pc/proxy.h before relying on this.
BEGIN_PROXY_MAP(DataChannel)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
// Observer (un)registration does its own thread handling; see
// SctpDataChannel::RegisterObserver() / UnregisterObserver().
BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
BYPASS_PROXY_METHOD0(void, UnregisterObserver)
// Configuration accessors.
BYPASS_PROXY_CONSTMETHOD0(std::string, label)
BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
// Can't bypass the proxy since the id may change.
PROXY_SECONDARY_CONSTMETHOD0(int, id)
BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
// state() and error() pick the right thread themselves; see their
// implementations in SctpDataChannel.
BYPASS_PROXY_CONSTMETHOD0(DataState, state)
BYPASS_PROXY_CONSTMETHOD0(RTCError, error)
// Counters and operations whose implementations check that they run on the
// network thread.
PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent)
PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_SECONDARY_METHOD0(void, Close)
PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
// SendAsync() always posts to the network thread on its own.
BYPASS_PROXY_METHOD2(void,
                     SendAsync,
                     DataBuffer,
                     absl::AnyInvocable<void(RTCError) &&>)
END_PROXY_MAP(DataChannel)
Taylor Brandstetter3a034e12020-07-09 22:32:3468} // namespace
69
70InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
71 : DataChannelInit(base), open_handshake_role(kOpener) {
72 // If the channel is externally negotiated, do not send the OPEN message.
73 if (base.negotiated) {
74 open_handshake_role = kNone;
75 } else {
76 // Datachannel is externally negotiated. Ignore the id value.
77 // Specified in createDataChannel, WebRTC spec section 6.1 bullet 13.
78 id = -1;
79 }
Florent Castelli5183f002021-05-07 11:52:4480 // Backwards compatibility: If maxRetransmits or maxRetransmitTime
81 // are negative, the feature is not enabled.
82 // Values are clamped to a 16bit range.
83 if (maxRetransmits) {
84 if (*maxRetransmits < 0) {
85 RTC_LOG(LS_ERROR)
86 << "Accepting maxRetransmits < 0 for backwards compatibility";
87 maxRetransmits = absl::nullopt;
88 } else if (*maxRetransmits > std::numeric_limits<uint16_t>::max()) {
89 maxRetransmits = std::numeric_limits<uint16_t>::max();
90 }
Taylor Brandstetter3a034e12020-07-09 22:32:3491 }
Florent Castelli5183f002021-05-07 11:52:4492
93 if (maxRetransmitTime) {
94 if (*maxRetransmitTime < 0) {
95 RTC_LOG(LS_ERROR)
96 << "Accepting maxRetransmitTime < 0 for backwards compatibility";
97 maxRetransmitTime = absl::nullopt;
98 } else if (*maxRetransmitTime > std::numeric_limits<uint16_t>::max()) {
99 maxRetransmitTime = std::numeric_limits<uint16_t>::max();
100 }
Taylor Brandstetter3a034e12020-07-09 22:32:34101 }
102}
103
Tommic2429a02023-03-03 10:28:23104bool InternalDataChannelInit::IsValid() const {
105 if (id < -1)
106 return false;
107
108 if (maxRetransmits.has_value() && maxRetransmits.value() < 0)
109 return false;
110
111 if (maxRetransmitTime.has_value() && maxRetransmitTime.value() < 0)
112 return false;
113
114 // Only one of these can be set.
115 if (maxRetransmits.has_value() && maxRetransmitTime.has_value())
116 return false;
117
118 return true;
119}
120
Tommi8efaec62023-03-21 17:45:24121StreamId SctpSidAllocator::AllocateSid(rtc::SSLRole role) {
122 RTC_DCHECK_RUN_ON(&sequence_checker_);
123 int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1;
124 while (potential_sid <= static_cast<int>(cricket::kMaxSctpSid)) {
125 StreamId sid(potential_sid);
126 if (used_sids_.insert(sid).second)
127 return sid;
128 potential_sid += 2;
129 }
130 RTC_LOG(LS_ERROR) << "SCTP sid allocation pool exhausted.";
131 return StreamId();
132}
133
134bool SctpSidAllocator::ReserveSid(StreamId sid) {
135 RTC_DCHECK_RUN_ON(&sequence_checker_);
Tommi492296c2023-03-12 15:59:25136 if (!sid.HasValue() || sid.stream_id_int() > cricket::kMaxSctpSid)
Taylor Brandstetter3a034e12020-07-09 22:32:34137 return false;
Tommi492296c2023-03-12 15:59:25138 return used_sids_.insert(sid).second;
Taylor Brandstetter3a034e12020-07-09 22:32:34139}
140
Tommi8efaec62023-03-21 17:45:24141void SctpSidAllocator::ReleaseSid(StreamId sid) {
142 RTC_DCHECK_RUN_ON(&sequence_checker_);
Tommi492296c2023-03-12 15:59:25143 used_sids_.erase(sid);
Taylor Brandstetter3a034e12020-07-09 22:32:34144}
145
Tommif9e13f82023-04-06 19:21:45146// A DataChannelObserver implementation that offers backwards compatibility with
147// implementations that aren't yet ready to be called back on the network
148// thread. This implementation posts events to the signaling thread where
149// events are delivered.
150// In the class, and together with the `SctpDataChannel` implementation, there's
151// special handling for the `state()` property whereby if that property is
152// queried on the channel object while inside an event callback, we return
153// the state that was active at the time the event was issued. This is to avoid
154// a problem with calling the `state()` getter on the proxy, which would do
155// a blocking call to the network thread, effectively flushing operations on
156// the network thread that could cause the state to change and eventually return
157// a misleading or arguably, wrong, state value to the callback implementation.
158// As a future improvement to the ObserverAdapter, we could do the same for
159// other properties that need to be read on the network thread. Eventually
160// all implementations should expect to be called on the network thread though
161// and the ObserverAdapter no longer be necessary.
162class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
163 public:
164 explicit ObserverAdapter(
165 SctpDataChannel* channel,
166 rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety)
167 : channel_(channel), signaling_safety_(std::move(signaling_safety)) {}
168
169 bool IsInsideCallback() const {
170 RTC_DCHECK_RUN_ON(signaling_thread());
171 return cached_getters_ != nullptr;
172 }
173
174 DataChannelInterface::DataState cached_state() const {
175 RTC_DCHECK_RUN_ON(signaling_thread());
176 RTC_DCHECK(IsInsideCallback());
177 return cached_getters_->state();
178 }
179
180 RTCError cached_error() const {
181 RTC_DCHECK_RUN_ON(signaling_thread());
182 RTC_DCHECK(IsInsideCallback());
183 return cached_getters_->error();
184 }
185
186 void SetDelegate(DataChannelObserver* delegate) {
187 RTC_DCHECK_RUN_ON(signaling_thread());
188 delegate_ = delegate;
189 safety_.reset(PendingTaskSafetyFlag::CreateDetached());
190 }
191
192 static void DeleteOnSignalingThread(
193 std::unique_ptr<ObserverAdapter> observer) {
194 auto* signaling_thread = observer->signaling_thread();
195 if (!signaling_thread->IsCurrent())
196 signaling_thread->PostTask([observer = std::move(observer)]() {});
197 }
198
199 private:
200 class CachedGetters {
201 public:
202 explicit CachedGetters(ObserverAdapter* adapter)
203 : adapter_(adapter),
204 cached_state_(adapter_->channel_->state()),
205 cached_error_(adapter_->channel_->error()) {
206 RTC_DCHECK_RUN_ON(adapter->network_thread());
207 }
208
209 ~CachedGetters() {
210 if (!was_dropped_) {
211 RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
212 RTC_DCHECK_EQ(adapter_->cached_getters_, this);
213 adapter_->cached_getters_ = nullptr;
214 }
215 }
216
217 bool PrepareForCallback() {
218 RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
219 RTC_DCHECK(was_dropped_);
220 was_dropped_ = false;
221 adapter_->cached_getters_ = this;
222 return adapter_->delegate_ && adapter_->signaling_safety_->alive();
223 }
224
225 RTCError error() { return cached_error_; }
226 DataChannelInterface::DataState state() { return cached_state_; }
227
228 private:
229 ObserverAdapter* const adapter_;
230 bool was_dropped_ = true;
231 const DataChannelInterface::DataState cached_state_;
232 const RTCError cached_error_;
233 };
234
235 void OnStateChange() override {
236 RTC_DCHECK_RUN_ON(network_thread());
237 signaling_thread()->PostTask(
238 SafeTask(safety_.flag(),
239 [this, cached_state = std::make_unique<CachedGetters>(this)] {
240 RTC_DCHECK_RUN_ON(signaling_thread());
241 if (cached_state->PrepareForCallback())
242 delegate_->OnStateChange();
243 }));
244 }
245
246 void OnMessage(const DataBuffer& buffer) override {
247 RTC_DCHECK_RUN_ON(network_thread());
248 signaling_thread()->PostTask(SafeTask(
249 safety_.flag(), [this, buffer = buffer,
250 cached_state = std::make_unique<CachedGetters>(this)] {
251 RTC_DCHECK_RUN_ON(signaling_thread());
252 if (cached_state->PrepareForCallback())
253 delegate_->OnMessage(buffer);
254 }));
255 }
256
257 void OnBufferedAmountChange(uint64_t sent_data_size) override {
258 RTC_DCHECK_RUN_ON(network_thread());
259 signaling_thread()->PostTask(SafeTask(
260 safety_.flag(), [this, sent_data_size,
261 cached_state = std::make_unique<CachedGetters>(this)] {
262 RTC_DCHECK_RUN_ON(signaling_thread());
263 if (cached_state->PrepareForCallback())
264 delegate_->OnBufferedAmountChange(sent_data_size);
265 }));
266 }
267
Tommia50a81a2023-04-11 15:32:34268 bool IsOkToCallOnTheNetworkThread() override { return true; }
269
Tommi56577cc2023-04-07 10:04:02270 rtc::Thread* signaling_thread() const { return signaling_thread_; }
Tommif9e13f82023-04-06 19:21:45271 rtc::Thread* network_thread() const { return channel_->network_thread_; }
272
273 DataChannelObserver* delegate_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
274 SctpDataChannel* const channel_;
Tommi56577cc2023-04-07 10:04:02275 // Make sure to keep our own signaling_thread_ pointer to avoid dereferencing
276 // `channel_` in the `RTC_DCHECK_RUN_ON` checks on the signaling thread.
277 rtc::Thread* const signaling_thread_{channel_->signaling_thread_};
Tommif9e13f82023-04-06 19:21:45278 ScopedTaskSafety safety_;
279 rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety_;
280 CachedGetters* cached_getters_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
281};
282
Tommi1c0d91f2023-03-02 14:42:06283// static
Taylor Brandstetter3a034e12020-07-09 22:32:34284rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
Tommi1c0d91f2023-03-02 14:42:06285 rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
Taylor Brandstetter3a034e12020-07-09 22:32:34286 const std::string& label,
Tommie9aa8672023-03-20 13:43:09287 bool connected_to_transport,
Taylor Brandstetter3a034e12020-07-09 22:32:34288 const InternalDataChannelInit& config,
289 rtc::Thread* signaling_thread,
290 rtc::Thread* network_thread) {
Tommi9296a162023-03-21 15:28:52291 RTC_DCHECK(config.IsValid());
292 return rtc::make_ref_counted<SctpDataChannel>(
Tommie9aa8672023-03-20 13:43:09293 config, std::move(controller), label, connected_to_transport,
294 signaling_thread, network_thread);
Taylor Brandstetter3a034e12020-07-09 22:32:34295}
296
297// static
298rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
Tommif9e13f82023-04-06 19:21:45299 rtc::scoped_refptr<SctpDataChannel> channel,
300 rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety) {
Tommi55f72802023-03-27 10:39:33301 // Copy thread params to local variables before `std::move()`.
Tomas Gunnarsson0d5ce622022-03-18 14:57:15302 auto* signaling_thread = channel->signaling_thread_;
Tommi55f72802023-03-27 10:39:33303 auto* network_thread = channel->network_thread_;
Tommif9e13f82023-04-06 19:21:45304 channel->observer_adapter_ = std::make_unique<ObserverAdapter>(
305 channel.get(), std::move(signaling_safety));
Tommi55f72802023-03-27 10:39:33306 return DataChannelProxy::Create(signaling_thread, network_thread,
307 std::move(channel));
Taylor Brandstetter3a034e12020-07-09 22:32:34308}
309
Tommi1c0d91f2023-03-02 14:42:06310SctpDataChannel::SctpDataChannel(
311 const InternalDataChannelInit& config,
312 rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
313 const std::string& label,
Tommie9aa8672023-03-20 13:43:09314 bool connected_to_transport,
Tommi1c0d91f2023-03-02 14:42:06315 rtc::Thread* signaling_thread,
316 rtc::Thread* network_thread)
Taylor Brandstetter3a034e12020-07-09 22:32:34317 : signaling_thread_(signaling_thread),
318 network_thread_(network_thread),
Tommi1158bde2023-03-30 10:01:56319 id_n_(config.id),
Taylor Brandstetter3a034e12020-07-09 22:32:34320 internal_id_(GenerateUniqueId()),
321 label_(label),
Tommi492296c2023-03-12 15:59:25322 protocol_(config.protocol),
323 max_retransmit_time_(config.maxRetransmitTime),
324 max_retransmits_(config.maxRetransmits),
325 priority_(config.priority),
326 negotiated_(config.negotiated),
327 ordered_(config.ordered),
Taylor Brandstetter3a034e12020-07-09 22:32:34328 observer_(nullptr),
Tommia50a81a2023-04-11 15:32:34329 controller_(std::move(controller)) {
Tommi4f7ade52023-03-29 18:46:59330 RTC_DCHECK_RUN_ON(network_thread_);
331 // Since we constructed on the network thread we can't (yet) check the
332 // `controller_` pointer since doing so will trigger a thread check.
Florent Castellidcb9ffc2021-06-29 12:58:23333 RTC_UNUSED(network_thread_);
Tommi492296c2023-03-12 15:59:25334 RTC_DCHECK(config.IsValid());
Taylor Brandstetter3a034e12020-07-09 22:32:34335
Tommia50a81a2023-04-11 15:32:34336 if (connected_to_transport)
337 network_safety_->SetAlive();
338
Tommi492296c2023-03-12 15:59:25339 switch (config.open_handshake_role) {
Tommi4e1c9572023-03-15 11:36:20340 case InternalDataChannelInit::kNone: // pre-negotiated
Taylor Brandstetter3a034e12020-07-09 22:32:34341 handshake_state_ = kHandshakeReady;
342 break;
Tommi4e1c9572023-03-15 11:36:20343 case InternalDataChannelInit::kOpener:
Taylor Brandstetter3a034e12020-07-09 22:32:34344 handshake_state_ = kHandshakeShouldSendOpen;
345 break;
Tommi4e1c9572023-03-15 11:36:20346 case InternalDataChannelInit::kAcker:
Taylor Brandstetter3a034e12020-07-09 22:32:34347 handshake_state_ = kHandshakeShouldSendAck;
348 break;
349 }
Tommic2429a02023-03-03 10:28:23350}
351
Andrey Logvin7f16fcd2023-04-05 08:53:13352SctpDataChannel::~SctpDataChannel() {
Tommif9e13f82023-04-06 19:21:45353 if (observer_adapter_)
354 ObserverAdapter::DeleteOnSignalingThread(std::move(observer_adapter_));
Andrey Logvin7f16fcd2023-04-05 08:53:13355}
Taylor Brandstetter3a034e12020-07-09 22:32:34356
357void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
Tommif9e13f82023-04-06 19:21:45358 // Note: at this point, we do not know on which thread we're being called
359 // from since this method bypasses the proxy. On Android in particular,
360 // registration methods are called from unknown threads.
361
362 // Check if we should set up an observer adapter that will make sure that
363 // callbacks are delivered on the signaling thread rather than directly
364 // on the network thread.
365 const auto* current_thread = rtc::Thread::Current();
366 // TODO(webrtc:11547): Eventually all DataChannelObserver implementations
367 // should be called on the network thread and IsOkToCallOnTheNetworkThread().
368 if (!observer->IsOkToCallOnTheNetworkThread()) {
369 RTC_LOG(LS_WARNING) << "DataChannelObserver - adapter needed";
370 auto prepare_observer = [&]() {
371 RTC_DCHECK(observer_adapter_) << "CreateProxy hasn't been called";
372 observer_adapter_->SetDelegate(observer);
373 return observer_adapter_.get();
374 };
375 // Instantiate the adapter in the right context and then substitute the
376 // observer pointer the SctpDataChannel will call back on, with the adapter.
377 if (signaling_thread_ == current_thread) {
378 observer = prepare_observer();
379 } else {
380 observer = signaling_thread_->BlockingCall(std::move(prepare_observer));
381 }
382 }
383
Tommiefb361c2023-05-09 14:41:51384 // Now do the observer registration on the network thread. In the common case,
385 // we'll do this asynchronously via `PostTask()`. For that reason we grab
386 // a reference to ourselves while the task is in flight. We can't use
387 // `SafeTask(network_safety_, ...)` for this since we can't assume that we
388 // have a transport (network_safety_ represents the transport connection).
389 rtc::scoped_refptr<SctpDataChannel> me(this);
390 auto register_observer = [me = std::move(me), observer = observer] {
391 RTC_DCHECK_RUN_ON(me->network_thread_);
392 me->observer_ = observer;
393 me->DeliverQueuedReceivedData();
Tommif9e13f82023-04-06 19:21:45394 };
395
396 if (network_thread_ == current_thread) {
397 register_observer();
398 } else {
Tommiefb361c2023-05-09 14:41:51399 network_thread_->PostTask(std::move(register_observer));
Tommif9e13f82023-04-06 19:21:45400 }
Taylor Brandstetter3a034e12020-07-09 22:32:34401}
402
403void SctpDataChannel::UnregisterObserver() {
Tommif9e13f82023-04-06 19:21:45404 // Note: As with `RegisterObserver`, the proxy is being bypassed.
405 const auto* current_thread = rtc::Thread::Current();
406 // Callers must not be invoking the unregistration from the network thread
407 // (assuming a multi-threaded environment where we have a dedicated network
408 // thread). That would indicate non-network related work happening on the
409 // network thread or that unregistration is being done from within a callback
410 // (without unwinding the stack, which is a requirement).
411 // The network thread is not allowed to make blocking calls to the signaling
412 // thread, so that would blow up if attempted. Since we support an adapter
413 // for observers that are not safe to call on the network thread, we do
414 // need to check+free it on the signaling thread.
415 RTC_DCHECK(current_thread != network_thread_ ||
416 network_thread_ == signaling_thread_);
417
418 auto unregister_observer = [&] {
419 RTC_DCHECK_RUN_ON(network_thread_);
420 observer_ = nullptr;
421 };
422
423 if (current_thread == network_thread_) {
424 unregister_observer();
425 } else {
426 network_thread_->BlockingCall(std::move(unregister_observer));
427 }
428
429 auto clear_observer = [&]() {
430 if (observer_adapter_)
431 observer_adapter_->SetDelegate(nullptr);
432 };
433
434 if (current_thread != signaling_thread_) {
435 signaling_thread_->BlockingCall(std::move(clear_observer));
436 } else {
437 clear_observer();
438 }
Taylor Brandstetter3a034e12020-07-09 22:32:34439}
440
Tommi492296c2023-03-12 15:59:25441std::string SctpDataChannel::label() const {
442 return label_;
443}
444
Taylor Brandstetter3a034e12020-07-09 22:32:34445bool SctpDataChannel::reliable() const {
446 // May be called on any thread.
Tommi492296c2023-03-12 15:59:25447 return !max_retransmits_ && !max_retransmit_time_;
448}
449
450bool SctpDataChannel::ordered() const {
451 return ordered_;
452}
453
454uint16_t SctpDataChannel::maxRetransmitTime() const {
455 return max_retransmit_time_ ? *max_retransmit_time_
456 : static_cast<uint16_t>(-1);
457}
458
459uint16_t SctpDataChannel::maxRetransmits() const {
460 return max_retransmits_ ? *max_retransmits_ : static_cast<uint16_t>(-1);
461}
462
463absl::optional<int> SctpDataChannel::maxPacketLifeTime() const {
464 return max_retransmit_time_;
465}
466
467absl::optional<int> SctpDataChannel::maxRetransmitsOpt() const {
468 return max_retransmits_;
469}
470
471std::string SctpDataChannel::protocol() const {
472 return protocol_;
473}
474
475bool SctpDataChannel::negotiated() const {
476 return negotiated_;
477}
478
479int SctpDataChannel::id() const {
Tommif9e13f82023-04-06 19:21:45480 RTC_DCHECK_RUN_ON(network_thread_);
Tommif9e13f82023-04-06 19:21:45481 return id_n_.stream_id_int();
Tommi492296c2023-03-12 15:59:25482}
483
484Priority SctpDataChannel::priority() const {
485 return priority_ ? *priority_ : Priority::kLow;
Taylor Brandstetter3a034e12020-07-09 22:32:34486}
487
488uint64_t SctpDataChannel::buffered_amount() const {
Tommif9e13f82023-04-06 19:21:45489 RTC_DCHECK_RUN_ON(network_thread_);
Florent Castelli65956852021-10-18 09:13:22490 return queued_send_data_.byte_count();
Taylor Brandstetter3a034e12020-07-09 22:32:34491}
492
493void SctpDataChannel::Close() {
Tommif9e13f82023-04-06 19:21:45494 RTC_DCHECK_RUN_ON(network_thread_);
Florent Castelli8f04c7c2022-05-05 21:43:44495 if (state_ == kClosing || state_ == kClosed)
Taylor Brandstetter3a034e12020-07-09 22:32:34496 return;
497 SetState(kClosing);
498 // Will send queued data before beginning the underlying closing procedure.
499 UpdateState();
500}
501
502SctpDataChannel::DataState SctpDataChannel::state() const {
Tommif9e13f82023-04-06 19:21:45503 // Note: The proxy is bypassed for the `state()` accessor. This is to allow
504 // observer callbacks to query what the new state is from within a state
505 // update notification without having to do a blocking call to the network
506 // thread from within a callback. This also makes it so that the returned
507 // state is guaranteed to be the new state that provoked the state change
508 // notification, whereby a blocking call to the network thread might end up
509 // getting put behind other messages on the network thread and eventually
510 // fetch a different state value (since pending messages might cause the
511 // state to change in the meantime).
512 const auto* current_thread = rtc::Thread::Current();
513 if (current_thread == signaling_thread_ && observer_adapter_ &&
514 observer_adapter_->IsInsideCallback()) {
515 return observer_adapter_->cached_state();
516 }
517
518 auto return_state = [&] {
519 RTC_DCHECK_RUN_ON(network_thread_);
520 return state_;
521 };
522
523 return current_thread == network_thread_
524 ? return_state()
525 : network_thread_->BlockingCall(std::move(return_state));
Taylor Brandstetter3a034e12020-07-09 22:32:34526}
527
528RTCError SctpDataChannel::error() const {
Tommif9e13f82023-04-06 19:21:45529 const auto* current_thread = rtc::Thread::Current();
530 if (current_thread == signaling_thread_ && observer_adapter_ &&
531 observer_adapter_->IsInsideCallback()) {
532 return observer_adapter_->cached_error();
533 }
534
535 auto return_error = [&] {
536 RTC_DCHECK_RUN_ON(network_thread_);
537 return error_;
538 };
539
540 return current_thread == network_thread_
541 ? return_error()
542 : network_thread_->BlockingCall(std::move(return_error));
Taylor Brandstetter3a034e12020-07-09 22:32:34543}
544
545uint32_t SctpDataChannel::messages_sent() const {
Tommif9e13f82023-04-06 19:21:45546 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34547 return messages_sent_;
548}
549
550uint64_t SctpDataChannel::bytes_sent() const {
Tommif9e13f82023-04-06 19:21:45551 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34552 return bytes_sent_;
553}
554
555uint32_t SctpDataChannel::messages_received() const {
Tommif9e13f82023-04-06 19:21:45556 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34557 return messages_received_;
558}
559
560uint64_t SctpDataChannel::bytes_received() const {
Tommif9e13f82023-04-06 19:21:45561 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34562 return bytes_received_;
563}
564
565bool SctpDataChannel::Send(const DataBuffer& buffer) {
Tommif9e13f82023-04-06 19:21:45566 RTC_DCHECK_RUN_ON(network_thread_);
Tommia50a81a2023-04-11 15:32:34567 RTCError err = SendImpl(buffer);
568 if (err.type() == RTCErrorType::INVALID_STATE ||
569 err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
570 return false;
571 }
Taylor Brandstetter3a034e12020-07-09 22:32:34572
Tommia50a81a2023-04-11 15:32:34573 // Always return true for SCTP DataChannel per the spec.
574 return true;
575}
576
577// RTC_RUN_ON(network_thread_);
578RTCError SctpDataChannel::SendImpl(DataBuffer buffer) {
Taylor Brandstetter3a034e12020-07-09 22:32:34579 if (state_ != kOpen) {
Tommie25c1222023-04-11 09:46:24580 error_ = RTCError(RTCErrorType::INVALID_STATE);
Tommia50a81a2023-04-11 15:32:34581 return error_;
Taylor Brandstetter3a034e12020-07-09 22:32:34582 }
583
Taylor Brandstetter3a034e12020-07-09 22:32:34584 // If the queue is non-empty, we're waiting for SignalReadyToSend,
585 // so just add to the end of the queue and keep waiting.
586 if (!queued_send_data_.Empty()) {
Tommia50a81a2023-04-11 15:32:34587 error_ = QueueSendDataMessage(buffer)
588 ? RTCError::OK()
589 : RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
590 return error_;
Taylor Brandstetter3a034e12020-07-09 22:32:34591 }
592
Tommia50a81a2023-04-11 15:32:34593 return SendDataMessage(buffer, true);
594}
Taylor Brandstetter3a034e12020-07-09 22:32:34595
Tommia50a81a2023-04-11 15:32:34596void SctpDataChannel::SendAsync(
597 DataBuffer buffer,
598 absl::AnyInvocable<void(RTCError) &&> on_complete) {
599 // Note: at this point, we do not know on which thread we're being called
600 // since this method bypasses the proxy. On Android the thread might be VM
601 // owned, on other platforms it might be the signaling thread, or in Chrome
602 // it can be the JS thread. We also don't know if it's consistently the same
603 // thread. So we always post to the network thread (even if the current thread
604 // might be the network thread - in theory a call could even come from within
605 // the `on_complete` callback).
606 network_thread_->PostTask(SafeTask(
607 network_safety_, [this, buffer = std::move(buffer),
608 on_complete = std::move(on_complete)]() mutable {
609 RTC_DCHECK_RUN_ON(network_thread_);
610 RTCError err = SendImpl(std::move(buffer));
611 if (on_complete)
612 std::move(on_complete)(err);
613 }));
Taylor Brandstetter3a034e12020-07-09 22:32:34614}
615
Tommi1158bde2023-03-30 10:01:56616void SctpDataChannel::SetSctpSid_n(StreamId sid) {
617 RTC_DCHECK_RUN_ON(network_thread_);
618 RTC_DCHECK(!id_n_.HasValue());
619 RTC_DCHECK(sid.HasValue());
Tommif9e13f82023-04-06 19:21:45620 RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
621 RTC_DCHECK_EQ(state_, kConnecting);
Tommi1158bde2023-03-30 10:01:56622 id_n_ = sid;
Taylor Brandstetter3a034e12020-07-09 22:32:34623}
624
Tommi00264ca2023-03-14 12:21:06625void SctpDataChannel::OnClosingProcedureStartedRemotely() {
Tommif9e13f82023-04-06 19:21:45626 RTC_DCHECK_RUN_ON(network_thread_);
Tommi00264ca2023-03-14 12:21:06627 if (state_ != kClosing && state_ != kClosed) {
Taylor Brandstetter3a034e12020-07-09 22:32:34628 // Don't bother sending queued data since the side that initiated the
629 // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
630 // discussion about this.
631 queued_send_data_.Clear();
632 queued_control_data_.Clear();
633 // Just need to change state to kClosing, SctpTransport will handle the
634 // rest of the closing procedure and OnClosingProcedureComplete will be
635 // called later.
636 started_closing_procedure_ = true;
637 SetState(kClosing);
638 }
639}
640
Tommi51edb562023-03-14 08:23:51641void SctpDataChannel::OnClosingProcedureComplete() {
Tommif9e13f82023-04-06 19:21:45642 RTC_DCHECK_RUN_ON(network_thread_);
Tommi51edb562023-03-14 08:23:51643 // If the closing procedure is complete, we should have finished sending
644 // all pending data and transitioned to kClosing already.
645 RTC_DCHECK_EQ(state_, kClosing);
646 RTC_DCHECK(queued_send_data_.Empty());
Tommi51edb562023-03-14 08:23:51647 SetState(kClosed);
Taylor Brandstetter3a034e12020-07-09 22:32:34648}
649
650void SctpDataChannel::OnTransportChannelCreated() {
Tommif9e13f82023-04-06 19:21:45651 RTC_DCHECK_RUN_ON(network_thread_);
Tommia50a81a2023-04-11 15:32:34652 network_safety_->SetAlive();
Taylor Brandstetter3a034e12020-07-09 22:32:34653}
654
Florent Castellidcb9ffc2021-06-29 12:58:23655void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
Tommif9e13f82023-04-06 19:21:45656 RTC_DCHECK_RUN_ON(network_thread_);
Tommi492296c2023-03-12 15:59:25657 // The SctpTransport is unusable, which could come from multiple reasons:
Florent Castellidcb9ffc2021-06-29 12:58:23658 // - the SCTP m= section was rejected
659 // - the DTLS transport is closed
660 // - the SCTP transport is closed
Taylor Brandstetter3a034e12020-07-09 22:32:34661 CloseAbruptlyWithError(std::move(error));
662}
663
664DataChannelStats SctpDataChannel::GetStats() const {
Tommif9e13f82023-04-06 19:21:45665 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34666 DataChannelStats stats{internal_id_, id(), label(),
667 protocol(), state(), messages_sent(),
668 messages_received(), bytes_sent(), bytes_received()};
669 return stats;
670}
671
Tommi4e1c9572023-03-15 11:36:20672void SctpDataChannel::OnDataReceived(DataMessageType type,
Taylor Brandstetter3a034e12020-07-09 22:32:34673 const rtc::CopyOnWriteBuffer& payload) {
Tommif9e13f82023-04-06 19:21:45674 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34675
Tommi4e1c9572023-03-15 11:36:20676 if (type == DataMessageType::kControl) {
Taylor Brandstetter3a034e12020-07-09 22:32:34677 if (handshake_state_ != kHandshakeWaitingForAck) {
678 // Ignore it if we are not expecting an ACK message.
679 RTC_LOG(LS_WARNING)
680 << "DataChannel received unexpected CONTROL message, sid = "
Tommif9e13f82023-04-06 19:21:45681 << id_n_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34682 return;
683 }
684 if (ParseDataChannelOpenAckMessage(payload)) {
685 // We can send unordered as soon as we receive the ACK message.
686 handshake_state_ = kHandshakeReady;
687 RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
Tommif9e13f82023-04-06 19:21:45688 << id_n_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34689 } else {
690 RTC_LOG(LS_WARNING)
691 << "DataChannel failed to parse OPEN_ACK message, sid = "
Tommif9e13f82023-04-06 19:21:45692 << id_n_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34693 }
694 return;
695 }
696
Tommi4e1c9572023-03-15 11:36:20697 RTC_DCHECK(type == DataMessageType::kBinary ||
698 type == DataMessageType::kText);
Taylor Brandstetter3a034e12020-07-09 22:32:34699
Tommi934a88a2023-03-15 13:34:56700 RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
Tommif9e13f82023-04-06 19:21:45701 << id_n_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34702 // We can send unordered as soon as we receive any DATA message since the
703 // remote side must have received the OPEN (and old clients do not send
704 // OPEN_ACK).
705 if (handshake_state_ == kHandshakeWaitingForAck) {
706 handshake_state_ = kHandshakeReady;
707 }
708
Tommi4e1c9572023-03-15 11:36:20709 bool binary = (type == DataMessageType::kBinary);
Taylor Brandstetter3a034e12020-07-09 22:32:34710 auto buffer = std::make_unique<DataBuffer>(payload, binary);
711 if (state_ == kOpen && observer_) {
712 ++messages_received_;
713 bytes_received_ += buffer->size();
714 observer_->OnMessage(*buffer.get());
715 } else {
716 if (queued_received_data_.byte_count() + payload.size() >
717 kMaxQueuedReceivedDataBytes) {
718 RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
719
720 queued_received_data_.Clear();
721 CloseAbruptlyWithError(
722 RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
723 "Queued received data exceeds the max buffer size."));
724
725 return;
726 }
727 queued_received_data_.PushBack(std::move(buffer));
728 }
729}
730
Tommie9aa8672023-03-20 13:43:09731void SctpDataChannel::OnTransportReady() {
Tommif9e13f82023-04-06 19:21:45732 RTC_DCHECK_RUN_ON(network_thread_);
Tommia50a81a2023-04-11 15:32:34733 RTC_DCHECK(connected_to_transport());
Tommif9e13f82023-04-06 19:21:45734 RTC_DCHECK(id_n_.HasValue());
Taylor Brandstetter3a034e12020-07-09 22:32:34735
736 SendQueuedControlMessages();
737 SendQueuedDataMessages();
738
739 UpdateState();
740}
741
742void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
Tommif9e13f82023-04-06 19:21:45743 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34744
745 if (state_ == kClosed) {
746 return;
747 }
748
Tommia50a81a2023-04-11 15:32:34749 network_safety_->SetNotAlive();
Taylor Brandstetter3a034e12020-07-09 22:32:34750
751 // Closing abruptly means any queued data gets thrown away.
Taylor Brandstetter3a034e12020-07-09 22:32:34752 queued_send_data_.Clear();
753 queued_control_data_.Clear();
754
755 // Still go to "kClosing" before "kClosed", since observers may be expecting
756 // that.
757 SetState(kClosing);
758 error_ = std::move(error);
759 SetState(kClosed);
760}
761
762void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
763 const std::string& message) {
Tommif9e13f82023-04-06 19:21:45764 RTC_DCHECK_RUN_ON(network_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34765 RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
766 error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
767 CloseAbruptlyWithError(std::move(error));
768}
769
Tommif9e13f82023-04-06 19:21:45770// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34771void SctpDataChannel::UpdateState() {
Taylor Brandstetter3a034e12020-07-09 22:32:34772 // UpdateState determines what to do from a few state variables. Include
773 // all conditions required for each state transition here for
774 // clarity. OnTransportReady(true) will send any queued data and then invoke
775 // UpdateState().
776
777 switch (state_) {
778 case kConnecting: {
Tommia50a81a2023-04-11 15:32:34779 if (connected_to_transport() && controller_) {
Taylor Brandstetter3a034e12020-07-09 22:32:34780 if (handshake_state_ == kHandshakeShouldSendOpen) {
781 rtc::CopyOnWriteBuffer payload;
Tommi492296c2023-03-12 15:59:25782 WriteDataChannelOpenMessage(label_, protocol_, priority_, ordered_,
783 max_retransmits_, max_retransmit_time_,
784 &payload);
Taylor Brandstetter3a034e12020-07-09 22:32:34785 SendControlMessage(payload);
786 } else if (handshake_state_ == kHandshakeShouldSendAck) {
787 rtc::CopyOnWriteBuffer payload;
788 WriteDataChannelOpenAckMessage(&payload);
789 SendControlMessage(payload);
790 }
Tommie08f9a92023-03-28 08:09:40791 if (handshake_state_ == kHandshakeReady ||
792 handshake_state_ == kHandshakeWaitingForAck) {
Taylor Brandstetter3a034e12020-07-09 22:32:34793 SetState(kOpen);
794 // If we have received buffers before the channel got writable.
795 // Deliver them now.
796 DeliverQueuedReceivedData();
797 }
Tommie9aa8672023-03-20 13:43:09798 } else {
Tommif9e13f82023-04-06 19:21:45799 RTC_DCHECK(!id_n_.HasValue());
Taylor Brandstetter3a034e12020-07-09 22:32:34800 }
801 break;
802 }
803 case kOpen: {
804 break;
805 }
806 case kClosing: {
Tommia50a81a2023-04-11 15:32:34807 if (connected_to_transport() && controller_) {
Tommif21354c2023-03-07 07:43:24808 // Wait for all queued data to be sent before beginning the closing
809 // procedure.
810 if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
811 // For SCTP data channels, we need to wait for the closing procedure
812 // to complete; after calling RemoveSctpDataStream,
813 // OnClosingProcedureComplete will end up called asynchronously
814 // afterwards.
Tommif9e13f82023-04-06 19:21:45815 if (!started_closing_procedure_ && id_n_.HasValue()) {
Tommif21354c2023-03-07 07:43:24816 started_closing_procedure_ = true;
Tommif9e13f82023-04-06 19:21:45817 controller_->RemoveSctpDataStream(id_n_);
Tommif21354c2023-03-07 07:43:24818 }
Taylor Brandstetter3a034e12020-07-09 22:32:34819 }
Tommif21354c2023-03-07 07:43:24820 } else {
821 // When we're not connected to a transport, we'll transition
822 // directly to the `kClosed` state from here.
823 queued_send_data_.Clear();
824 queued_control_data_.Clear();
825 SetState(kClosed);
Taylor Brandstetter3a034e12020-07-09 22:32:34826 }
827 break;
828 }
829 case kClosed:
830 break;
831 }
832}
833
Tommif9e13f82023-04-06 19:21:45834// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34835void SctpDataChannel::SetState(DataState state) {
Taylor Brandstetter3a034e12020-07-09 22:32:34836 if (state_ == state) {
837 return;
838 }
839
840 state_ = state;
841 if (observer_) {
842 observer_->OnStateChange();
843 }
Tommid2afbaf2023-03-02 09:51:16844
Tommi1c0d91f2023-03-02 14:42:06845 if (controller_)
Tommid2afbaf2023-03-02 09:51:16846 controller_->OnChannelStateChanged(this, state_);
Taylor Brandstetter3a034e12020-07-09 22:32:34847}
848
Tommif9e13f82023-04-06 19:21:45849// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34850void SctpDataChannel::DeliverQueuedReceivedData() {
Yuwei Huang20838942023-05-09 23:01:33851 if (!observer_ || state_ != kOpen) {
Taylor Brandstetter3a034e12020-07-09 22:32:34852 return;
853 }
854
855 while (!queued_received_data_.Empty()) {
856 std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
857 ++messages_received_;
858 bytes_received_ += buffer->size();
859 observer_->OnMessage(*buffer);
860 }
861}
862
Tommif9e13f82023-04-06 19:21:45863// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34864void SctpDataChannel::SendQueuedDataMessages() {
Taylor Brandstetter3a034e12020-07-09 22:32:34865 if (queued_send_data_.Empty()) {
866 return;
867 }
868
869 RTC_DCHECK(state_ == kOpen || state_ == kClosing);
870
871 while (!queued_send_data_.Empty()) {
872 std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
Tommia50a81a2023-04-11 15:32:34873 if (!SendDataMessage(*buffer, false).ok()) {
Taylor Brandstetter3a034e12020-07-09 22:32:34874 // Return the message to the front of the queue if sending is aborted.
875 queued_send_data_.PushFront(std::move(buffer));
876 break;
877 }
878 }
879}
880
Tommif9e13f82023-04-06 19:21:45881// RTC_RUN_ON(network_thread_).
Tommia50a81a2023-04-11 15:32:34882RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
883 bool queue_if_blocked) {
Florent Castellid95b1492021-05-10 09:29:56884 SendDataParams send_params;
Tommi1c0d91f2023-03-02 14:42:06885 if (!controller_) {
Tommie25c1222023-04-11 09:46:24886 error_ = RTCError(RTCErrorType::INVALID_STATE);
Tommia50a81a2023-04-11 15:32:34887 return error_;
Harald Alvestrand9e5aeb92022-05-11 09:35:36888 }
Taylor Brandstetter3a034e12020-07-09 22:32:34889
Tommi492296c2023-03-12 15:59:25890 send_params.ordered = ordered_;
Taylor Brandstetter3a034e12020-07-09 22:32:34891 // Send as ordered if it is still going through OPEN/ACK signaling.
Tommi492296c2023-03-12 15:59:25892 if (handshake_state_ != kHandshakeReady && !ordered_) {
Taylor Brandstetter3a034e12020-07-09 22:32:34893 send_params.ordered = true;
Tommi934a88a2023-03-15 13:34:56894 RTC_DLOG(LS_VERBOSE)
Taylor Brandstetter3a034e12020-07-09 22:32:34895 << "Sending data as ordered for unordered DataChannel "
896 "because the OPEN_ACK message has not been received.";
897 }
898
Tommi492296c2023-03-12 15:59:25899 send_params.max_rtx_count = max_retransmits_;
900 send_params.max_rtx_ms = max_retransmit_time_;
Florent Castellid95b1492021-05-10 09:29:56901 send_params.type =
902 buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
Taylor Brandstetter3a034e12020-07-09 22:32:34903
Tommie25c1222023-04-11 09:46:24904 error_ = controller_->SendData(id_n_, send_params, buffer.data);
905 if (error_.ok()) {
Taylor Brandstetter3a034e12020-07-09 22:32:34906 ++messages_sent_;
907 bytes_sent_ += buffer.size();
908
Taylor Brandstetter3a034e12020-07-09 22:32:34909 if (observer_ && buffer.size() > 0) {
910 observer_->OnBufferedAmountChange(buffer.size());
911 }
Tommia50a81a2023-04-11 15:32:34912 return error_;
Taylor Brandstetter3a034e12020-07-09 22:32:34913 }
914
Tommie25c1222023-04-11 09:46:24915 if (error_.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
Tommia50a81a2023-04-11 15:32:34916 if (!queue_if_blocked)
917 return error_;
918
919 if (QueueSendDataMessage(buffer)) {
920 error_ = RTCError::OK();
921 return error_;
Taylor Brandstetter3a034e12020-07-09 22:32:34922 }
923 }
924 // Close the channel if the error is not SDR_BLOCK, or if queuing the
925 // message failed.
926 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
927 "send_result = "
Tommie25c1222023-04-11 09:46:24928 << ToString(error_.type()) << ":" << error_.message();
Taylor Brandstetter3a034e12020-07-09 22:32:34929 CloseAbruptlyWithError(
930 RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
931
Tommia50a81a2023-04-11 15:32:34932 return error_;
Taylor Brandstetter3a034e12020-07-09 22:32:34933}
934
Tommif9e13f82023-04-06 19:21:45935// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34936bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
Taylor Brandstetter3a034e12020-07-09 22:32:34937 size_t start_buffered_amount = queued_send_data_.byte_count();
Florent Castellia563a2a2021-10-18 09:46:21938 if (start_buffered_amount + buffer.size() >
939 DataChannelInterface::MaxSendQueueSize()) {
Taylor Brandstetter3a034e12020-07-09 22:32:34940 RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
Tommie25c1222023-04-11 09:46:24941 error_ = RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
Taylor Brandstetter3a034e12020-07-09 22:32:34942 return false;
943 }
944 queued_send_data_.PushBack(std::make_unique<DataBuffer>(buffer));
945 return true;
946}
947
Tommif9e13f82023-04-06 19:21:45948// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34949void SctpDataChannel::SendQueuedControlMessages() {
Taylor Brandstetter3a034e12020-07-09 22:32:34950 PacketQueue control_packets;
951 control_packets.Swap(&queued_control_data_);
952
953 while (!control_packets.Empty()) {
954 std::unique_ptr<DataBuffer> buf = control_packets.PopFront();
955 SendControlMessage(buf->data);
956 }
957}
958
Tommif9e13f82023-04-06 19:21:45959// RTC_RUN_ON(network_thread_).
Taylor Brandstetter3a034e12020-07-09 22:32:34960bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
Tommia50a81a2023-04-11 15:32:34961 RTC_DCHECK(connected_to_transport());
Tommif9e13f82023-04-06 19:21:45962 RTC_DCHECK(id_n_.HasValue());
Tommi1158bde2023-03-30 10:01:56963 RTC_DCHECK(controller_);
Taylor Brandstetter3a034e12020-07-09 22:32:34964
965 bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
Tommi492296c2023-03-12 15:59:25966 RTC_DCHECK(!is_open_message || !negotiated_);
Taylor Brandstetter3a034e12020-07-09 22:32:34967
Florent Castellid95b1492021-05-10 09:29:56968 SendDataParams send_params;
Taylor Brandstetter3a034e12020-07-09 22:32:34969 // Send data as ordered before we receive any message from the remote peer to
970 // make sure the remote peer will not receive any data before it receives the
971 // OPEN message.
Tommi492296c2023-03-12 15:59:25972 send_params.ordered = ordered_ || is_open_message;
Florent Castellid95b1492021-05-10 09:29:56973 send_params.type = DataMessageType::kControl;
Taylor Brandstetter3a034e12020-07-09 22:32:34974
Tommif9e13f82023-04-06 19:21:45975 RTCError err = controller_->SendData(id_n_, send_params, buffer);
Tommi1fabbac2023-03-21 13:48:51976 if (err.ok()) {
Tommi934a88a2023-03-15 13:34:56977 RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
Tommif9e13f82023-04-06 19:21:45978 << id_n_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34979
980 if (handshake_state_ == kHandshakeShouldSendAck) {
981 handshake_state_ = kHandshakeReady;
982 } else if (handshake_state_ == kHandshakeShouldSendOpen) {
983 handshake_state_ = kHandshakeWaitingForAck;
984 }
Tommi1fabbac2023-03-21 13:48:51985 } else if (err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
Tommi1158bde2023-03-30 10:01:56986 queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
Taylor Brandstetter3a034e12020-07-09 22:32:34987 } else {
988 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
989 " the CONTROL message, send_result = "
Tommi1fabbac2023-03-21 13:48:51990 << ToString(err.type());
991 err.set_message("Failed to send a CONTROL message");
992 CloseAbruptlyWithError(err);
Taylor Brandstetter3a034e12020-07-09 22:32:34993 }
Tommi1fabbac2023-03-21 13:48:51994 return err.ok();
Taylor Brandstetter3a034e12020-07-09 22:32:34995}
996
997// static
998void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
999 g_unique_id = new_value;
1000}
1001
Taylor Brandstetter3a034e12020-07-09 22:32:341002} // namespace webrtc