| /* |
| * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| #include "net/dcsctp/socket/stream_reset_handler.h" |
| |
| #include <cstdint> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/types/optional.h" |
| #include "api/array_view.h" |
| #include "net/dcsctp/common/internal_types.h" |
| #include "net/dcsctp/common/str_join.h" |
| #include "net/dcsctp/packet/chunk/reconfig_chunk.h" |
| #include "net/dcsctp/packet/parameter/add_incoming_streams_request_parameter.h" |
| #include "net/dcsctp/packet/parameter/add_outgoing_streams_request_parameter.h" |
| #include "net/dcsctp/packet/parameter/incoming_ssn_reset_request_parameter.h" |
| #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" |
| #include "net/dcsctp/packet/parameter/parameter.h" |
| #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" |
| #include "net/dcsctp/packet/parameter/ssn_tsn_reset_request_parameter.h" |
| #include "net/dcsctp/packet/sctp_packet.h" |
| #include "net/dcsctp/packet/tlv_trait.h" |
| #include "net/dcsctp/public/dcsctp_socket.h" |
| #include "net/dcsctp/rx/data_tracker.h" |
| #include "net/dcsctp/rx/reassembly_queue.h" |
| #include "net/dcsctp/socket/context.h" |
| #include "net/dcsctp/timer/timer.h" |
| #include "net/dcsctp/tx/retransmission_queue.h" |
| #include "rtc_base/logging.h" |
| |
| namespace dcsctp { |
| namespace { |
| using ResponseResult = ReconfigurationResponseParameter::Result; |
| |
| bool DescriptorsAre(const std::vector<ParameterDescriptor>& c, |
| uint16_t e1, |
| uint16_t e2) { |
| return (c[0].type == e1 && c[1].type == e2) || |
| (c[0].type == e2 && c[1].type == e1); |
| } |
| |
| } // namespace |
| |
| bool StreamResetHandler::Validate(const ReConfigChunk& chunk) { |
| const Parameters& parameters = chunk.parameters(); |
| |
| // https://tools.ietf.org/html/rfc6525#section-3.1 |
| // "Note that each RE-CONFIG chunk holds at least one parameter |
| // and at most two parameters. Only the following combinations are allowed:" |
| std::vector<ParameterDescriptor> descriptors = parameters.descriptors(); |
| if (descriptors.size() == 1) { |
| if ((descriptors[0].type == OutgoingSSNResetRequestParameter::kType) || |
| (descriptors[0].type == IncomingSSNResetRequestParameter::kType) || |
| (descriptors[0].type == SSNTSNResetRequestParameter::kType) || |
| (descriptors[0].type == AddOutgoingStreamsRequestParameter::kType) || |
| (descriptors[0].type == AddIncomingStreamsRequestParameter::kType) || |
| (descriptors[0].type == ReconfigurationResponseParameter::kType)) { |
| return true; |
| } |
| } else if (descriptors.size() == 2) { |
| if (DescriptorsAre(descriptors, OutgoingSSNResetRequestParameter::kType, |
| IncomingSSNResetRequestParameter::kType) || |
| DescriptorsAre(descriptors, AddOutgoingStreamsRequestParameter::kType, |
| AddIncomingStreamsRequestParameter::kType) || |
| DescriptorsAre(descriptors, ReconfigurationResponseParameter::kType, |
| OutgoingSSNResetRequestParameter::kType) || |
| DescriptorsAre(descriptors, ReconfigurationResponseParameter::kType, |
| ReconfigurationResponseParameter::kType)) { |
| return true; |
| } |
| } |
| |
| RTC_LOG(LS_WARNING) << "Invalid set of RE-CONFIG parameters"; |
| return false; |
| } |
| |
| absl::optional<std::vector<ReconfigurationResponseParameter>> |
| StreamResetHandler::Process(const ReConfigChunk& chunk) { |
| if (!Validate(chunk)) { |
| return absl::nullopt; |
| } |
| |
| std::vector<ReconfigurationResponseParameter> responses; |
| |
| for (const ParameterDescriptor& desc : chunk.parameters().descriptors()) { |
| switch (desc.type) { |
| case OutgoingSSNResetRequestParameter::kType: |
| HandleResetOutgoing(desc, responses); |
| break; |
| |
| case IncomingSSNResetRequestParameter::kType: |
| HandleResetIncoming(desc, responses); |
| break; |
| |
| case ReconfigurationResponseParameter::kType: |
| HandleResponse(desc); |
| break; |
| } |
| } |
| |
| return responses; |
| } |
| |
| void StreamResetHandler::HandleReConfig(ReConfigChunk chunk) { |
| absl::optional<std::vector<ReconfigurationResponseParameter>> responses = |
| Process(chunk); |
| |
| if (!responses.has_value()) { |
| ctx_->callbacks().OnError(ErrorKind::kParseFailed, |
| "Failed to parse RE-CONFIG command"); |
| return; |
| } |
| |
| if (!responses->empty()) { |
| SctpPacket::Builder b = ctx_->PacketBuilder(); |
| Parameters::Builder params_builder; |
| for (const auto& response : *responses) { |
| params_builder.Add(response); |
| } |
| b.Add(ReConfigChunk(params_builder.Build())); |
| ctx_->Send(b); |
| } |
| } |
| |
| bool StreamResetHandler::ValidateReqSeqNbr( |
| ReconfigRequestSN req_seq_nbr, |
| std::vector<ReconfigurationResponseParameter>& responses) { |
| if (req_seq_nbr == last_processed_req_seq_nbr_) { |
| // https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.1 "If the |
| // received RE-CONFIG chunk contains at least one request and based on the |
| // analysis of the Re-configuration Request Sequence Numbers this is the |
| // last received RE-CONFIG chunk (i.e., a retransmission), the same |
| // RE-CONFIG chunk MUST to be sent back in response, as it was earlier." |
| RTC_DLOG(LS_VERBOSE) << log_prefix_ << "req=" << *req_seq_nbr |
| << " already processed, returning result=" |
| << ToString(last_processed_req_result_); |
| responses.push_back(ReconfigurationResponseParameter( |
| req_seq_nbr, last_processed_req_result_)); |
| return false; |
| } |
| |
| if (req_seq_nbr != ReconfigRequestSN(*last_processed_req_seq_nbr_ + 1)) { |
| // Too old, too new, from wrong association etc. |
| // This is expected to happen when handing over a RTCPeerConnection from one |
| // server to another. The client will notice this and may decide to close |
| // old data channels, which may be sent to the wrong (or both) servers |
| // during a handover. |
| RTC_DLOG(LS_VERBOSE) << log_prefix_ << "req=" << *req_seq_nbr |
| << " bad seq_nbr"; |
| responses.push_back(ReconfigurationResponseParameter( |
| req_seq_nbr, ResponseResult::kErrorBadSequenceNumber)); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| void StreamResetHandler::HandleResetOutgoing( |
| const ParameterDescriptor& descriptor, |
| std::vector<ReconfigurationResponseParameter>& responses) { |
| absl::optional<OutgoingSSNResetRequestParameter> req = |
| OutgoingSSNResetRequestParameter::Parse(descriptor.data); |
| if (!req.has_value()) { |
| ctx_->callbacks().OnError(ErrorKind::kParseFailed, |
| "Failed to parse Outgoing Reset command"); |
| return; |
| } |
| |
| if (ValidateReqSeqNbr(req->request_sequence_number(), responses)) { |
| RTC_DLOG(LS_VERBOSE) << log_prefix_ |
| << "Reset outgoing streams with req_seq_nbr=" |
| << *req->request_sequence_number(); |
| |
| last_processed_req_seq_nbr_ = req->request_sequence_number(); |
| last_processed_req_result_ = reassembly_queue_->ResetStreams( |
| *req, data_tracker_->last_cumulative_acked_tsn()); |
| if (last_processed_req_result_ == ResponseResult::kSuccessPerformed) { |
| ctx_->callbacks().OnIncomingStreamsReset(req->stream_ids()); |
| } |
| responses.push_back(ReconfigurationResponseParameter( |
| req->request_sequence_number(), last_processed_req_result_)); |
| } |
| } |
| |
| void StreamResetHandler::HandleResetIncoming( |
| const ParameterDescriptor& descriptor, |
| std::vector<ReconfigurationResponseParameter>& responses) { |
| absl::optional<IncomingSSNResetRequestParameter> req = |
| IncomingSSNResetRequestParameter::Parse(descriptor.data); |
| if (!req.has_value()) { |
| ctx_->callbacks().OnError(ErrorKind::kParseFailed, |
| "Failed to parse Incoming Reset command"); |
| return; |
| } |
| if (ValidateReqSeqNbr(req->request_sequence_number(), responses)) { |
| responses.push_back(ReconfigurationResponseParameter( |
| req->request_sequence_number(), ResponseResult::kSuccessNothingToDo)); |
| last_processed_req_seq_nbr_ = req->request_sequence_number(); |
| } |
| } |
| |
| void StreamResetHandler::HandleResponse(const ParameterDescriptor& descriptor) { |
| absl::optional<ReconfigurationResponseParameter> resp = |
| ReconfigurationResponseParameter::Parse(descriptor.data); |
| if (!resp.has_value()) { |
| ctx_->callbacks().OnError( |
| ErrorKind::kParseFailed, |
| "Failed to parse Reconfiguration Response command"); |
| return; |
| } |
| |
| if (current_request_.has_value() && current_request_->has_been_sent() && |
| resp->response_sequence_number() == current_request_->req_seq_nbr()) { |
| reconfig_timer_->Stop(); |
| |
| switch (resp->result()) { |
| case ResponseResult::kSuccessNothingToDo: |
| case ResponseResult::kSuccessPerformed: |
| RTC_DLOG(LS_VERBOSE) |
| << log_prefix_ << "Reset stream success, req_seq_nbr=" |
| << *current_request_->req_seq_nbr() << ", streams=" |
| << StrJoin(current_request_->streams(), ",", |
| [](rtc::StringBuilder& sb, StreamID stream_id) { |
| sb << *stream_id; |
| }); |
| ctx_->callbacks().OnStreamsResetPerformed(current_request_->streams()); |
| current_request_ = absl::nullopt; |
| retransmission_queue_->CommitResetStreams(); |
| break; |
| case ResponseResult::kInProgress: |
| RTC_DLOG(LS_VERBOSE) |
| << log_prefix_ << "Reset stream still pending, req_seq_nbr=" |
| << *current_request_->req_seq_nbr() << ", streams=" |
| << StrJoin(current_request_->streams(), ",", |
| [](rtc::StringBuilder& sb, StreamID stream_id) { |
| sb << *stream_id; |
| }); |
| // Force this request to be sent again, but with new req_seq_nbr. |
| current_request_->PrepareRetransmission(); |
| reconfig_timer_->set_duration(ctx_->current_rto()); |
| reconfig_timer_->Start(); |
| break; |
| case ResponseResult::kErrorRequestAlreadyInProgress: |
| case ResponseResult::kDenied: |
| case ResponseResult::kErrorWrongSSN: |
| case ResponseResult::kErrorBadSequenceNumber: |
| RTC_DLOG(LS_WARNING) |
| << log_prefix_ << "Reset stream error=" << ToString(resp->result()) |
| << ", req_seq_nbr=" << *current_request_->req_seq_nbr() |
| << ", streams=" |
| << StrJoin(current_request_->streams(), ",", |
| [](rtc::StringBuilder& sb, StreamID stream_id) { |
| sb << *stream_id; |
| }); |
| ctx_->callbacks().OnStreamsResetFailed(current_request_->streams(), |
| ToString(resp->result())); |
| current_request_ = absl::nullopt; |
| retransmission_queue_->RollbackResetStreams(); |
| break; |
| } |
| } |
| } |
| |
| absl::optional<ReConfigChunk> StreamResetHandler::MakeStreamResetRequest() { |
| // Only send stream resets if there are streams to reset, and no current |
| // ongoing request (there can only be one at a time), and if the stream |
| // can be reset. |
| if (current_request_.has_value() || |
| !retransmission_queue_->HasStreamsReadyToBeReset()) { |
| return absl::nullopt; |
| } |
| |
| current_request_.emplace(TSN(*retransmission_queue_->next_tsn() - 1), |
| retransmission_queue_->GetStreamsReadyToBeReset()); |
| reconfig_timer_->set_duration(ctx_->current_rto()); |
| reconfig_timer_->Start(); |
| return MakeReconfigChunk(); |
| } |
| |
| ReConfigChunk StreamResetHandler::MakeReconfigChunk() { |
| // The req_seq_nbr will be empty if the request has never been sent before, |
| // or if it was sent, but the sender responded "in progress", and then the |
| // req_seq_nbr will be cleared to re-send with a new number. But if the |
| // request is re-sent due to timeout (reconfig-timer expiring), the same |
| // req_seq_nbr will be used. |
| RTC_DCHECK(current_request_.has_value()); |
| |
| if (!current_request_->has_been_sent()) { |
| current_request_->PrepareToSend(next_outgoing_req_seq_nbr_); |
| next_outgoing_req_seq_nbr_ = |
| ReconfigRequestSN(*next_outgoing_req_seq_nbr_ + 1); |
| } |
| |
| Parameters::Builder params_builder = |
| Parameters::Builder().Add(OutgoingSSNResetRequestParameter( |
| current_request_->req_seq_nbr(), current_request_->req_seq_nbr(), |
| current_request_->sender_last_assigned_tsn(), |
| current_request_->streams())); |
| |
| return ReConfigChunk(params_builder.Build()); |
| } |
| |
| void StreamResetHandler::ResetStreams( |
| rtc::ArrayView<const StreamID> outgoing_streams) { |
| for (StreamID stream_id : outgoing_streams) { |
| retransmission_queue_->PrepareResetStream(stream_id); |
| } |
| } |
| |
| absl::optional<DurationMs> StreamResetHandler::OnReconfigTimerExpiry() { |
| if (current_request_->has_been_sent()) { |
| // There is an outstanding request, which timed out while waiting for a |
| // response. |
| if (!ctx_->IncrementTxErrorCounter("RECONFIG timeout")) { |
| // Timed out. The connection will close after processing the timers. |
| return absl::nullopt; |
| } |
| } else { |
| // There is no outstanding request, but there is a prepared one. This means |
| // that the receiver has previously responded "in progress", which resulted |
| // in retrying the request (but with a new req_seq_nbr) after a while. |
| } |
| |
| ctx_->Send(ctx_->PacketBuilder().Add(MakeReconfigChunk())); |
| return ctx_->current_rto(); |
| } |
| |
| HandoverReadinessStatus StreamResetHandler::GetHandoverReadiness() const { |
| HandoverReadinessStatus status; |
| if (retransmission_queue_->HasStreamsReadyToBeReset()) { |
| status.Add(HandoverUnreadinessReason::kPendingStreamReset); |
| } |
| if (current_request_.has_value()) { |
| status.Add(HandoverUnreadinessReason::kPendingStreamResetRequest); |
| } |
| return status; |
| } |
| |
| void StreamResetHandler::AddHandoverState(DcSctpSocketHandoverState& state) { |
| state.rx.last_completed_reset_req_sn = last_processed_req_seq_nbr_.value(); |
| state.tx.next_reset_req_sn = next_outgoing_req_seq_nbr_.value(); |
| } |
| |
| } // namespace dcsctp |