Relanding: Fix data channel message integrity violation

Patch originally submitted by Lennart Grahl:
https://webrtc-review.googlesource.com/c/src/+/177527

SCTP message chunks and notifications are being delivered interleaved.
However, the way the code was structured previously, a notification
would interrupt reassembly of a message chunk and hand out the partial
message, thereby violating message integrity. This patch separates the
handling of notifications and reassembly of messages.

Additional changes:

- Remove illegal cast from non-validated u32 to enum (PPID)
- Drop partial messages if the SID has been changed but EOR not yet
  received instead of delivering them. (This should never happen
  anyway.)
- Don't treat TSN as timestamp (wat)
- Replace "usrsctplib/usrsctp.h" with <usrsctp.h>, allowing a hack
  to be removed from media/BUILD.gn

Bug: webrtc:11708
Change-Id: I29733b03f67a3d840104b8608a7f0083466c2d0f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178469
Commit-Queue: Taylor <deadbeef@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31641}
diff --git a/AUTHORS b/AUTHORS
index 188503e..689220b 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -90,6 +90,7 @@
 Miguel Paris <mparisdiaz@gmail.com>
 Raman Budny <budnyjj@gmail.com>
 Stephan Hartmann <stha09@googlemail.com>
+Lennart Grahl <lennart.grahl@gmail.com>
 
 &yet LLC <*@andyet.com>
 8x8 Inc. <*@sip-communicator.org>
diff --git a/media/BUILD.gn b/media/BUILD.gn
index b6c78fd..f1ead11 100644
--- a/media/BUILD.gn
+++ b/media/BUILD.gn
@@ -429,11 +429,6 @@
   }
 
   if (rtc_enable_sctp && rtc_build_usrsctp) {
-    include_dirs = [
-      # TODO(jiayl): move this into the public_configs of
-      # //third_party/usrsctp/BUILD.gn.
-      "//third_party/usrsctp/usrsctplib",
-    ]
     deps += [ "//third_party/usrsctp" ]
   }
 }
@@ -659,5 +654,9 @@
     if (is_ios) {
       deps += [ ":rtc_media_unittests_bundle_data" ]
     }
+
+    if (rtc_enable_sctp && rtc_build_usrsctp) {
+      deps += [ "//third_party/usrsctp" ]
+    }
   }
 }
diff --git a/media/sctp/sctp_transport.cc b/media/sctp/sctp_transport.cc
index 35824b7..c399dfe 100644
--- a/media/sctp/sctp_transport.cc
+++ b/media/sctp/sctp_transport.cc
@@ -16,10 +16,16 @@
   SCTP_EINPROGRESS = EINPROGRESS,
   SCTP_EWOULDBLOCK = EWOULDBLOCK
 };
+
+// Successful return value from usrsctp callbacks. Is not actually used by
+// usrsctp, but all example programs for usrsctp use 1 as their return value.
+constexpr int kSctpSuccessReturn = 1;
+
 }  // namespace
 
 #include <stdarg.h>
 #include <stdio.h>
+#include <usrsctp.h>
 
 #include <memory>
 #include <unordered_map>
@@ -43,7 +49,6 @@
 #include "rtc_base/thread_annotations.h"
 #include "rtc_base/thread_checker.h"
 #include "rtc_base/trace_event.h"
