Integrate FlexfecReceiveStream with Call.

Call demultiplexes received RTP packets and delivers these to the
appropriate {Video,Flexfec}ReceiveStreams. A single video stream
could conceivably be protected by multiple FlexFEC streams.

BUG=webrtc:5654

Review-Url: https://codereview.webrtc.org/2388303009
Cr-Commit-Position: refs/heads/master@{#14727}
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index 489983e..d727217 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -12,6 +12,7 @@
 #include <algorithm>
 #include <map>
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "webrtc/audio/audio_receive_stream.h"
@@ -28,6 +29,7 @@
 #include "webrtc/base/trace_event.h"
 #include "webrtc/call.h"
 #include "webrtc/call/bitrate_allocator.h"
+#include "webrtc/call/flexfec_receive_stream.h"
 #include "webrtc/config.h"
 #include "webrtc/logging/rtc_event_log/rtc_event_log.h"
 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h"
@@ -66,6 +68,7 @@
   explicit Call(const Call::Config& config);
   virtual ~Call();
 
+  // Implements webrtc::Call.
   PacketReceiver* Receiver() override;
 
   webrtc::AudioSendStream* CreateAudioSendStream(
@@ -87,8 +90,14 @@
   void DestroyVideoReceiveStream(
       webrtc::VideoReceiveStream* receive_stream) override;
 
+  webrtc::FlexfecReceiveStream* CreateFlexfecReceiveStream(
+      webrtc::FlexfecReceiveStream::Config configuration) override;
+  void DestroyFlexfecReceiveStream(
+      webrtc::FlexfecReceiveStream* receive_stream) override;
+
   Stats GetStats() const override;
 
+  // Implements PacketReceiver.
   DeliveryStatus DeliverPacket(MediaType media_type,
                                const uint8_t* packet,
                                size_t length,
@@ -153,13 +162,22 @@
   NetworkState video_network_state_;
 
   std::unique_ptr<RWLockWrapper> receive_crit_;
-  // Audio and Video receive streams are owned by the client that creates them.
+  // Audio, Video, and FlexFEC receive streams are owned by the client that
+  // creates them.
   std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_
       GUARDED_BY(receive_crit_);
   std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_
       GUARDED_BY(receive_crit_);
   std::set<VideoReceiveStream*> video_receive_streams_
       GUARDED_BY(receive_crit_);
+  // Each media stream could conceivably be protected by multiple FlexFEC
+  // streams.
+  std::multimap<uint32_t, FlexfecReceiveStream*> flexfec_receive_ssrcs_media_
+      GUARDED_BY(receive_crit_);
+  std::map<uint32_t, FlexfecReceiveStream*> flexfec_receive_ssrcs_protection_
+      GUARDED_BY(receive_crit_);
+  std::set<FlexfecReceiveStream*> flexfec_receive_streams_
+      GUARDED_BY(receive_crit_);
   std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
       GUARDED_BY(receive_crit_);
 
@@ -578,6 +596,58 @@
   delete receive_stream_impl;
 }
 
+webrtc::FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
+    webrtc::FlexfecReceiveStream::Config configuration) {
+  TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
+  RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+  FlexfecReceiveStream* receive_stream =
+      new FlexfecReceiveStream(std::move(configuration), this);
+
+  const webrtc::FlexfecReceiveStream::Config& config = receive_stream->config();
+  {
+    WriteLockScoped write_lock(*receive_crit_);
+    for (auto ssrc : config.protected_media_ssrcs)
+      flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream));
+    RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.flexfec_ssrc) ==
+               flexfec_receive_ssrcs_protection_.end());
+    flexfec_receive_ssrcs_protection_[config.flexfec_ssrc] = receive_stream;
+    flexfec_receive_streams_.insert(receive_stream);
+  }
+  // TODO(brandtr): Store config in RtcEventLog here.
+  return receive_stream;
+}
+
+void Call::DestroyFlexfecReceiveStream(
+    webrtc::FlexfecReceiveStream* receive_stream) {
+  TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
+  RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+  RTC_DCHECK(receive_stream != nullptr);
+  // There exist no other derived classes of webrtc::FlexfecReceiveStream,
+  // so this downcast is safe.
+  FlexfecReceiveStream* receive_stream_impl =
+      static_cast<FlexfecReceiveStream*>(receive_stream);
+  {
+    WriteLockScoped write_lock(*receive_crit_);
+    // Remove all SSRCs pointing to the FlexfecReceiveStream to be destroyed.
+    auto media_it = flexfec_receive_ssrcs_media_.begin();
+    while (media_it != flexfec_receive_ssrcs_media_.end()) {
+      if (media_it->second == receive_stream_impl)
+        media_it = flexfec_receive_ssrcs_media_.erase(media_it);
+      else
+        ++media_it;
+    }
+    auto prot_it = flexfec_receive_ssrcs_protection_.begin();
+    while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
+      if (prot_it->second == receive_stream_impl)
+        prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it);
+      else
+        ++prot_it;
+    }
+    flexfec_receive_streams_.erase(receive_stream_impl);
+  }
+  delete receive_stream_impl;
+}
+
 Call::Stats Call::GetStats() const {
   // TODO(solenberg): Some test cases in EndToEndTest use this from a different
   // thread. Re-enable once that is fixed.
@@ -923,6 +993,21 @@
       auto status = it->second->DeliverRtp(packet, length, packet_time)
                         ? DELIVERY_OK
                         : DELIVERY_PACKET_ERROR;
+      // Deliver media packets to FlexFEC subsystem.
+      auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
+      for (auto it = it_bounds.first; it != it_bounds.second; ++it)
+        it->second->AddAndProcessReceivedPacket(packet, length);
+      if (status == DELIVERY_OK)
+        event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
+      return status;
+    }
+  }
+  if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
+    auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
+    if (it != flexfec_receive_ssrcs_protection_.end()) {
+      auto status = it->second->AddAndProcessReceivedPacket(packet, length)
+                        ? DELIVERY_OK
+                        : DELIVERY_PACKET_ERROR;
       if (status == DELIVERY_OK)
         event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
       return status;