Integrate FlexfecReceiveStream with Call.

Call demultiplexes received RTP packets and delivers these to the
appropriate {Video,Flexfec}ReceiveStreams. A single video stream
could conceivably be protected by multiple FlexFEC streams.

BUG=webrtc:5654

Review-Url: https://codereview.webrtc.org/2388303009
Cr-Commit-Position: refs/heads/master@{#14727}
diff --git a/webrtc/call.h b/webrtc/call.h
index 9855b60..d914303 100644
--- a/webrtc/call.h
+++ b/webrtc/call.h
@@ -16,6 +16,7 @@
 #include "webrtc/api/call/audio_receive_stream.h"
 #include "webrtc/api/call/audio_send_stream.h"
 #include "webrtc/api/call/audio_state.h"
+#include "webrtc/api/call/flexfec_receive_stream.h"
 #include "webrtc/base/networkroute.h"
 #include "webrtc/base/platform_file.h"
 #include "webrtc/base/socket.h"
@@ -131,6 +132,11 @@
   virtual void DestroyVideoReceiveStream(
       VideoReceiveStream* receive_stream) = 0;
 
+  virtual FlexfecReceiveStream* CreateFlexfecReceiveStream(
+      FlexfecReceiveStream::Config configuration) = 0;
+  virtual void DestroyFlexfecReceiveStream(
+      FlexfecReceiveStream* receive_stream) = 0;
+
   // All received RTP and RTCP packets for the call should be inserted to this
   // PacketReceiver. The PacketReceiver pointer is valid as long as the
   // Call instance exists.
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index 489983e..d727217 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -12,6 +12,7 @@
 #include <algorithm>
 #include <map>
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "webrtc/audio/audio_receive_stream.h"
@@ -28,6 +29,7 @@
 #include "webrtc/base/trace_event.h"
 #include "webrtc/call.h"
 #include "webrtc/call/bitrate_allocator.h"
+#include "webrtc/call/flexfec_receive_stream.h"
 #include "webrtc/config.h"
 #include "webrtc/logging/rtc_event_log/rtc_event_log.h"
 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h"
@@ -66,6 +68,7 @@
   explicit Call(const Call::Config& config);
   virtual ~Call();
 
+  // Implements webrtc::Call.
   PacketReceiver* Receiver() override;
 
   webrtc::AudioSendStream* CreateAudioSendStream(
@@ -87,8 +90,14 @@
   void DestroyVideoReceiveStream(
       webrtc::VideoReceiveStream* receive_stream) override;
 
+  webrtc::FlexfecReceiveStream* CreateFlexfecReceiveStream(
+      webrtc::FlexfecReceiveStream::Config configuration) override;
+  void DestroyFlexfecReceiveStream(
+      webrtc::FlexfecReceiveStream* receive_stream) override;
+
   Stats GetStats() const override;
 
+  // Implements PacketReceiver.
   DeliveryStatus DeliverPacket(MediaType media_type,
                                const uint8_t* packet,
                                size_t length,
@@ -153,13 +162,22 @@
   NetworkState video_network_state_;
 
   std::unique_ptr<RWLockWrapper> receive_crit_;
-  // Audio and Video receive streams are owned by the client that creates them.
+  // Audio, Video, and FlexFEC receive streams are owned by the client that
+  // creates them.
   std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_
       GUARDED_BY(receive_crit_);
   std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_
       GUARDED_BY(receive_crit_);
   std::set<VideoReceiveStream*> video_receive_streams_
       GUARDED_BY(receive_crit_);
+  // Each media stream could conceivably be protected by multiple FlexFEC
+  // streams.
+  std::multimap<uint32_t, FlexfecReceiveStream*> flexfec_receive_ssrcs_media_
+      GUARDED_BY(receive_crit_);
+  std::map<uint32_t, FlexfecReceiveStream*> flexfec_receive_ssrcs_protection_
+      GUARDED_BY(receive_crit_);
+  std::set<FlexfecReceiveStream*> flexfec_receive_streams_
+      GUARDED_BY(receive_crit_);
   std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
       GUARDED_BY(receive_crit_);
 
@@ -578,6 +596,58 @@
   delete receive_stream_impl;
 }
 
+webrtc::FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
+    webrtc::FlexfecReceiveStream::Config configuration) {
+  TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
+  RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+  FlexfecReceiveStream* receive_stream =
+      new FlexfecReceiveStream(std::move(configuration), this);
+
+  const webrtc::FlexfecReceiveStream::Config& config = receive_stream->config();
+  {
+    WriteLockScoped write_lock(*receive_crit_);
+    for (auto ssrc : config.protected_media_ssrcs)
+      flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream));
+    RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.flexfec_ssrc) ==
+               flexfec_receive_ssrcs_protection_.end());
+    flexfec_receive_ssrcs_protection_[config.flexfec_ssrc] = receive_stream;
+    flexfec_receive_streams_.insert(receive_stream);
+  }
+  // TODO(brandtr): Store config in RtcEventLog here.
+  return receive_stream;
+}
+
+void Call::DestroyFlexfecReceiveStream(
+    webrtc::FlexfecReceiveStream* receive_stream) {
+  TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
+  RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+  RTC_DCHECK(receive_stream != nullptr);
+  // There exist no other derived classes of webrtc::FlexfecReceiveStream,
+  // so this downcast is safe.
+  FlexfecReceiveStream* receive_stream_impl =
+      static_cast<FlexfecReceiveStream*>(receive_stream);
+  {
+    WriteLockScoped write_lock(*receive_crit_);
+    // Remove all SSRCs pointing to the FlexfecReceiveStream to be destroyed.
+    auto media_it = flexfec_receive_ssrcs_media_.begin();
+    while (media_it != flexfec_receive_ssrcs_media_.end()) {
+      if (media_it->second == receive_stream_impl)
+        media_it = flexfec_receive_ssrcs_media_.erase(media_it);
+      else
+        ++media_it;
+    }
+    auto prot_it = flexfec_receive_ssrcs_protection_.begin();
+    while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
+      if (prot_it->second == receive_stream_impl)
+        prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it);
+      else
+        ++prot_it;
+    }
+    flexfec_receive_streams_.erase(receive_stream_impl);
+  }
+  delete receive_stream_impl;
+}
+
 Call::Stats Call::GetStats() const {
   // TODO(solenberg): Some test cases in EndToEndTest use this from a different
   // thread. Re-enable once that is fixed.
@@ -923,6 +993,21 @@
       auto status = it->second->DeliverRtp(packet, length, packet_time)
                         ? DELIVERY_OK
                         : DELIVERY_PACKET_ERROR;
+      // Deliver media packets to FlexFEC subsystem.
+      auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
+      for (auto it = it_bounds.first; it != it_bounds.second; ++it)
+        it->second->AddAndProcessReceivedPacket(packet, length);
+      if (status == DELIVERY_OK)
+        event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
+      return status;
+    }
+  }
+  if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
+    auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
+    if (it != flexfec_receive_ssrcs_protection_.end()) {
+      auto status = it->second->AddAndProcessReceivedPacket(packet, length)
+                        ? DELIVERY_OK
+                        : DELIVERY_PACKET_ERROR;
       if (status == DELIVERY_OK)
         event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
       return status;
diff --git a/webrtc/call/call_unittest.cc b/webrtc/call/call_unittest.cc
index d452927..1cdd48a 100644
--- a/webrtc/call/call_unittest.cc
+++ b/webrtc/call/call_unittest.cc
@@ -117,4 +117,75 @@
     streams.clear();
   }
 }
