AllocationSequence: migrate from rtc::Message to TaskQueue.

AllocationSequence uses legacy rtc::Thread message handling. In order
to cancel callbacks it uses rtc::Thread::Clear() which uses locks and
necessitates looping through all currently queued (unbounded) messages
in the thread. In particular, these Clear calls are common during
negotiation and the probability of having a lot of queued messages is
high due to a long-running network thread function invoked on the
network thread.

Fix this by migrating AllocationSequence to task queues.

Bug: webrtc:12840, webrtc:9702
Change-Id: I42bbdb59fb2c88b50e866326ba15134dcc6ce691
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221369
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34241}
diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc
index 49d4958..9d7a4f9 100644
--- a/p2p/client/basic_port_allocator.cc
+++ b/p2p/client/basic_port_allocator.cc
@@ -39,10 +39,6 @@
 namespace cricket {
 namespace {
 
-enum {
-  MSG_ALLOCATION_PHASE,
-};
-
 const int PHASE_UDP = 0;
 const int PHASE_RELAY = 1;
 const int PHASE_TCP = 2;
@@ -1261,10 +1257,6 @@
   Stop();
 }
 
-AllocationSequence::~AllocationSequence() {
-  session_->network_thread()->Clear(this);
-}
-
 void AllocationSequence::DisableEquivalentPhases(rtc::Network* network,
                                                  PortConfiguration* config,
                                                  uint32_t* flags) {
@@ -1339,7 +1331,9 @@
 
 void AllocationSequence::Start() {
   state_ = kRunning;
-  session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
+
+  session_->network_thread()->PostTask(webrtc::ToQueuedTask(
+      safety_, [this, epoch = epoch_] { Process(epoch); }));
   // Take a snapshot of the best IP, so that when DisableEquivalentPhases is
   // called next time, we enable all phases if the best IP has since changed.
   previous_best_ip_ = network_->GetBestIP();
@@ -1349,16 +1343,18 @@
   // If the port is completed, don't set it to stopped.
   if (state_ == kRunning) {
     state_ = kStopped;
-    session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
+    // Cause further Process calls in the previous epoch to be ignored.
+    ++epoch_;
   }
 }
 
-void AllocationSequence::OnMessage(rtc::Message* msg) {
+void AllocationSequence::Process(int epoch) {
   RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
-  RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);
-
   const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};
 
+  if (epoch != epoch_)
+    return;
+
   // Perform all of the phases in the current step.
   RTC_LOG(LS_INFO) << network_->ToString()
                    << ": Allocation Phase=" << PHASE_NAMES[phase_];
@@ -1384,13 +1380,15 @@
 
   if (state() == kRunning) {
     ++phase_;
-    session_->network_thread()->PostDelayed(RTC_FROM_HERE,
-                                            session_->allocator()->step_delay(),
-                                            this, MSG_ALLOCATION_PHASE);
+    session_->network_thread()->PostDelayedTask(
+        webrtc::ToQueuedTask(safety_,
+                             [this, epoch = epoch_] { Process(epoch); }),
+        session_->allocator()->step_delay());
   } else {
-    // If all phases in AllocationSequence are completed, no allocation
-    // steps needed further. Canceling  pending signal.
-    session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
+    // No allocation steps needed further if all phases in AllocationSequence
+    // are completed. Cause further Process calls in the previous epoch to be
+    // ignored.
+    ++epoch_;
     port_allocation_complete_callback_();
   }
 }
diff --git a/p2p/client/basic_port_allocator.h b/p2p/client/basic_port_allocator.h
index ede9395..77aceb1 100644
--- a/p2p/client/basic_port_allocator.h
+++ b/p2p/client/basic_port_allocator.h
@@ -327,8 +327,8 @@
 
 // Performs the allocation of ports, in a sequenced (timed) manner, for a given
 // network and IP address.
-class AllocationSequence : public rtc::MessageHandler,
-                           public sigslot::has_slots<> {
+// This class is thread-compatible.
+class AllocationSequence : public sigslot::has_slots<> {
  public:
   enum State {
     kInit,       // Initial state.
@@ -350,7 +350,6 @@
                      PortConfiguration* config,
                      uint32_t flags,
                      std::function<void()> port_allocation_complete_callback);
-  ~AllocationSequence() override;
   void Init();
   void Clear();
   void OnNetworkFailed();
@@ -372,9 +371,6 @@
   void Start();
   void Stop();
 
-  // MessageHandler
-  void OnMessage(rtc::Message* msg) override;
-
  protected:
   // For testing.
   void CreateTurnPort(const RelayServerConfig& config);
@@ -382,6 +378,7 @@
  private:
   typedef std::vector<ProtocolType> ProtocolList;
 
+  void Process(int epoch);
   bool IsFlagSet(uint32_t flag) { return ((flags_ & flag) != 0); }
   void CreateUDPPorts();
   void CreateTCPPorts();
@@ -411,6 +408,11 @@
   std::vector<Port*> relay_ports_;
   int phase_;
   std::function<void()> port_allocation_complete_callback_;
+  // This counter is sampled and passed together with tasks when tasks are
+  // posted. If the sampled counter doesn't match |epoch_| on reception, the
+  // posted task is ignored.
+  int epoch_ = 0;
+  webrtc::ScopedTaskSafety safety_;
 };
 
 }  // namespace cricket