/*
 *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "pc/data_channel_controller.h"

#include <utility>

#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/peer_connection_interface.h"
#include "api/rtc_error.h"
#include "pc/peer_connection_internal.h"
#include "pc/sctp_utils.h"
#include "rtc_base/logging.h"

namespace webrtc {

DataChannelController::~DataChannelController() {
  RTC_DCHECK(sctp_data_channels_n_.empty())
      << "Missing call to TeardownDataChannelTransport_n?";
  RTC_DCHECK(!signaling_safety_.flag()->alive())
      << "Missing call to PrepareForShutdown?";
}

bool DataChannelController::HasDataChannels() const {
  RTC_DCHECK_RUN_ON(signaling_thread());
  return channel_usage_ == DataChannelUsage::kInUse;
}

bool DataChannelController::HasUsedDataChannels() const {
  RTC_DCHECK_RUN_ON(signaling_thread());
  return channel_usage_ != DataChannelUsage::kNeverUsed;
}

RTCError DataChannelController::SendData(
    StreamId sid,
    const SendDataParams& params,
    const rtc::CopyOnWriteBuffer& payload) {
  RTC_DCHECK_RUN_ON(network_thread());
  if (!data_channel_transport_) {
    RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
    return RTCError(RTCErrorType::INVALID_STATE);
  }
  return data_channel_transport_->SendData(sid.stream_id_int(), params,
                                           payload);
}

void DataChannelController::AddSctpDataStream(StreamId sid) {
  RTC_DCHECK_RUN_ON(network_thread());
  if (data_channel_transport_) {
    data_channel_transport_->OpenChannel(sid.stream_id_int());
  }
}

void DataChannelController::RemoveSctpDataStream(StreamId sid) {
  RTC_DCHECK_RUN_ON(network_thread());
  if (data_channel_transport_) {
    data_channel_transport_->CloseChannel(sid.stream_id_int());
  }
}

void DataChannelController::OnChannelStateChanged(
    SctpDataChannel* channel,
    DataChannelInterface::DataState state) {
  RTC_DCHECK_RUN_ON(network_thread());

  // Stash away the internal id here in case `OnSctpDataChannelClosed` ends up
  // releasing the last reference to the channel.
  const int channel_id = channel->internal_id();

  if (state == DataChannelInterface::DataState::kClosed)
    OnSctpDataChannelClosed(channel);

  DataChannelUsage channel_usage = sctp_data_channels_n_.empty()
                                       ? DataChannelUsage::kHaveBeenUsed
                                       : DataChannelUsage::kInUse;
  signaling_thread()->PostTask(SafeTask(
      signaling_safety_.flag(), [this, channel_id, state, channel_usage] {
        RTC_DCHECK_RUN_ON(signaling_thread());
        channel_usage_ = channel_usage;
        pc_->OnSctpDataChannelStateChanged(channel_id, state);
      }));
}

size_t DataChannelController::buffered_amount(StreamId sid) const {
  RTC_DCHECK_RUN_ON(network_thread());
  if (!data_channel_transport_) {
    return 0;
  }
  return data_channel_transport_->buffered_amount(sid.stream_id_int());
}

size_t DataChannelController::buffered_amount_low_threshold(
    StreamId sid) const {
  RTC_DCHECK_RUN_ON(network_thread());
  if (!data_channel_transport_) {
    return 0;
  }
  return data_channel_transport_->buffered_amount_low_threshold(
      sid.stream_id_int());
}

void DataChannelController::SetBufferedAmountLowThreshold(StreamId sid,
                                                          size_t bytes) {
  RTC_DCHECK_RUN_ON(network_thread());
  if (!data_channel_transport_) {
    return;
  }
  data_channel_transport_->SetBufferedAmountLowThreshold(sid.stream_id_int(),
                                                         bytes);
}

void DataChannelController::OnDataReceived(
    int channel_id,
    DataMessageType type,
    const rtc::CopyOnWriteBuffer& buffer) {
  RTC_DCHECK_RUN_ON(network_thread());

  if (HandleOpenMessage_n(channel_id, type, buffer))
    return;

  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
    return c->sid_n().has_value() && c->sid_n()->stream_id_int() == channel_id;
  });

  if (it != sctp_data_channels_n_.end())
    (*it)->OnDataReceived(type, buffer);
}

void DataChannelController::OnChannelClosing(int channel_id) {
  RTC_DCHECK_RUN_ON(network_thread());
  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
    return c->sid_n().has_value() && c->sid_n()->stream_id_int() == channel_id;
  });

  if (it != sctp_data_channels_n_.end())
    (*it)->OnClosingProcedureStartedRemotely();
}

void DataChannelController::OnChannelClosed(int channel_id) {
  RTC_DCHECK_RUN_ON(network_thread());
  StreamId sid(channel_id);
  sid_allocator_.ReleaseSid(sid);
  auto it = absl::c_find_if(sctp_data_channels_n_,
                            [&](const auto& c) { return c->sid_n() == sid; });

  if (it != sctp_data_channels_n_.end()) {
    rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
    sctp_data_channels_n_.erase(it);
    channel->OnClosingProcedureComplete();
  }
}

void DataChannelController::OnReadyToSend() {
  RTC_DCHECK_RUN_ON(network_thread());
  auto copy = sctp_data_channels_n_;
  for (const auto& channel : copy) {
    if (channel->sid_n().has_value()) {
      channel->OnTransportReady();
    } else {
      // This happens for role==SSL_SERVER channels when we get notified by
      // the transport *before* the SDP code calls `AllocateSctpSids` to
      // trigger assignment of sids. In this case OnTransportReady() will be
      // called from within `AllocateSctpSids` below.
      RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
    }
  }
}

void DataChannelController::OnTransportClosed(RTCError error) {
  RTC_DCHECK_RUN_ON(network_thread());

  // This loop will close all data channels and trigger a callback to
  // `OnSctpDataChannelClosed`. We'll empty `sctp_data_channels_n_`, first
  // and `OnSctpDataChannelClosed` will become a noop but we'll release the
  // StreamId here.
  std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
  temp_sctp_dcs.swap(sctp_data_channels_n_);
  for (const auto& channel : temp_sctp_dcs) {
    channel->OnTransportChannelClosed(error);
    if (channel->sid_n().has_value()) {
      sid_allocator_.ReleaseSid(*channel->sid_n());
    }
  }
}

void DataChannelController::OnBufferedAmountLow(int channel_id) {
  RTC_DCHECK_RUN_ON(network_thread());
  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
    return c->sid_n().has_value() && c->sid_n()->stream_id_int() == channel_id;
  });

  if (it != sctp_data_channels_n_.end())
    (*it)->OnBufferedAmountLow();
}

void DataChannelController::SetupDataChannelTransport_n(
    DataChannelTransportInterface* transport) {
  RTC_DCHECK_RUN_ON(network_thread());
  RTC_DCHECK(transport);
  set_data_channel_transport(transport);
}

void DataChannelController::PrepareForShutdown() {
  RTC_DCHECK_RUN_ON(signaling_thread());
  signaling_safety_.reset(PendingTaskSafetyFlag::CreateDetachedInactive());
  if (channel_usage_ != DataChannelUsage::kNeverUsed)
    channel_usage_ = DataChannelUsage::kHaveBeenUsed;
}

void DataChannelController::TeardownDataChannelTransport_n(RTCError error) {
  RTC_DCHECK_RUN_ON(network_thread());
  OnTransportClosed(error);
  set_data_channel_transport(nullptr);
  RTC_DCHECK(sctp_data_channels_n_.empty());
  weak_factory_.InvalidateWeakPtrs();
}

void DataChannelController::OnTransportChanged(
    DataChannelTransportInterface* new_data_channel_transport) {
  RTC_DCHECK_RUN_ON(network_thread());
  if (data_channel_transport_ &&
      data_channel_transport_ != new_data_channel_transport) {
    // Changed which data channel transport is used for `sctp_mid_` (eg. now
    // it's bundled).
    set_data_channel_transport(new_data_channel_transport);
  }
}

std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
    const {
  RTC_DCHECK_RUN_ON(network_thread());
  std::vector<DataChannelStats> stats;
  stats.reserve(sctp_data_channels_n_.size());
  for (const auto& channel : sctp_data_channels_n_)
    stats.push_back(channel->GetStats());
  return stats;
}

bool DataChannelController::HandleOpenMessage_n(
    int channel_id,
    DataMessageType type,
    const rtc::CopyOnWriteBuffer& buffer) {
  if (type != DataMessageType::kControl || !IsOpenMessage(buffer))
    return false;

  // Received OPEN message; parse and signal that a new data channel should
  // be created.
  std::string label;
  InternalDataChannelInit config;
  config.id = channel_id;
  if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
    RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for sid "
                        << channel_id;
  } else {
    config.open_handshake_role = InternalDataChannelInit::kAcker;
    auto channel_or_error = CreateDataChannel(label, config);
    if (channel_or_error.ok()) {
      signaling_thread()->PostTask(SafeTask(
          signaling_safety_.flag(),
          [this, channel = channel_or_error.MoveValue(),
           ready_to_send = data_channel_transport_->IsReadyToSend()] {
            RTC_DCHECK_RUN_ON(signaling_thread());
            OnDataChannelOpenMessage(std::move(channel), ready_to_send);
          }));
    } else {
      RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
                        << ToString(channel_or_error.error().type());
    }
  }
  return true;
}

void DataChannelController::OnDataChannelOpenMessage(
    rtc::scoped_refptr<SctpDataChannel> channel,
    bool ready_to_send) {
  channel_usage_ = DataChannelUsage::kInUse;
  auto proxy = SctpDataChannel::CreateProxy(channel, signaling_safety_.flag());

  pc_->Observer()->OnDataChannel(proxy);
  pc_->NoteDataAddedEvent();

  if (ready_to_send) {
    network_thread()->PostTask([channel = std::move(channel)] {
      if (channel->state() != DataChannelInterface::DataState::kClosed)
        channel->OnTransportReady();
    });
  }
}

// RTC_RUN_ON(network_thread())
RTCError DataChannelController::ReserveOrAllocateSid(
    absl::optional<StreamId>& sid,
    absl::optional<rtc::SSLRole> fallback_ssl_role) {
  if (sid.has_value()) {
    return sid_allocator_.ReserveSid(*sid)
               ? RTCError::OK()
               : RTCError(RTCErrorType::INVALID_RANGE, "StreamId reserved.");
  }

  // Attempt to allocate an ID based on the negotiated role.
  absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
  if (!role)
    role = fallback_ssl_role;
  if (role) {
    sid = sid_allocator_.AllocateSid(*role);
    if (!sid.has_value())
      return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
  }
  // When we get here, we may still not have an ID, but that's a supported case
  // whereby an id will be assigned later.
  RTC_DCHECK(sid.has_value() || !role);
  return RTCError::OK();
}

// RTC_RUN_ON(network_thread())
RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
DataChannelController::CreateDataChannel(const std::string& label,
                                         InternalDataChannelInit& config) {
  absl::optional<StreamId> sid = absl::nullopt;
  if (config.id != -1) {
    if (config.id < 0 || config.id > cricket::kMaxSctpSid) {
      return RTCError(RTCErrorType::INVALID_RANGE, "StreamId out of range.");
    }
    sid = StreamId(config.id);
  }

  RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
  if (!err.ok())
    return err;

  // In case `sid` has changed. Update `config` accordingly.
  if (sid.has_value()) {
    config.id = sid->stream_id_int();
  }

  rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
      weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
      config, signaling_thread(), network_thread());
  RTC_DCHECK(channel);
  sctp_data_channels_n_.push_back(channel);

  // If we have an id already, notify the transport.
  if (sid.has_value())
    AddSctpDataStream(*sid);

  return channel;
}

RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
DataChannelController::InternalCreateDataChannelWithProxy(
    const std::string& label,
    const InternalDataChannelInit& config) {
  RTC_DCHECK_RUN_ON(signaling_thread());
  RTC_DCHECK(!pc_->IsClosed());
  if (!config.IsValid()) {
    LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER,
                         "Invalid DataChannelInit");
  }

  bool ready_to_send = false;
  InternalDataChannelInit new_config = config;
  auto ret = network_thread()->BlockingCall(
      [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
        RTC_DCHECK_RUN_ON(network_thread());
        auto channel = CreateDataChannel(label, new_config);
        if (!channel.ok())
          return channel;
        ready_to_send =
            data_channel_transport_ && data_channel_transport_->IsReadyToSend();
        if (ready_to_send) {
          // If the transport is ready to send because the initial channel
          // ready signal may have been sent before the DataChannel creation.
          // This has to be done async because the upper layer objects (e.g.
          // Chrome glue and WebKit) are not wired up properly until after
          // `InternalCreateDataChannelWithProxy` returns.
          network_thread()->PostTask([channel = channel.value()] {
            if (channel->state() != DataChannelInterface::DataState::kClosed)
              channel->OnTransportReady();
          });
        }

        return channel;
      });

  if (!ret.ok())
    return ret.MoveError();

  channel_usage_ = DataChannelUsage::kInUse;
  return SctpDataChannel::CreateProxy(ret.MoveValue(),
                                      signaling_safety_.flag());
}

void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
  RTC_DCHECK_RUN_ON(network_thread());

  const bool ready_to_send =
      data_channel_transport_ && data_channel_transport_->IsReadyToSend();

  std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
  std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
  for (auto it = sctp_data_channels_n_.begin();
       it != sctp_data_channels_n_.end();) {
    if (!(*it)->sid_n().has_value()) {
      absl::optional<StreamId> sid = sid_allocator_.AllocateSid(role);
      if (sid.has_value()) {
        (*it)->SetSctpSid_n(*sid);
        AddSctpDataStream(*sid);
        if (ready_to_send) {
          RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
          (*it)->OnTransportReady();
        }
        channels_to_update.push_back(std::make_pair((*it).get(), *sid));
      } else {
        channels_to_close.push_back(std::move(*it));
        it = sctp_data_channels_n_.erase(it);
        continue;
      }
    }
    ++it;
  }

  // Since closing modifies the list of channels, we have to do the actual
  // closing outside the loop.
  for (const auto& channel : channels_to_close) {
    channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
  }
}

void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
  RTC_DCHECK_RUN_ON(network_thread());
  // After the closing procedure is done, it's safe to use this ID for
  // another data channel.
  if (channel->sid_n().has_value()) {
    sid_allocator_.ReleaseSid(*channel->sid_n());
  }
  auto it = absl::c_find_if(sctp_data_channels_n_,
                            [&](const auto& c) { return c.get() == channel; });
  if (it != sctp_data_channels_n_.end())
    sctp_data_channels_n_.erase(it);
}

void DataChannelController::set_data_channel_transport(
    DataChannelTransportInterface* transport) {
  RTC_DCHECK_RUN_ON(network_thread());

  if (data_channel_transport_)
    data_channel_transport_->SetDataSink(nullptr);

  data_channel_transport_ = transport;

  if (data_channel_transport_) {
    // There's a new data channel transport.  This needs to be signaled to the
    // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
    // necessary when bundling is applied.
    NotifyDataChannelsOfTransportCreated();
    data_channel_transport_->SetDataSink(this);
  }
}

void DataChannelController::NotifyDataChannelsOfTransportCreated() {
  RTC_DCHECK_RUN_ON(network_thread());
  RTC_DCHECK(data_channel_transport_);

  for (const auto& channel : sctp_data_channels_n_) {
    if (channel->sid_n().has_value())
      AddSctpDataStream(*channel->sid_n());
    channel->OnTransportChannelCreated();
  }
}

rtc::Thread* DataChannelController::network_thread() const {
  return pc_->network_thread();
}

rtc::Thread* DataChannelController::signaling_thread() const {
  return pc_->signaling_thread();
}

}  // namespace webrtc