-#include "usrsctplib/usrsctp.h"
 
 namespace {
 
@@ -62,7 +67,7 @@
 // http://www.iana.org/assignments/sctp-parameters/sctp-parameters.xml
 // The value is not used by SCTP itself. It indicates the protocol running
 // on top of SCTP.
-enum PayloadProtocolIdentifier {
+enum {
   PPID_NONE = 0,  // No protocol is specified.
   // Matches the PPIDs in mozilla source and
   // https://datatracker.ietf.org/doc/draft-ietf-rtcweb-data-protocol Sec. 9
@@ -143,7 +148,7 @@
 }
 
 // Get the PPID to use for the terminating fragment of this type.
-PayloadProtocolIdentifier GetPpid(cricket::DataMessageType type) {
+uint32_t GetPpid(cricket::DataMessageType type) {
   switch (type) {
     default:
     case cricket::DMT_NONE:
@@ -157,8 +162,7 @@
   }
 }
 
-bool GetDataMediaType(PayloadProtocolIdentifier ppid,
-                      cricket::DataMessageType* dest) {
+bool GetDataMediaType(uint32_t ppid, cricket::DataMessageType* dest) {
   RTC_DCHECK(dest != NULL);
   switch (ppid) {
     case PPID_BINARY_PARTIAL:
@@ -382,77 +386,10 @@
                                  int flags,
                                  void* ulp_info) {
     SctpTransport* transport = static_cast<SctpTransport*>(ulp_info);
-    // Post data to the transport's receiver thread (copying it).
-    // TODO(ldixon): Unclear if copy is needed as this method is responsible for
-    // memory cleanup. But this does simplify code.
-    const PayloadProtocolIdentifier ppid =
-        static_cast<PayloadProtocolIdentifier>(
-            rtc::NetworkToHost32(rcv.rcv_ppid));
-    DataMessageType type = DMT_NONE;
-    if (!GetDataMediaType(ppid, &type) && !(flags & MSG_NOTIFICATION)) {
-      // It's neither a notification nor a recognized data packet.  Drop it.
-      RTC_LOG(LS_ERROR) << "Received an unknown PPID " << ppid
-                        << " on an SCTP packet.  Dropping.";
-      free(data);
-    } else {
-      ReceiveDataParams params;
-
-      params.sid = rcv.rcv_sid;
-      params.seq_num = rcv.rcv_ssn;
-      params.timestamp = rcv.rcv_tsn;
-      params.type = type;
-
-      // Expect only continuation messages belonging to the same sid, the sctp
-      // stack should ensure this.
-      if ((transport->partial_incoming_message_.size() != 0) &&
-          (rcv.rcv_sid != transport->partial_params_.sid)) {
-        // A message with a new sid, but haven't seen the EOR for the
-        // previous message. Deliver the previous partial message to avoid
-        // merging messages from different sid's.
-        transport->invoker_.AsyncInvoke<void>(
-            RTC_FROM_HERE, transport->network_thread_,
-            rtc::Bind(&SctpTransport::OnInboundPacketFromSctpToTransport,
-                      transport, transport->partial_incoming_message_,
-                      transport->partial_params_, transport->partial_flags_));
-
-        transport->partial_incoming_message_.Clear();
-      }
-
-      transport->partial_incoming_message_.AppendData(
-          reinterpret_cast<uint8_t*>(data), length);
-      transport->partial_params_ = params;
-      transport->partial_flags_ = flags;
-
-      free(data);
-
-      // Merge partial messages until they exceed the maximum send buffer size.
-      // This enables messages from a single send to be delivered in a single
-      // callback. Larger messages (originating from other implementations) will
-      // still be delivered in chunks.
-      if (!(flags & MSG_EOR) &&
-          (transport->partial_incoming_message_.size() < kSctpSendBufferSize)) {
-        return 1;
-      }
-
-      if (!(flags & MSG_EOR)) {
-        // TODO(bugs.webrtc.org/7774): We currently chunk messages if they are
-        // >= kSctpSendBufferSize. The better thing to do here is buffer up to
-        // the size negotiated in the SDP, and if a larger message is received
-        // close the channel and report the error. See discussion in the bug.
-        RTC_LOG(LS_WARNING) << "Chunking SCTP message without the EOR bit set.";
-      }
-
-      // The ownership of the packet transfers to |invoker_|. Using
-      // CopyOnWriteBuffer is the most convenient way to do this.
-      transport->invoker_.AsyncInvoke<void>(
-          RTC_FROM_HERE, transport->network_thread_,
-          rtc::Bind(&SctpTransport::OnInboundPacketFromSctpToTransport,
-                    transport, transport->partial_incoming_message_, params,
-                    flags));
-
-      transport->partial_incoming_message_.Clear();
-    }
-    return 1;
+    int result =
+        transport->OnDataOrNotificationFromSctp(data, length, rcv, flags);
+    free(data);
+    return result;
   }
 
   static SctpTransport* GetTransportFromSocket(struct socket* sock) {
@@ -1132,31 +1069,120 @@
                          rtc::PacketOptions(), PF_NORMAL);
 }
 
-void SctpTransport::OnInboundPacketFromSctpToTransport(
-    const rtc::CopyOnWriteBuffer& buffer,
-    ReceiveDataParams params,
+int SctpTransport::InjectDataOrNotificationFromSctpForTesting(
+    void* data,
+    size_t length,
+    struct sctp_rcvinfo rcv,
     int flags) {
-  RTC_DCHECK_RUN_ON(network_thread_);
-  RTC_LOG(LS_VERBOSE) << debug_name_
-                      << "->OnInboundPacketFromSctpToTransport(...): "
-                         "Received SCTP data:"
-                         " sid="
-                      << params.sid
-                      << " notification: " << (flags & MSG_NOTIFICATION)
-                      << " length=" << buffer.size();
-  // Sending a packet with data == NULL (no data) is SCTPs "close the
-  // connection" message. This sets sock_ = NULL;
-  if (!buffer.size() || !buffer.data()) {
+  return OnDataOrNotificationFromSctp(data, length, rcv, flags);
+}
+
+int SctpTransport::OnDataOrNotificationFromSctp(void* data,
+                                                size_t length,
+                                                struct sctp_rcvinfo rcv,
+                                                int flags) {
+  // If data is NULL, the SCTP association has been closed.
+  if (!data) {
     RTC_LOG(LS_INFO) << debug_name_
-                     << "->OnInboundPacketFromSctpToTransport(...): "
+                     << "->OnSctpInboundPacket(...): "
                         "No data, closing.";
-    return;
+    return kSctpSuccessReturn;
   }
+
+  // Handle notifications early.
+  // Note: Notifications are never split into chunks, so they can and should
+  //       be handled early and entirely separate from the reassembly
+  //       process.
   if (flags & MSG_NOTIFICATION) {
-    OnNotificationFromSctp(buffer);
-  } else {
-    OnDataFromSctpToTransport(params, buffer);
+    RTC_LOG(LS_VERBOSE) << debug_name_
+                        << "->OnSctpInboundPacket(...): SCTP notification"
+                        << " length=" << length;
+
+    // Copy and dispatch asynchronously
+    rtc::CopyOnWriteBuffer notification(reinterpret_cast<uint8_t*>(data),
+                                        length);
+    invoker_.AsyncInvoke<void>(
+        RTC_FROM_HERE, network_thread_,
+        rtc::Bind(&SctpTransport::OnNotificationFromSctp, this, notification));
+    return kSctpSuccessReturn;
   }
+
+  // Log data chunk
+  const uint32_t ppid = rtc::NetworkToHost32(rcv.rcv_ppid);
+  RTC_LOG(LS_VERBOSE) << debug_name_
+                      << "->OnSctpInboundPacket(...): SCTP data chunk"
+                      << " length=" << length << ", sid=" << rcv.rcv_sid
+                      << ", ppid=" << ppid << ", ssn=" << rcv.rcv_ssn
+                      << ", cum-tsn=" << rcv.rcv_cumtsn
+                      << ", eor=" << ((flags & MSG_EOR) ? "y" : "n");
+
+  // Validate payload protocol identifier
+  DataMessageType type = DMT_NONE;
+  if (!GetDataMediaType(ppid, &type)) {
+    // Unexpected PPID, dropping
+    RTC_LOG(LS_ERROR) << "Received an unknown PPID " << ppid
+                      << " on an SCTP packet.  Dropping.";
+    return kSctpSuccessReturn;
+  }
+
+  // Expect only continuation messages belonging to the same SID. The SCTP
+  // stack is expected to ensure this as long as the User Message
+  // Interleaving extension (RFC 8260) is not explicitly enabled, so this
+  // merely acts as a safeguard.
+  if ((partial_incoming_message_.size() != 0) &&
+      (rcv.rcv_sid != partial_params_.sid)) {
+    RTC_LOG(LS_ERROR) << "Received a new SID without EOR in the previous"
+                      << " SCTP packet. Discarding the previous packet.";
+    partial_incoming_message_.Clear();
+  }
+
+  // Copy metadata of interest
+  ReceiveDataParams params;
+  params.type = type;
+  params.sid = rcv.rcv_sid;
+  // Note that the SSN is identical for each chunk of the same message.
+  // Furthermore, it is increased per stream and not on the whole
+  // association.
+  params.seq_num = rcv.rcv_ssn;
+  // There is no timestamp field in the SCTP API
+  params.timestamp = 0;
+
+  // Append the chunk's data to the message buffer
+  partial_incoming_message_.AppendData(reinterpret_cast<uint8_t*>(data),
+                                       length);
+  partial_params_ = params;
+  partial_flags_ = flags;
+
+  // If the message is not yet complete...
+  if (!(flags & MSG_EOR)) {
+    if (partial_incoming_message_.size() < kSctpSendBufferSize) {
+      // We still have space in the buffer. Continue buffering chunks until
+      // the message is complete before handing it out.
+      return kSctpSuccessReturn;
+    } else {
+      // The sender is exceeding the maximum message size that we announced.
+      // Spit out a warning but still hand out the partial message. Note that
+      // this behaviour is undesirable, see the discussion in issue 7774.
+      //
+      // TODO(lgrahl): Once sufficient time has passed and all supported
+      // browser versions obey the announced maximum message size, we should
+      // abort the SCTP association instead to prevent message integrity
+      // violation.
+      RTC_LOG(LS_ERROR) << "Handing out partial SCTP message.";
+    }
+  }
+
+  // Dispatch the complete message.
+  // The ownership of the packet transfers to |invoker_|. Using
+  // CopyOnWriteBuffer is the most convenient way to do this.
+  invoker_.AsyncInvoke<void>(
+      RTC_FROM_HERE, network_thread_,
+      rtc::Bind(&SctpTransport::OnDataFromSctpToTransport, this, params,
+                partial_incoming_message_));
+
+  // Reset the message buffer
+  partial_incoming_message_.Clear();
+  return kSctpSuccessReturn;
 }
 
 void SctpTransport::OnDataFromSctpToTransport(
diff --git a/media/sctp/sctp_transport.h b/media/sctp/sctp_transport.h
index 758503b..38029ff 100644
--- a/media/sctp/sctp_transport.h
+++ b/media/sctp/sctp_transport.h
@@ -34,6 +34,7 @@
 // Defined by "usrsctplib/usrsctp.h"
 struct sockaddr_conn;
 struct sctp_assoc_change;
+struct sctp_rcvinfo;
 struct sctp_stream_reset_event;
 struct sctp_sendv_spa;
 
@@ -58,8 +59,8 @@
 //  8.  usrsctp_conninput(wrapped_data)
 // [network thread returns; sctp thread then calls the following]
 //  9.  OnSctpInboundData(data)
+//  10. SctpTransport::OnDataFromSctpToTransport(data)
 // [sctp thread returns having async invoked on the network thread]
-//  10. SctpTransport::OnInboundPacketFromSctpToTransport(inboundpacket)
 //  11. SctpTransport::OnDataFromSctpToTransport(data)
 //  12. SctpTransport::SignalDataReceived(data)
 // [from the same thread, methods registered/connected to
@@ -94,6 +95,10 @@
   void set_debug_name_for_testing(const char* debug_name) override {
     debug_name_ = debug_name;
   }
+  int InjectDataOrNotificationFromSctpForTesting(void* data,
+                                                 size_t length,
+                                                 struct sctp_rcvinfo rcv,
+                                                 int flags);
 
   // Exposed to allow Post call from c-callbacks.
   // TODO(deadbeef): Remove this or at least make it return a const pointer.
@@ -173,14 +178,17 @@
 
   // Called using |invoker_| to send packet on the network.
   void OnPacketFromSctpToNetwork(const rtc::CopyOnWriteBuffer& buffer);
-  // Called using |invoker_| to decide what to do with the packet.
-  // The |flags| parameter is used by SCTP to distinguish notification packets
-  // from other types of packets.
-  void OnInboundPacketFromSctpToTransport(const rtc::CopyOnWriteBuffer& buffer,
-                                          ReceiveDataParams params,
-                                          int flags);
+
+  // Called on the SCTP thread.
+  // Flags are standard socket API flags (RFC 6458).
+  int OnDataOrNotificationFromSctp(void* data,
+                                   size_t length,
+                                   struct sctp_rcvinfo rcv,
+                                   int flags);
+  // Called using |invoker_| to decide what to do with the data.
   void OnDataFromSctpToTransport(const ReceiveDataParams& params,
                                  const rtc::CopyOnWriteBuffer& buffer);
+  // Called using |invoker_| to decide what to do with the notification.
   void OnNotificationFromSctp(const rtc::CopyOnWriteBuffer& buffer);
   void OnNotificationAssocChange(const sctp_assoc_change& change);
 
diff --git a/media/sctp/sctp_transport_unittest.cc b/media/sctp/sctp_transport_unittest.cc
index da6c629..540d2bd 100644
--- a/media/sctp/sctp_transport_unittest.cc
+++ b/media/sctp/sctp_transport_unittest.cc
@@ -12,6 +12,7 @@
 
 #include <stdio.h>
 #include <string.h>
+#include <usrsctp.h>
 
 #include <memory>
 #include <string>
@@ -238,6 +239,73 @@
   void OnChan2ReadyToSend() { ++transport2_ready_to_send_count_; }
 };
 
+TEST_F(SctpTransportTest, MessageInterleavedWithNotification) {
+  FakeDtlsTransport fake_dtls1("fake dtls 1", 0);
+  FakeDtlsTransport fake_dtls2("fake dtls 2", 0);
+  SctpFakeDataReceiver recv1;
+  SctpFakeDataReceiver recv2;
+  std::unique_ptr<SctpTransport> transport1(
+      CreateTransport(&fake_dtls1, &recv1));
+  std::unique_ptr<SctpTransport> transport2(
+      CreateTransport(&fake_dtls2, &recv2));
+
+  // Add a stream.
+  transport1->OpenStream(1);
+  transport2->OpenStream(1);
+
+  // Start SCTP transports.
+  transport1->Start(kSctpDefaultPort, kSctpDefaultPort, kSctpSendBufferSize);
+  transport2->Start(kSctpDefaultPort, kSctpDefaultPort, kSctpSendBufferSize);
+
+  // Connect the two fake DTLS transports.
+  fake_dtls1.SetDestination(&fake_dtls2, false);
+
+  // Ensure the SCTP association has been established
+  // Note: I'd rather watch for an assoc established state here but couldn't
+  //       find any exposed...
+  SendDataResult result;
+  ASSERT_TRUE(SendData(transport2.get(), 1, "meow", &result));
+  EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "meow"), kDefaultTimeout);
+
+  // Detach the DTLS transport to ensure only we will inject packets from here
+  // on.
+  transport1->SetDtlsTransport(nullptr);
+
+  // Prepare chunk buffer and metadata
+  auto chunk = rtc::CopyOnWriteBuffer(32);
+  struct sctp_rcvinfo meta = {0};
+  meta.rcv_sid = 1;
+  meta.rcv_ssn = 1337;
+  meta.rcv_ppid = rtc::HostToNetwork32(51);  // text (complete)
+
+  // Inject chunk 1/2.
+  meta.rcv_tsn = 42;
+  meta.rcv_cumtsn = 42;
+  chunk.SetData("meow?", 5);
+  EXPECT_EQ(1, transport1->InjectDataOrNotificationFromSctpForTesting(
+                   chunk.data(), chunk.size(), meta, 0));
+
+  // Inject a notification in between chunks.
+  union sctp_notification notification;
+  memset(&notification, 0, sizeof(notification));
+  // Type chosen since it's not handled apart from being logged
+  notification.sn_header.sn_type = SCTP_PEER_ADDR_CHANGE;
+  notification.sn_header.sn_flags = 0;
+  notification.sn_header.sn_length = sizeof(notification);
+  EXPECT_EQ(1, transport1->InjectDataOrNotificationFromSctpForTesting(
+                   &notification, sizeof(notification), {0}, MSG_NOTIFICATION));
+
+  // Inject chunk 2/2
+  meta.rcv_tsn = 42;
+  meta.rcv_cumtsn = 43;
+  chunk.SetData(" rawr!", 6);
+  EXPECT_EQ(1, transport1->InjectDataOrNotificationFromSctpForTesting(
+                   chunk.data(), chunk.size(), meta, MSG_EOR));
+
+  // Expect the message to contain both chunks.
+  EXPECT_TRUE_WAIT(ReceivedData(&recv1, 1, "meow? rawr!"), kDefaultTimeout);
+}
+
 // Test that data can be sent end-to-end when an SCTP transport starts with one
 // transport (which is unwritable), and then switches to another transport. A
 // common scenario due to how BUNDLE works.