Remove asyncinvoker from PeerConnection.

The callback that previously went through the async invoker now uses a
safety flag to check that call_ is valid before issuing calls.
Using the flag is a step towards removing the call_ptr_ variable, but
this CL only replaces the use of the async invoker.

The safety flag is cleared at the same time as call_ is, which prevents
pending callbacks for that call instance from running.

Also adding TODOs related to this change that will be
followed up on in other CLs.

Bug: webrtc:11988, webrtc:11992, webrtc:11993
Change-Id: If3986758af6d01d39b2db0cce82e57fc48be9d7f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185508
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32208}
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index c872550..df2fabc 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -56,6 +56,7 @@
 #include "rtc_base/logging.h"
 #include "rtc_base/string_encode.h"
 #include "rtc_base/strings/string_builder.h"
+#include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/trace_event.h"
 #include "system_wrappers/include/clock.h"
 #include "system_wrappers/include/metrics.h"
@@ -1038,6 +1039,8 @@
       local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()),
       data_channel_controller_(this),
       weak_ptr_factory_(this) {
+  RTC_DCHECK(factory_);
+  // Note: call_ appears to be set to nullptr by some callers.
   operations_chain_->SetOnChainEmptyCallback(
       [this_weak_ptr = weak_ptr_factory_.GetWeakPtr()]() {
         if (!this_weak_ptr)
@@ -1082,6 +1085,7 @@
   // call_ and event_log_ must be destroyed on the worker thread.
   worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
     RTC_DCHECK_RUN_ON(worker_thread());
+    call_safety_.reset();
     call_.reset();
     // The event log must outlive call (and any other object that uses it).
     event_log_.reset();
@@ -1215,25 +1219,7 @@
                               ? *configuration.crypto_options
                               : options.crypto_options;
   config.transport_observer = this;
-  // It's safe to pass |this| and using |rtcp_invoker_| and the |call_| pointer
-  // since the  JsepTransportController instance is owned by this PeerConnection
-  // instance and is destroyed before both |rtcp_invoker_| and the |call_|
-  // pointer.
-  config.rtcp_handler = [this](const rtc::CopyOnWriteBuffer& packet,
-                               int64_t packet_time_us) {
-    RTC_DCHECK_RUN_ON(network_thread());
-    rtcp_invoker_.AsyncInvoke<void>(
-        RTC_FROM_HERE, worker_thread(), [this, packet, packet_time_us] {
-          RTC_DCHECK_RUN_ON(worker_thread());
-          // |call_| is reset on the worker thread in the PeerConnection
-          // destructor, so we check that it's still valid before propagating
-          // the packet.
-          if (call_) {
-            call_->Receiver()->DeliverPacket(MediaType::ANY, packet,
-                                             packet_time_us);
-          }
-        });
-  };
+  config.rtcp_handler = InitializeRtcpCallback();
   config.event_log = event_log_ptr_;
 #if defined(ENABLE_EXTERNAL_AUTH)
   config.enable_external_auth = true;
@@ -3619,6 +3605,13 @@
     const cricket::ContentGroup* bundle_group) {
   RTC_DCHECK(IsUnifiedPlan());
   RTC_DCHECK(transceiver);
+  // TODO(bugs.webrtc.org/11992): This function always returns RTCError::OK().
+  // Some of the below methods, specifically Create & Destroy, need to be called
+  // on the worker thread. Consider if there should be a split here where we do
+  // things asynchronously in two steps and change the return type of the
+  // function to be void. Note that in the case of 'create', that would/could
+  // mean that SetChannel might get called at a much later stage than it happens
+  // now.
   cricket::ChannelInterface* channel = transceiver->internal()->channel();
   if (content.rejected) {
     if (channel) {
@@ -4462,6 +4455,7 @@
 
   worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
     RTC_DCHECK_RUN_ON(worker_thread());
+    call_safety_.reset();
     call_.reset();
     // The event log must outlive call (and any other object that uses it).
     event_log_.reset();
@@ -6676,6 +6670,9 @@
     const std::string& mid) {
   RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
 
+  // TODO(bugs.webrtc.org/11992): CreateVoiceChannel internally switches to the
+  // worker thread. We shouldn't be using the |call_ptr_| hack here but simply
+  // be on the worker thread and use |call_| (update upstream code).
   cricket::VoiceChannel* voice_channel = channel_manager()->CreateVoiceChannel(
       call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(),
       mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_,
@@ -6697,6 +6694,9 @@
     const std::string& mid) {
   RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
 
+  // TODO(bugs.webrtc.org/11992): CreateVideoChannel internally switches to the
+  // worker thread. We shouldn't be using the |call_ptr_| hack here but simply
+  // be on the worker thread and use |call_| (update upstream code).
   cricket::VideoChannel* video_channel = channel_manager()->CreateVideoChannel(
       call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(),
       mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_, video_options_,
@@ -7367,6 +7367,10 @@
 
 void PeerConnection::DestroyChannelInterface(
     cricket::ChannelInterface* channel) {
+  // TODO(bugs.webrtc.org/11992): All the below methods should be called on the
+  // worker thread (they switch internally anyway). Change
+  // DestroyChannelInterface to either be called on the worker thread, or do
+  // this asynchronously on the worker.
   RTC_DCHECK(channel);
   switch (channel->media_type()) {
     case cricket::MEDIA_TYPE_AUDIO:
@@ -7786,4 +7790,37 @@
   return RTCError::OK();
 }
 
+std::function<void(const rtc::CopyOnWriteBuffer& packet,
+                   int64_t packet_time_us)>
+PeerConnection::InitializeRtcpCallback() {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+
+  auto flag =
+      worker_thread()->Invoke<rtc::scoped_refptr<PendingTaskSafetyFlag>>(
+          RTC_FROM_HERE, [this] {
+            RTC_DCHECK_RUN_ON(worker_thread());
+            if (!call_)
+              return rtc::scoped_refptr<PendingTaskSafetyFlag>();
+            if (!call_safety_)
+              call_safety_.reset(new ScopedTaskSafety());
+            return call_safety_->flag();
+          });
+
+  if (!flag)
+    return [](const rtc::CopyOnWriteBuffer&, int64_t) {};
+
+  return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet,
+                                        int64_t packet_time_us) {
+    RTC_DCHECK_RUN_ON(network_thread());
+    // TODO(bugs.webrtc.org/11993): We should be delivering this packet to the
+    // Call class directly on the network thread, without incurring the hop to
+    // the worker thread here. The DeliverPacket() method will eventually just
+    // have to hop back over to the network thread.
+    worker_thread()->PostTask(ToQueuedTask(flag, [this, packet,
+                                                  packet_time_us] {
+      RTC_DCHECK_RUN_ON(worker_thread());
+      call_->Receiver()->DeliverPacket(MediaType::ANY, packet, packet_time_us);
+    }));
+  };
+}
 }  // namespace webrtc