+
+TEST(CallTest, CreateDestroy_FlexfecReceiveStream) {
+  CallHelper call;
+  FlexfecReceiveStream::Config config;
+  config.flexfec_payload_type = 118;
+  config.flexfec_ssrc = 38837212;
+  config.protected_media_ssrcs = {27273};
+
+  FlexfecReceiveStream* stream = call->CreateFlexfecReceiveStream(config);
+  EXPECT_NE(stream, nullptr);
+  call->DestroyFlexfecReceiveStream(stream);
+}
+
+TEST(CallTest, CreateDestroy_FlexfecReceiveStreams) {
+  CallHelper call;
+  FlexfecReceiveStream::Config config;
+  config.flexfec_payload_type = 118;
+  std::list<FlexfecReceiveStream*> streams;
+
+  for (int i = 0; i < 2; ++i) {
+    for (uint32_t ssrc = 0; ssrc < 1234567; ssrc += 34567) {
+      config.flexfec_ssrc = ssrc;
+      config.protected_media_ssrcs = {ssrc + 1};
+      FlexfecReceiveStream* stream = call->CreateFlexfecReceiveStream(config);
+      EXPECT_NE(stream, nullptr);
+      if (ssrc & 1) {
+        streams.push_back(stream);
+      } else {
+        streams.push_front(stream);
+      }
+    }
+    for (auto s : streams) {
+      call->DestroyFlexfecReceiveStream(s);
+    }
+    streams.clear();
+  }
+}
+
+TEST(CallTest, MultipleFlexfecReceiveStreamsProtectingSingleVideoStream) {
+  CallHelper call;
+  FlexfecReceiveStream::Config config;
+  config.flexfec_payload_type = 118;
+  config.protected_media_ssrcs = {1324234};
+  FlexfecReceiveStream* stream;
+  std::list<FlexfecReceiveStream*> streams;
+
+  config.flexfec_ssrc = 838383;
+  stream = call->CreateFlexfecReceiveStream(config);
+  EXPECT_NE(stream, nullptr);
+  streams.push_back(stream);
+
+  config.flexfec_ssrc = 424993;
+  stream = call->CreateFlexfecReceiveStream(config);
+  EXPECT_NE(stream, nullptr);
+  streams.push_back(stream);
+
+  config.flexfec_ssrc = 99383;
+  stream = call->CreateFlexfecReceiveStream(config);
+  EXPECT_NE(stream, nullptr);
+  streams.push_back(stream);
+
+  config.flexfec_ssrc = 5548;
+  stream = call->CreateFlexfecReceiveStream(config);
+  EXPECT_NE(stream, nullptr);
+  streams.push_back(stream);
+
+  for (auto s : streams) {
+    call->DestroyFlexfecReceiveStream(s);
+  }
+}
+
 }  // namespace webrtc
