AllocationSequence: migrate from rtc::Message to TaskQueue.
AllocationSequence uses legacy rtc::Thread message handling. In order
to cancel callbacks it uses rtc::Thread::Clear() which uses locks and
necessitates looping through all currently queued (unbounded) messages
in the thread. In particular, these Clear calls are common during
negotiation and the probability of having a lot of queued messages is
high due to a long-running network thread function invoked on the
network thread.
Fix this by migrating AllocationSequence to task queues.
Bug: webrtc:12840, webrtc:9702
Change-Id: I42bbdb59fb2c88b50e866326ba15134dcc6ce691
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221369
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34241}
diff --git a/p2p/client/basic_port_allocator.cc b/p2p/client/basic_port_allocator.cc
index 49d4958..9d7a4f9 100644
--- a/p2p/client/basic_port_allocator.cc
+++ b/p2p/client/basic_port_allocator.cc
@@ -39,10 +39,6 @@
namespace cricket {
namespace {
-enum {
- MSG_ALLOCATION_PHASE,
-};
-
const int PHASE_UDP = 0;
const int PHASE_RELAY = 1;
const int PHASE_TCP = 2;
@@ -1261,10 +1257,6 @@
Stop();
}
-AllocationSequence::~AllocationSequence() {
- session_->network_thread()->Clear(this);
-}
-
void AllocationSequence::DisableEquivalentPhases(rtc::Network* network,
PortConfiguration* config,
uint32_t* flags) {
@@ -1339,7 +1331,9 @@
void AllocationSequence::Start() {
state_ = kRunning;
- session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
+
+ session_->network_thread()->PostTask(webrtc::ToQueuedTask(
+ safety_, [this, epoch = epoch_] { Process(epoch); }));
// Take a snapshot of the best IP, so that when DisableEquivalentPhases is
// called next time, we enable all phases if the best IP has since changed.
previous_best_ip_ = network_->GetBestIP();
@@ -1349,16 +1343,18 @@
// If the port is completed, don't set it to stopped.
if (state_ == kRunning) {
state_ = kStopped;
- session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
+ // Cause further Process calls in the previous epoch to be ignored.
+ ++epoch_;
}
}
-void AllocationSequence::OnMessage(rtc::Message* msg) {
+void AllocationSequence::Process(int epoch) {
RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
- RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);
-
const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};
+ if (epoch != epoch_)
+ return;
+
// Perform all of the phases in the current step.
RTC_LOG(LS_INFO) << network_->ToString()
<< ": Allocation Phase=" << PHASE_NAMES[phase_];
@@ -1384,13 +1380,15 @@
if (state() == kRunning) {
++phase_;
- session_->network_thread()->PostDelayed(RTC_FROM_HERE,
- session_->allocator()->step_delay(),
- this, MSG_ALLOCATION_PHASE);
+ session_->network_thread()->PostDelayedTask(
+ webrtc::ToQueuedTask(safety_,
+ [this, epoch = epoch_] { Process(epoch); }),
+ session_->allocator()->step_delay());
} else {
- // If all phases in AllocationSequence are completed, no allocation
- // steps needed further. Canceling pending signal.
- session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
+ // No allocation steps needed further if all phases in AllocationSequence
+ // are completed. Cause further Process calls in the previous epoch to be
+ // ignored.
+ ++epoch_;
port_allocation_complete_callback_();
}
}
diff --git a/p2p/client/basic_port_allocator.h b/p2p/client/basic_port_allocator.h
index ede9395..77aceb1 100644
--- a/p2p/client/basic_port_allocator.h
+++ b/p2p/client/basic_port_allocator.h
@@ -327,8 +327,8 @@
// Performs the allocation of ports, in a sequenced (timed) manner, for a given
// network and IP address.
-class AllocationSequence : public rtc::MessageHandler,
- public sigslot::has_slots<> {
+// This class is thread-compatible.
+class AllocationSequence : public sigslot::has_slots<> {
public:
enum State {
kInit, // Initial state.
@@ -350,7 +350,6 @@
PortConfiguration* config,
uint32_t flags,
std::function<void()> port_allocation_complete_callback);
- ~AllocationSequence() override;
void Init();
void Clear();
void OnNetworkFailed();
@@ -372,9 +371,6 @@
void Start();
void Stop();
- // MessageHandler
- void OnMessage(rtc::Message* msg) override;
-
protected:
// For testing.
void CreateTurnPort(const RelayServerConfig& config);
@@ -382,6 +378,7 @@
private:
typedef std::vector<ProtocolType> ProtocolList;
+ void Process(int epoch);
bool IsFlagSet(uint32_t flag) { return ((flags_ & flag) != 0); }
void CreateUDPPorts();
void CreateTCPPorts();
@@ -411,6 +408,11 @@
std::vector<Port*> relay_ports_;
int phase_;
std::function<void()> port_allocation_complete_callback_;
+ // This counter is sampled and passed together with tasks when tasks are
+ // posted. If the sampled counter doesn't match |epoch_| on reception, the
+ // posted task is ignored.
+ int epoch_ = 0;
+ webrtc::ScopedTaskSafety safety_;
};
} // namespace cricket