In criket::BaseChannel replace AsyncInvoker with task queue functions

all invokes, as well as BaseChannel constructor and destructor
should run on the same task queue which allow to use
simpler cancellation of pending task on BaseChannel destruction

Bug: webrtc:12339
Change-Id: I311b6de940cc24cf6bb5b49e1bbd132fea2439e4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202032
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33009}
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 9e296b5..0155bc7 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -121,6 +121,8 @@
     "../rtc_base/synchronization:sequence_checker",
     "../rtc_base/system:file_wrapper",
     "../rtc_base/system:rtc_export",
+    "../rtc_base/task_utils:pending_task_safety_flag",
+    "../rtc_base/task_utils:to_queued_task",
     "../rtc_base/third_party/base64",
     "../rtc_base/third_party/sigslot",
     "../system_wrappers:field_trial",
diff --git a/pc/channel.cc b/pc/channel.cc
index 69b2ca1..98a8f9d 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -32,15 +32,19 @@
 #include "rtc_base/strings/string_builder.h"
 #include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/trace_event.h"
 
 namespace cricket {
-using rtc::Bind;
-using rtc::UniqueRandomIdGenerator;
-using webrtc::SdpType;
-
 namespace {
 
+using ::rtc::Bind;
+using ::rtc::UniqueRandomIdGenerator;
+using ::webrtc::PendingTaskSafetyFlag;
+using ::webrtc::SdpType;
+using ::webrtc::ToQueuedTask;
+
 struct SendPacketMessageData : public rtc::MessageData {
   rtc::CopyOnWriteBuffer packet;
   rtc::PacketOptions options;
@@ -135,6 +139,7 @@
     : worker_thread_(worker_thread),
       network_thread_(network_thread),
       signaling_thread_(signaling_thread),
+      alive_(PendingTaskSafetyFlag::Create()),
       content_name_(content_name),
       srtp_required_(srtp_required),
       crypto_options_(crypto_options),
@@ -151,7 +156,7 @@
   RTC_DCHECK_RUN_ON(worker_thread_);
 
   // Eats any outstanding messages or packets.
-  worker_thread_->Clear(&invoker_);
+  alive_->SetNotAlive();
   worker_thread_->Clear(this);
   // The media channel is destroyed at the end of the destructor, since it
   // is a std::unique_ptr. The transport channel (rtp_transport) must outlive
@@ -223,7 +228,6 @@
       DisconnectFromRtpTransport();
     }
     // Clear pending read packets/messages.
-    network_thread_->Clear(&invoker_);
     network_thread_->Clear(this);
   });
 }
@@ -401,10 +405,10 @@
   // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot
   // work correctly. Intentionally leave it broken to simplify the code and
   // encourage the users to stop using non-muxing RTCP.
-  invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] {
+  worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] {
     RTC_DCHECK_RUN_ON(worker_thread());
     media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
-  });
+  }));
 }
 
 sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() {
@@ -420,10 +424,10 @@
 }
 
 void BaseChannel::OnTransportReadyToSend(bool ready) {
-  invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] {
+  worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] {
     RTC_DCHECK_RUN_ON(worker_thread());
     media_channel_->OnReadyToSend(ready);
-  });
+  }));
 }
 
 bool BaseChannel::SendPacket(bool rtcp,
@@ -527,11 +531,11 @@
 
   auto packet_buffer = parsed_packet.Buffer();
 
-  invoker_.AsyncInvoke<void>(
-      RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] {
+  worker_thread_->PostTask(
+      ToQueuedTask(alive_, [this, packet_buffer, packet_time_us] {
         RTC_DCHECK_RUN_ON(worker_thread());
         media_channel_->OnPacketReceived(packet_buffer, packet_time_us);
-      });
+      }));
 }
 
 void BaseChannel::EnableMedia_w() {
@@ -574,11 +578,11 @@
   // We only have to do this AsyncInvoke once, when first transitioning to
   // writable.
   if (!was_ever_writable_n_) {
-    invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [this] {
+    worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
       RTC_DCHECK_RUN_ON(worker_thread());
       was_ever_writable_ = true;
       UpdateMediaSendRecvState_w();
-    });
+    }));
   }
   was_ever_writable_n_ = true;
 }
@@ -830,11 +834,10 @@
 }
 
 void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) {
-  invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
-                             [this, sent_packet] {
-                               RTC_DCHECK_RUN_ON(worker_thread());
-                               SignalSentPacket()(sent_packet);
-                             });
+  worker_thread_->PostTask(ToQueuedTask(alive_, [this, sent_packet] {
+    RTC_DCHECK_RUN_ON(worker_thread());
+    SignalSentPacket()(sent_packet);
+  }));
 }
 
 void BaseChannel::SetNegotiatedHeaderExtensions_w(
diff --git a/pc/channel.h b/pc/channel.h
index bbb95d7..8240582 100644
--- a/pc/channel.h
+++ b/pc/channel.h
@@ -36,10 +36,10 @@
 #include "pc/rtp_transport.h"
 #include "pc/srtp_filter.h"
 #include "pc/srtp_transport.h"
-#include "rtc_base/async_invoker.h"
 #include "rtc_base/async_udp_socket.h"
 #include "rtc_base/network.h"
 #include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
 #include "rtc_base/thread_annotations.h"
 #include "rtc_base/unique_id_generator.h"
@@ -325,7 +325,7 @@
   rtc::Thread* const worker_thread_;
   rtc::Thread* const network_thread_;
   rtc::Thread* const signaling_thread_;
-  rtc::AsyncInvoker invoker_;
+  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> alive_;
   sigslot::signal1<ChannelInterface*> SignalFirstPacketReceived_
       RTC_GUARDED_BY(signaling_thread_);
   sigslot::signal1<const rtc::SentPacket&> SignalSentPacket_