diff --git a/webrtc/media/engine/fakewebrtccall.cc b/webrtc/media/engine/fakewebrtccall.cc
index d021eca..a25fe54 100644
--- a/webrtc/media/engine/fakewebrtccall.cc
+++ b/webrtc/media/engine/fakewebrtccall.cc
@@ -433,6 +433,17 @@
   }
 }
 
+webrtc::FlexfecReceiveStream* FakeCall::CreateFlexfecReceiveStream(
+    webrtc::FlexfecReceiveStream::Config config) {
+  // TODO(brandtr): Implement when adding integration with WebRtcVideoEngine2.
+  return nullptr;
+}
+
+void FakeCall::DestroyFlexfecReceiveStream(
+    webrtc::FlexfecReceiveStream* receive_stream) {
+  // TODO(brandtr): Implement when adding integration with WebRtcVideoEngine2.
+}
+
 webrtc::PacketReceiver* FakeCall::Receiver() {
   return this;
 }
diff --git a/webrtc/media/engine/fakewebrtccall.h b/webrtc/media/engine/fakewebrtccall.h
index db3db18..aec22a3 100644
--- a/webrtc/media/engine/fakewebrtccall.h
+++ b/webrtc/media/engine/fakewebrtccall.h
@@ -228,6 +228,12 @@
       webrtc::VideoReceiveStream::Config config) override;
   void DestroyVideoReceiveStream(
       webrtc::VideoReceiveStream* receive_stream) override;
+
+  webrtc::FlexfecReceiveStream* CreateFlexfecReceiveStream(
+      webrtc::FlexfecReceiveStream::Config config) override;
+  void DestroyFlexfecReceiveStream(
+      webrtc::FlexfecReceiveStream* receive_stream) override;
+
   webrtc::PacketReceiver* Receiver() override;
 
   DeliveryStatus DeliverPacket(webrtc::MediaType media_type,