blob: 2280174baa247e0d6ca150f299e370f479703f13 [file] [log] [blame]
Taylor Brandstetter3a034e12020-07-09 22:32:341/*
2 * Copyright 2020 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "pc/sctp_data_channel.h"
12
Florent Castelli5183f002021-05-07 11:52:4413#include <limits>
Taylor Brandstetter3a034e12020-07-09 22:32:3414#include <memory>
15#include <string>
16#include <utility>
17
Danil Chapovalova30439b2022-07-07 08:08:4918#include "absl/cleanup/cleanup.h"
Taylor Brandstetter3a034e12020-07-09 22:32:3419#include "media/sctp/sctp_transport_internal.h"
Markus Handella1b82012021-05-26 16:56:3020#include "pc/proxy.h"
Taylor Brandstetter3a034e12020-07-09 22:32:3421#include "rtc_base/checks.h"
Taylor Brandstetter3a034e12020-07-09 22:32:3422#include "rtc_base/logging.h"
Florent Castellidcb9ffc2021-06-29 12:58:2323#include "rtc_base/system/unused.h"
Taylor Brandstetter3a034e12020-07-09 22:32:3424#include "rtc_base/thread.h"
25
26namespace webrtc {
27
28namespace {
29
30static size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
Taylor Brandstetter3a034e12020-07-09 22:32:3431
32static std::atomic<int> g_unique_id{0};
33
34int GenerateUniqueId() {
35 return ++g_unique_id;
36}
37
38// Define proxy for DataChannelInterface.
Tommi55f72802023-03-27 10:39:3339BEGIN_PROXY_MAP(DataChannel)
Mirko Bonadei9d9b8de2021-02-26 08:51:2640PROXY_PRIMARY_THREAD_DESTRUCTOR()
Taylor Brandstetter3a034e12020-07-09 22:32:3441PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
42PROXY_METHOD0(void, UnregisterObserver)
43BYPASS_PROXY_CONSTMETHOD0(std::string, label)
44BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
45BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
46BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
47BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
48BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
49BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
50BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
51BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
52// Can't bypass the proxy since the id may change.
53PROXY_CONSTMETHOD0(int, id)
54BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
55PROXY_CONSTMETHOD0(DataState, state)
56PROXY_CONSTMETHOD0(RTCError, error)
57PROXY_CONSTMETHOD0(uint32_t, messages_sent)
58PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
59PROXY_CONSTMETHOD0(uint32_t, messages_received)
60PROXY_CONSTMETHOD0(uint64_t, bytes_received)
61PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
62PROXY_METHOD0(void, Close)
63// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
64PROXY_METHOD1(bool, Send, const DataBuffer&)
Markus Handell3d46d0b2021-05-27 19:42:5765END_PROXY_MAP(DataChannel)
Taylor Brandstetter3a034e12020-07-09 22:32:3466
67} // namespace
68
69InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
70 : DataChannelInit(base), open_handshake_role(kOpener) {
71 // If the channel is externally negotiated, do not send the OPEN message.
72 if (base.negotiated) {
73 open_handshake_role = kNone;
74 } else {
75 // Datachannel is externally negotiated. Ignore the id value.
76 // Specified in createDataChannel, WebRTC spec section 6.1 bullet 13.
77 id = -1;
78 }
Florent Castelli5183f002021-05-07 11:52:4479 // Backwards compatibility: If maxRetransmits or maxRetransmitTime
80 // are negative, the feature is not enabled.
81 // Values are clamped to a 16bit range.
82 if (maxRetransmits) {
83 if (*maxRetransmits < 0) {
84 RTC_LOG(LS_ERROR)
85 << "Accepting maxRetransmits < 0 for backwards compatibility";
86 maxRetransmits = absl::nullopt;
87 } else if (*maxRetransmits > std::numeric_limits<uint16_t>::max()) {
88 maxRetransmits = std::numeric_limits<uint16_t>::max();
89 }
Taylor Brandstetter3a034e12020-07-09 22:32:3490 }
Florent Castelli5183f002021-05-07 11:52:4491
92 if (maxRetransmitTime) {
93 if (*maxRetransmitTime < 0) {
94 RTC_LOG(LS_ERROR)
95 << "Accepting maxRetransmitTime < 0 for backwards compatibility";
96 maxRetransmitTime = absl::nullopt;
97 } else if (*maxRetransmitTime > std::numeric_limits<uint16_t>::max()) {
98 maxRetransmitTime = std::numeric_limits<uint16_t>::max();
99 }
Taylor Brandstetter3a034e12020-07-09 22:32:34100 }
101}
102
Tommic2429a02023-03-03 10:28:23103bool InternalDataChannelInit::IsValid() const {
104 if (id < -1)
105 return false;
106
107 if (maxRetransmits.has_value() && maxRetransmits.value() < 0)
108 return false;
109
110 if (maxRetransmitTime.has_value() && maxRetransmitTime.value() < 0)
111 return false;
112
113 // Only one of these can be set.
114 if (maxRetransmits.has_value() && maxRetransmitTime.has_value())
115 return false;
116
117 return true;
118}
119
Tommi8efaec62023-03-21 17:45:24120SctpSidAllocator::SctpSidAllocator() {
121 sequence_checker_.Detach();
Taylor Brandstetter3a034e12020-07-09 22:32:34122}
123
Tommi8efaec62023-03-21 17:45:24124StreamId SctpSidAllocator::AllocateSid(rtc::SSLRole role) {
125 RTC_DCHECK_RUN_ON(&sequence_checker_);
126 int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1;
127 while (potential_sid <= static_cast<int>(cricket::kMaxSctpSid)) {
128 StreamId sid(potential_sid);
129 if (used_sids_.insert(sid).second)
130 return sid;
131 potential_sid += 2;
132 }
133 RTC_LOG(LS_ERROR) << "SCTP sid allocation pool exhausted.";
134 return StreamId();
135}
136
137bool SctpSidAllocator::ReserveSid(StreamId sid) {
138 RTC_DCHECK_RUN_ON(&sequence_checker_);
Tommi492296c2023-03-12 15:59:25139 if (!sid.HasValue() || sid.stream_id_int() > cricket::kMaxSctpSid)
Taylor Brandstetter3a034e12020-07-09 22:32:34140 return false;
Tommi492296c2023-03-12 15:59:25141 return used_sids_.insert(sid).second;
Taylor Brandstetter3a034e12020-07-09 22:32:34142}
143
Tommi8efaec62023-03-21 17:45:24144void SctpSidAllocator::ReleaseSid(StreamId sid) {
145 RTC_DCHECK_RUN_ON(&sequence_checker_);
Tommi492296c2023-03-12 15:59:25146 used_sids_.erase(sid);
Taylor Brandstetter3a034e12020-07-09 22:32:34147}
148
Tommi1c0d91f2023-03-02 14:42:06149// static
Taylor Brandstetter3a034e12020-07-09 22:32:34150rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
Tommi1c0d91f2023-03-02 14:42:06151 rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
Taylor Brandstetter3a034e12020-07-09 22:32:34152 const std::string& label,
Tommie9aa8672023-03-20 13:43:09153 bool connected_to_transport,
Taylor Brandstetter3a034e12020-07-09 22:32:34154 const InternalDataChannelInit& config,
155 rtc::Thread* signaling_thread,
156 rtc::Thread* network_thread) {
Tommic2429a02023-03-03 10:28:23157 RTC_DCHECK(controller);
Tommi9296a162023-03-21 15:28:52158 RTC_DCHECK(config.IsValid());
159 return rtc::make_ref_counted<SctpDataChannel>(
Tommie9aa8672023-03-20 13:43:09160 config, std::move(controller), label, connected_to_transport,
161 signaling_thread, network_thread);
Taylor Brandstetter3a034e12020-07-09 22:32:34162}
163
164// static
165rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
166 rtc::scoped_refptr<SctpDataChannel> channel) {
Tommi55f72802023-03-27 10:39:33167 // Copy thread params to local variables before `std::move()`.
Tomas Gunnarsson0d5ce622022-03-18 14:57:15168 auto* signaling_thread = channel->signaling_thread_;
Tommi55f72802023-03-27 10:39:33169 auto* network_thread = channel->network_thread_;
170 return DataChannelProxy::Create(signaling_thread, network_thread,
171 std::move(channel));
Taylor Brandstetter3a034e12020-07-09 22:32:34172}
173
Tommi1c0d91f2023-03-02 14:42:06174SctpDataChannel::SctpDataChannel(
175 const InternalDataChannelInit& config,
176 rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
177 const std::string& label,
Tommie9aa8672023-03-20 13:43:09178 bool connected_to_transport,
Tommi1c0d91f2023-03-02 14:42:06179 rtc::Thread* signaling_thread,
180 rtc::Thread* network_thread)
Taylor Brandstetter3a034e12020-07-09 22:32:34181 : signaling_thread_(signaling_thread),
182 network_thread_(network_thread),
Tommi492296c2023-03-12 15:59:25183 id_(config.id),
Taylor Brandstetter3a034e12020-07-09 22:32:34184 internal_id_(GenerateUniqueId()),
185 label_(label),
Tommi492296c2023-03-12 15:59:25186 protocol_(config.protocol),
187 max_retransmit_time_(config.maxRetransmitTime),
188 max_retransmits_(config.maxRetransmits),
189 priority_(config.priority),
190 negotiated_(config.negotiated),
191 ordered_(config.ordered),
Taylor Brandstetter3a034e12020-07-09 22:32:34192 observer_(nullptr),
Tommie9aa8672023-03-20 13:43:09193 controller_(std::move(controller)),
194 connected_to_transport_(connected_to_transport) {
Taylor Brandstetter3a034e12020-07-09 22:32:34195 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castellidcb9ffc2021-06-29 12:58:23196 RTC_UNUSED(network_thread_);
Tommi492296c2023-03-12 15:59:25197 RTC_DCHECK(config.IsValid());
Tommifaf33872023-03-16 08:25:29198 RTC_DCHECK(controller_);
Taylor Brandstetter3a034e12020-07-09 22:32:34199
Tommi492296c2023-03-12 15:59:25200 switch (config.open_handshake_role) {
Tommi4e1c9572023-03-15 11:36:20201 case InternalDataChannelInit::kNone: // pre-negotiated
Taylor Brandstetter3a034e12020-07-09 22:32:34202 handshake_state_ = kHandshakeReady;
203 break;
Tommi4e1c9572023-03-15 11:36:20204 case InternalDataChannelInit::kOpener:
Taylor Brandstetter3a034e12020-07-09 22:32:34205 handshake_state_ = kHandshakeShouldSendOpen;
206 break;
Tommi4e1c9572023-03-15 11:36:20207 case InternalDataChannelInit::kAcker:
Taylor Brandstetter3a034e12020-07-09 22:32:34208 handshake_state_ = kHandshakeShouldSendAck;
209 break;
210 }
Tommie9aa8672023-03-20 13:43:09211
212 // Try to connect to the transport in case the transport channel already
213 // exists.
Tommi55f72802023-03-27 10:39:33214 if (id_.HasValue() && connected_to_transport_) {
215 network_thread_->BlockingCall(
216 [c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
Tommie9aa8672023-03-20 13:43:09217 }
Tommic2429a02023-03-03 10:28:23218}
219
Taylor Brandstetter3a034e12020-07-09 22:32:34220SctpDataChannel::~SctpDataChannel() {
221 RTC_DCHECK_RUN_ON(signaling_thread_);
222}
223
224void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
225 RTC_DCHECK_RUN_ON(signaling_thread_);
226 observer_ = observer;
227 DeliverQueuedReceivedData();
228}
229
230void SctpDataChannel::UnregisterObserver() {
231 RTC_DCHECK_RUN_ON(signaling_thread_);
232 observer_ = nullptr;
233}
234
Tommi492296c2023-03-12 15:59:25235std::string SctpDataChannel::label() const {
236 return label_;
237}
238
Taylor Brandstetter3a034e12020-07-09 22:32:34239bool SctpDataChannel::reliable() const {
240 // May be called on any thread.
Tommi492296c2023-03-12 15:59:25241 return !max_retransmits_ && !max_retransmit_time_;
242}
243
244bool SctpDataChannel::ordered() const {
245 return ordered_;
246}
247
248uint16_t SctpDataChannel::maxRetransmitTime() const {
249 return max_retransmit_time_ ? *max_retransmit_time_
250 : static_cast<uint16_t>(-1);
251}
252
253uint16_t SctpDataChannel::maxRetransmits() const {
254 return max_retransmits_ ? *max_retransmits_ : static_cast<uint16_t>(-1);
255}
256
257absl::optional<int> SctpDataChannel::maxPacketLifeTime() const {
258 return max_retransmit_time_;
259}
260
261absl::optional<int> SctpDataChannel::maxRetransmitsOpt() const {
262 return max_retransmits_;
263}
264
265std::string SctpDataChannel::protocol() const {
266 return protocol_;
267}
268
269bool SctpDataChannel::negotiated() const {
270 return negotiated_;
271}
272
273int SctpDataChannel::id() const {
274 return id_.stream_id_int();
275}
276
277Priority SctpDataChannel::priority() const {
278 return priority_ ? *priority_ : Priority::kLow;
Taylor Brandstetter3a034e12020-07-09 22:32:34279}
280
281uint64_t SctpDataChannel::buffered_amount() const {
282 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castelli65956852021-10-18 09:13:22283 return queued_send_data_.byte_count();
Taylor Brandstetter3a034e12020-07-09 22:32:34284}
285
286void SctpDataChannel::Close() {
287 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castelli8f04c7c2022-05-05 21:43:44288 if (state_ == kClosing || state_ == kClosed)
Taylor Brandstetter3a034e12020-07-09 22:32:34289 return;
290 SetState(kClosing);
291 // Will send queued data before beginning the underlying closing procedure.
292 UpdateState();
293}
294
295SctpDataChannel::DataState SctpDataChannel::state() const {
296 RTC_DCHECK_RUN_ON(signaling_thread_);
297 return state_;
298}
299
300RTCError SctpDataChannel::error() const {
301 RTC_DCHECK_RUN_ON(signaling_thread_);
302 return error_;
303}
304
305uint32_t SctpDataChannel::messages_sent() const {
306 RTC_DCHECK_RUN_ON(signaling_thread_);
307 return messages_sent_;
308}
309
310uint64_t SctpDataChannel::bytes_sent() const {
311 RTC_DCHECK_RUN_ON(signaling_thread_);
312 return bytes_sent_;
313}
314
315uint32_t SctpDataChannel::messages_received() const {
316 RTC_DCHECK_RUN_ON(signaling_thread_);
317 return messages_received_;
318}
319
320uint64_t SctpDataChannel::bytes_received() const {
321 RTC_DCHECK_RUN_ON(signaling_thread_);
322 return bytes_received_;
323}
324
325bool SctpDataChannel::Send(const DataBuffer& buffer) {
326 RTC_DCHECK_RUN_ON(signaling_thread_);
327 // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
328 // thread. Bring buffer management etc to the network thread and keep the
329 // operational state management on the signaling thread.
330
331 if (state_ != kOpen) {
332 return false;
333 }
334
Taylor Brandstetter3a034e12020-07-09 22:32:34335 // If the queue is non-empty, we're waiting for SignalReadyToSend,
336 // so just add to the end of the queue and keep waiting.
337 if (!queued_send_data_.Empty()) {
Tommi51edb562023-03-14 08:23:51338 return QueueSendDataMessage(buffer);
Taylor Brandstetter3a034e12020-07-09 22:32:34339 }
340
341 SendDataMessage(buffer, true);
342
343 // Always return true for SCTP DataChannel per the spec.
344 return true;
345}
346
Tommi492296c2023-03-12 15:59:25347void SctpDataChannel::SetSctpSid(const StreamId& sid) {
Taylor Brandstetter3a034e12020-07-09 22:32:34348 RTC_DCHECK_RUN_ON(signaling_thread_);
Tommi492296c2023-03-12 15:59:25349 RTC_DCHECK(!id_.HasValue());
350 RTC_DCHECK(sid.HasValue());
Taylor Brandstetter3a034e12020-07-09 22:32:34351 RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
352 RTC_DCHECK_EQ(state_, kConnecting);
353
Tommi492296c2023-03-12 15:59:25354 id_ = sid;
Tommi55f72802023-03-27 10:39:33355 if (connected_to_transport_) {
356 network_thread_->BlockingCall(
357 [c = controller_.get(), sid] { c->AddSctpDataStream(sid); });
358 }
Taylor Brandstetter3a034e12020-07-09 22:32:34359}
360
Tommi00264ca2023-03-14 12:21:06361void SctpDataChannel::OnClosingProcedureStartedRemotely() {
Taylor Brandstetter3a034e12020-07-09 22:32:34362 RTC_DCHECK_RUN_ON(signaling_thread_);
Tommi00264ca2023-03-14 12:21:06363 if (state_ != kClosing && state_ != kClosed) {
Taylor Brandstetter3a034e12020-07-09 22:32:34364 // Don't bother sending queued data since the side that initiated the
365 // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
366 // discussion about this.
367 queued_send_data_.Clear();
368 queued_control_data_.Clear();
369 // Just need to change state to kClosing, SctpTransport will handle the
370 // rest of the closing procedure and OnClosingProcedureComplete will be
371 // called later.
372 started_closing_procedure_ = true;
373 SetState(kClosing);
374 }
375}
376
Tommi51edb562023-03-14 08:23:51377void SctpDataChannel::OnClosingProcedureComplete() {
Taylor Brandstetter3a034e12020-07-09 22:32:34378 RTC_DCHECK_RUN_ON(signaling_thread_);
Tommi51edb562023-03-14 08:23:51379 // If the closing procedure is complete, we should have finished sending
380 // all pending data and transitioned to kClosing already.
381 RTC_DCHECK_EQ(state_, kClosing);
382 RTC_DCHECK(queued_send_data_.Empty());
Tommi51edb562023-03-14 08:23:51383 SetState(kClosed);
Taylor Brandstetter3a034e12020-07-09 22:32:34384}
385
386void SctpDataChannel::OnTransportChannelCreated() {
387 RTC_DCHECK_RUN_ON(signaling_thread_);
Tommie9aa8672023-03-20 13:43:09388 RTC_DCHECK(controller_);
389
390 connected_to_transport_ = true;
391
Harald Alvestrand9e5aeb92022-05-11 09:35:36392 // The sid may have been unassigned when controller_->ConnectDataChannel was
393 // done. So always add the streams even if connected_to_transport_ is true.
Tommi55f72802023-03-27 10:39:33394 if (id_.HasValue() && connected_to_transport_) {
395 network_thread_->BlockingCall(
396 [c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
Taylor Brandstetter3a034e12020-07-09 22:32:34397 }
398}
399
Florent Castellidcb9ffc2021-06-29 12:58:23400void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
Tommi492296c2023-03-12 15:59:25401 // The SctpTransport is unusable, which could come from multiple reasons:
Florent Castellidcb9ffc2021-06-29 12:58:23402 // - the SCTP m= section was rejected
403 // - the DTLS transport is closed
404 // - the SCTP transport is closed
Taylor Brandstetter3a034e12020-07-09 22:32:34405 CloseAbruptlyWithError(std::move(error));
406}
407
408DataChannelStats SctpDataChannel::GetStats() const {
409 RTC_DCHECK_RUN_ON(signaling_thread_);
410 DataChannelStats stats{internal_id_, id(), label(),
411 protocol(), state(), messages_sent(),
412 messages_received(), bytes_sent(), bytes_received()};
413 return stats;
414}
415
Tommi4e1c9572023-03-15 11:36:20416void SctpDataChannel::OnDataReceived(DataMessageType type,
Taylor Brandstetter3a034e12020-07-09 22:32:34417 const rtc::CopyOnWriteBuffer& payload) {
418 RTC_DCHECK_RUN_ON(signaling_thread_);
Taylor Brandstetter3a034e12020-07-09 22:32:34419
Tommi4e1c9572023-03-15 11:36:20420 if (type == DataMessageType::kControl) {
Taylor Brandstetter3a034e12020-07-09 22:32:34421 if (handshake_state_ != kHandshakeWaitingForAck) {
422 // Ignore it if we are not expecting an ACK message.
423 RTC_LOG(LS_WARNING)
424 << "DataChannel received unexpected CONTROL message, sid = "
Tommi4e1c9572023-03-15 11:36:20425 << id_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34426 return;
427 }
428 if (ParseDataChannelOpenAckMessage(payload)) {
429 // We can send unordered as soon as we receive the ACK message.
430 handshake_state_ = kHandshakeReady;
431 RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
Tommi4e1c9572023-03-15 11:36:20432 << id_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34433 } else {
434 RTC_LOG(LS_WARNING)
435 << "DataChannel failed to parse OPEN_ACK message, sid = "
Tommi4e1c9572023-03-15 11:36:20436 << id_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34437 }
438 return;
439 }
440
Tommi4e1c9572023-03-15 11:36:20441 RTC_DCHECK(type == DataMessageType::kBinary ||
442 type == DataMessageType::kText);
Taylor Brandstetter3a034e12020-07-09 22:32:34443
Tommi934a88a2023-03-15 13:34:56444 RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
445 << id_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34446 // We can send unordered as soon as we receive any DATA message since the
447 // remote side must have received the OPEN (and old clients do not send
448 // OPEN_ACK).
449 if (handshake_state_ == kHandshakeWaitingForAck) {
450 handshake_state_ = kHandshakeReady;
451 }
452
Tommi4e1c9572023-03-15 11:36:20453 bool binary = (type == DataMessageType::kBinary);
Taylor Brandstetter3a034e12020-07-09 22:32:34454 auto buffer = std::make_unique<DataBuffer>(payload, binary);
455 if (state_ == kOpen && observer_) {
456 ++messages_received_;
457 bytes_received_ += buffer->size();
458 observer_->OnMessage(*buffer.get());
459 } else {
460 if (queued_received_data_.byte_count() + payload.size() >
461 kMaxQueuedReceivedDataBytes) {
462 RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
463
464 queued_received_data_.Clear();
465 CloseAbruptlyWithError(
466 RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
467 "Queued received data exceeds the max buffer size."));
468
469 return;
470 }
471 queued_received_data_.PushBack(std::move(buffer));
472 }
473}
474
Tommie9aa8672023-03-20 13:43:09475void SctpDataChannel::OnTransportReady() {
Taylor Brandstetter3a034e12020-07-09 22:32:34476 RTC_DCHECK_RUN_ON(signaling_thread_);
477
Tommie9aa8672023-03-20 13:43:09478 // TODO(tommi, hta): We don't need the `writable_` flag for SCTP datachannels.
479 // Remove it and just rely on `connected_to_transport_` instead.
480 // In practice the transport is configured inside
481 // `PeerConnection::SetupDataChannelTransport_n`, which results in
482 // `SctpDataChannel` getting the OnTransportChannelCreated callback, and then
483 // that's immediately followed by calling `transport->SetDataSink` which is
484 // what triggers the callback to `OnTransportReady()`.
485 // These steps are currently accomplished via two separate PostTask calls to
486 // the signaling thread, but could simply be done in single method call on
487 // the network thread (which incidentally is the thread that we'll need to
488 // be on for the below `Send*` calls, which currently do a BlockingCall
489 // from the signaling thread to the network thread.
490 RTC_DCHECK(connected_to_transport_);
491 writable_ = true;
Taylor Brandstetter3a034e12020-07-09 22:32:34492
493 SendQueuedControlMessages();
494 SendQueuedDataMessages();
495
496 UpdateState();
497}
498
499void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
500 RTC_DCHECK_RUN_ON(signaling_thread_);
501
502 if (state_ == kClosed) {
503 return;
504 }
505
Tommie9aa8672023-03-20 13:43:09506 connected_to_transport_ = false;
Taylor Brandstetter3a034e12020-07-09 22:32:34507
508 // Closing abruptly means any queued data gets thrown away.
Taylor Brandstetter3a034e12020-07-09 22:32:34509 queued_send_data_.Clear();
510 queued_control_data_.Clear();
511
512 // Still go to "kClosing" before "kClosed", since observers may be expecting
513 // that.
514 SetState(kClosing);
515 error_ = std::move(error);
516 SetState(kClosed);
517}
518
519void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
520 const std::string& message) {
521 RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
522 error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
523 CloseAbruptlyWithError(std::move(error));
524}
525
526void SctpDataChannel::UpdateState() {
527 RTC_DCHECK_RUN_ON(signaling_thread_);
528 // UpdateState determines what to do from a few state variables. Include
529 // all conditions required for each state transition here for
530 // clarity. OnTransportReady(true) will send any queued data and then invoke
531 // UpdateState().
532
533 switch (state_) {
534 case kConnecting: {
Harald Alvestrand9e5aeb92022-05-11 09:35:36535 if (connected_to_transport_) {
Taylor Brandstetter3a034e12020-07-09 22:32:34536 if (handshake_state_ == kHandshakeShouldSendOpen) {
537 rtc::CopyOnWriteBuffer payload;
Tommi492296c2023-03-12 15:59:25538 WriteDataChannelOpenMessage(label_, protocol_, priority_, ordered_,
539 max_retransmits_, max_retransmit_time_,
540 &payload);
Taylor Brandstetter3a034e12020-07-09 22:32:34541 SendControlMessage(payload);
542 } else if (handshake_state_ == kHandshakeShouldSendAck) {
543 rtc::CopyOnWriteBuffer payload;
544 WriteDataChannelOpenAckMessage(&payload);
545 SendControlMessage(payload);
546 }
547 if (writable_ && (handshake_state_ == kHandshakeReady ||
548 handshake_state_ == kHandshakeWaitingForAck)) {
549 SetState(kOpen);
550 // If we have received buffers before the channel got writable.
551 // Deliver them now.
552 DeliverQueuedReceivedData();
553 }
Tommie9aa8672023-03-20 13:43:09554 } else {
555 RTC_DCHECK(!id_.HasValue());
Taylor Brandstetter3a034e12020-07-09 22:32:34556 }
557 break;
558 }
559 case kOpen: {
560 break;
561 }
562 case kClosing: {
Tommif21354c2023-03-07 07:43:24563 if (connected_to_transport_) {
564 // Wait for all queued data to be sent before beginning the closing
565 // procedure.
566 if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
567 // For SCTP data channels, we need to wait for the closing procedure
568 // to complete; after calling RemoveSctpDataStream,
569 // OnClosingProcedureComplete will end up called asynchronously
570 // afterwards.
Tommi492296c2023-03-12 15:59:25571 if (!started_closing_procedure_ && controller_ && id_.HasValue()) {
Tommif21354c2023-03-07 07:43:24572 started_closing_procedure_ = true;
Tommi55f72802023-03-27 10:39:33573 network_thread_->BlockingCall([c = controller_.get(), sid = id_] {
574 c->RemoveSctpDataStream(sid);
575 });
Tommif21354c2023-03-07 07:43:24576 }
Taylor Brandstetter3a034e12020-07-09 22:32:34577 }
Tommif21354c2023-03-07 07:43:24578 } else {
579 // When we're not connected to a transport, we'll transition
580 // directly to the `kClosed` state from here.
581 queued_send_data_.Clear();
582 queued_control_data_.Clear();
583 SetState(kClosed);
Taylor Brandstetter3a034e12020-07-09 22:32:34584 }
585 break;
586 }
587 case kClosed:
588 break;
589 }
590}
591
592void SctpDataChannel::SetState(DataState state) {
593 RTC_DCHECK_RUN_ON(signaling_thread_);
594 if (state_ == state) {
595 return;
596 }
597
598 state_ = state;
599 if (observer_) {
600 observer_->OnStateChange();
601 }
Tommid2afbaf2023-03-02 09:51:16602
Tommi1c0d91f2023-03-02 14:42:06603 if (controller_)
Tommid2afbaf2023-03-02 09:51:16604 controller_->OnChannelStateChanged(this, state_);
Taylor Brandstetter3a034e12020-07-09 22:32:34605}
606
Taylor Brandstetter3a034e12020-07-09 22:32:34607void SctpDataChannel::DeliverQueuedReceivedData() {
608 RTC_DCHECK_RUN_ON(signaling_thread_);
609 if (!observer_) {
610 return;
611 }
612
613 while (!queued_received_data_.Empty()) {
614 std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
615 ++messages_received_;
616 bytes_received_ += buffer->size();
617 observer_->OnMessage(*buffer);
618 }
619}
620
621void SctpDataChannel::SendQueuedDataMessages() {
622 RTC_DCHECK_RUN_ON(signaling_thread_);
623 if (queued_send_data_.Empty()) {
624 return;
625 }
626
627 RTC_DCHECK(state_ == kOpen || state_ == kClosing);
628
629 while (!queued_send_data_.Empty()) {
630 std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
631 if (!SendDataMessage(*buffer, false)) {
632 // Return the message to the front of the queue if sending is aborted.
633 queued_send_data_.PushFront(std::move(buffer));
634 break;
635 }
636 }
637}
638
639bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
640 bool queue_if_blocked) {
641 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castellid95b1492021-05-10 09:29:56642 SendDataParams send_params;
Tommi1c0d91f2023-03-02 14:42:06643 if (!controller_) {
Harald Alvestrand9e5aeb92022-05-11 09:35:36644 return false;
645 }
Taylor Brandstetter3a034e12020-07-09 22:32:34646
Tommi492296c2023-03-12 15:59:25647 send_params.ordered = ordered_;
Taylor Brandstetter3a034e12020-07-09 22:32:34648 // Send as ordered if it is still going through OPEN/ACK signaling.
Tommi492296c2023-03-12 15:59:25649 if (handshake_state_ != kHandshakeReady && !ordered_) {
Taylor Brandstetter3a034e12020-07-09 22:32:34650 send_params.ordered = true;
Tommi934a88a2023-03-15 13:34:56651 RTC_DLOG(LS_VERBOSE)
Taylor Brandstetter3a034e12020-07-09 22:32:34652 << "Sending data as ordered for unordered DataChannel "
653 "because the OPEN_ACK message has not been received.";
654 }
655
Tommi492296c2023-03-12 15:59:25656 send_params.max_rtx_count = max_retransmits_;
657 send_params.max_rtx_ms = max_retransmit_time_;
Florent Castellid95b1492021-05-10 09:29:56658 send_params.type =
659 buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
Taylor Brandstetter3a034e12020-07-09 22:32:34660
Tommi1fabbac2023-03-21 13:48:51661 RTCError error = controller_->SendData(id_, send_params, buffer.data);
Taylor Brandstetter3a034e12020-07-09 22:32:34662
Tommi1fabbac2023-03-21 13:48:51663 if (error.ok()) {
Taylor Brandstetter3a034e12020-07-09 22:32:34664 ++messages_sent_;
665 bytes_sent_ += buffer.size();
666
Taylor Brandstetter3a034e12020-07-09 22:32:34667 if (observer_ && buffer.size() > 0) {
668 observer_->OnBufferedAmountChange(buffer.size());
669 }
670 return true;
671 }
672
Tommi1fabbac2023-03-21 13:48:51673 if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
Taylor Brandstetter3a034e12020-07-09 22:32:34674 if (!queue_if_blocked || QueueSendDataMessage(buffer)) {
675 return false;
676 }
677 }
678 // Close the channel if the error is not SDR_BLOCK, or if queuing the
679 // message failed.
680 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
681 "send_result = "
Tommi1fabbac2023-03-21 13:48:51682 << ToString(error.type()) << ":" << error.message();
Taylor Brandstetter3a034e12020-07-09 22:32:34683 CloseAbruptlyWithError(
684 RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
685
686 return false;
687}
688
689bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
690 RTC_DCHECK_RUN_ON(signaling_thread_);
691 size_t start_buffered_amount = queued_send_data_.byte_count();
Florent Castellia563a2a2021-10-18 09:46:21692 if (start_buffered_amount + buffer.size() >
693 DataChannelInterface::MaxSendQueueSize()) {
Taylor Brandstetter3a034e12020-07-09 22:32:34694 RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
695 return false;
696 }
697 queued_send_data_.PushBack(std::make_unique<DataBuffer>(buffer));
698 return true;
699}
700
701void SctpDataChannel::SendQueuedControlMessages() {
702 RTC_DCHECK_RUN_ON(signaling_thread_);
703 PacketQueue control_packets;
704 control_packets.Swap(&queued_control_data_);
705
706 while (!control_packets.Empty()) {
707 std::unique_ptr<DataBuffer> buf = control_packets.PopFront();
708 SendControlMessage(buf->data);
709 }
710}
711
712void SctpDataChannel::QueueControlMessage(
713 const rtc::CopyOnWriteBuffer& buffer) {
714 RTC_DCHECK_RUN_ON(signaling_thread_);
715 queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
716}
717
718bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
719 RTC_DCHECK_RUN_ON(signaling_thread_);
720 RTC_DCHECK(writable_);
Tommie9aa8672023-03-20 13:43:09721 RTC_DCHECK(connected_to_transport_);
Tommi492296c2023-03-12 15:59:25722 RTC_DCHECK(id_.HasValue());
Taylor Brandstetter3a034e12020-07-09 22:32:34723
Tommi1c0d91f2023-03-02 14:42:06724 if (!controller_) {
Harald Alvestrand9e5aeb92022-05-11 09:35:36725 return false;
726 }
Taylor Brandstetter3a034e12020-07-09 22:32:34727 bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
Tommi492296c2023-03-12 15:59:25728 RTC_DCHECK(!is_open_message || !negotiated_);
Taylor Brandstetter3a034e12020-07-09 22:32:34729
Florent Castellid95b1492021-05-10 09:29:56730 SendDataParams send_params;
Taylor Brandstetter3a034e12020-07-09 22:32:34731 // Send data as ordered before we receive any message from the remote peer to
732 // make sure the remote peer will not receive any data before it receives the
733 // OPEN message.
Tommi492296c2023-03-12 15:59:25734 send_params.ordered = ordered_ || is_open_message;
Florent Castellid95b1492021-05-10 09:29:56735 send_params.type = DataMessageType::kControl;
Taylor Brandstetter3a034e12020-07-09 22:32:34736
Tommi1fabbac2023-03-21 13:48:51737 RTCError err = controller_->SendData(id_, send_params, buffer);
738 if (err.ok()) {
Tommi934a88a2023-03-15 13:34:56739 RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
740 << id_.stream_id_int();
Taylor Brandstetter3a034e12020-07-09 22:32:34741
742 if (handshake_state_ == kHandshakeShouldSendAck) {
743 handshake_state_ = kHandshakeReady;
744 } else if (handshake_state_ == kHandshakeShouldSendOpen) {
745 handshake_state_ = kHandshakeWaitingForAck;
746 }
Tommi1fabbac2023-03-21 13:48:51747 } else if (err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
Taylor Brandstetter3a034e12020-07-09 22:32:34748 QueueControlMessage(buffer);
749 } else {
750 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
751 " the CONTROL message, send_result = "
Tommi1fabbac2023-03-21 13:48:51752 << ToString(err.type());
753 err.set_message("Failed to send a CONTROL message");
754 CloseAbruptlyWithError(err);
Taylor Brandstetter3a034e12020-07-09 22:32:34755 }
Tommi1fabbac2023-03-21 13:48:51756 return err.ok();
Taylor Brandstetter3a034e12020-07-09 22:32:34757}
758
759// static
760void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
761 g_unique_id = new_value;
762}
763
Harald Alvestrand9e5aeb92022-05-11 09:35:36764SctpDataChannel* DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
765 DataChannelInterface* channel) {
766 return static_cast<SctpDataChannel*>(
767 static_cast<DataChannelProxy*>(channel)->internal());
768}
769
Taylor Brandstetter3a034e12020-07-09 22:32:34770} // namespace webrtc