Implement WaitPoll for Fuchsia

Fuchsia's libc provides `select` and `poll` but not `epoll`.

This CL adds a `WaitPoll` method, which is modeled after `WaitSelect` but uses `poll`. The pre-existing `WaitPoll` method was renamed to `WaitPollOneDispatcher`.

TESTED="2p video call on Fuchsia. WaitPoll is faster compared to
WaitSelect, primarily because WaitSelect pessimistically calls
getsockopt(SO_ERROR) on each fd, while WaitPoll does so only on fds that
have entered an error state."

Original author: tombergan@google.com

Bug: None
Change-Id: I83cc824fca40d691fd93712c1c933ff21b3f877c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/296826
Reviewed-by: Markus Handell <handellm@webrtc.org>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Tom Bergan <tombergan@google.com>
Reviewed-by: Taylor Brandstetter <deadbeef@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39564}
diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc
index b7d6914..be7d968 100644
--- a/rtc_base/physical_socket_server.cc
+++ b/rtc_base/physical_socket_server.cc
@@ -25,6 +25,8 @@
 #if defined(WEBRTC_USE_EPOLL)
 // "poll" will be used to wait for the signal dispatcher.
 #include <poll.h>
+#elif defined(WEBRTC_USE_POLL)
+#include <poll.h>
 #endif
 #include <sys/ioctl.h>
 #include <sys/select.h>
@@ -1266,17 +1268,22 @@
   RTC_DCHECK(!waiting_);
   ScopedSetTrue s(&waiting_);
   const int cmsWait = ToCmsWait(max_wait_duration);
+
+#if defined(WEBRTC_USE_POLL)
+  return WaitPoll(cmsWait, process_io);
+#else
 #if defined(WEBRTC_USE_EPOLL)
   // We don't keep a dedicated "epoll" descriptor containing only the non-IO
   // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
   // "select" to support sockets larger than FD_SETSIZE.
   if (!process_io) {
-    return WaitPoll(cmsWait, signal_wakeup_);
+    return WaitPollOneDispatcher(cmsWait, signal_wakeup_);
   } else if (epoll_fd_ != INVALID_SOCKET) {
     return WaitEpoll(cmsWait);
   }
 #endif
   return WaitSelect(cmsWait, process_io);
+#endif
 }
 
 // `error_event` is true if we are responding to an event where we know an
@@ -1346,6 +1353,34 @@
   }
 }
 
+#if defined(WEBRTC_USE_POLL) || defined(WEBRTC_USE_EPOLL)
+static void ProcessPollEvents(Dispatcher* dispatcher, const pollfd& pfd) {
+  bool readable = (pfd.revents & (POLLIN | POLLPRI));
+  bool writable = (pfd.revents & POLLOUT);
+  bool error = (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP));
+
+  ProcessEvents(dispatcher, readable, writable, error, error);
+}
+
+static pollfd DispatcherToPollfd(Dispatcher* dispatcher) {
+  pollfd fd{
+      .fd = dispatcher->GetDescriptor(),
+      .events = 0,
+      .revents = 0,
+  };
+
+  uint32_t ff = dispatcher->GetRequestedEvents();
+  if (ff & (DE_READ | DE_ACCEPT)) {
+    fd.events |= POLLIN;
+  }
+  if (ff & (DE_WRITE | DE_CONNECT)) {
+    fd.events |= POLLOUT;
+  }
+
+  return fd;
+}
+#endif  // WEBRTC_USE_POLL || WEBRTC_USE_EPOLL
+
 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
   // Calculate timing information
 
@@ -1387,7 +1422,6 @@
       for (auto const& kv : dispatcher_by_key_) {
         uint64_t key = kv.first;
         Dispatcher* pdispatcher = kv.second;
-        // Query dispatchers for read and write wait state
         if (!process_io && (pdispatcher != signal_wakeup_))
           continue;
         current_dispatcher_keys_.push_back(key);
@@ -1428,9 +1462,9 @@
     } else {
       // We have signaled descriptors
       CritScope cr(&crit_);
-      // Iterate only on the dispatchers whose sockets were passed into
-      // WSAEventSelect; this avoids the ABA problem (a socket being
-      // destroyed and a new one created with the same file descriptor).
+      // Iterate only on the dispatchers whose file descriptors were passed into
+      // select; this avoids the ABA problem (a socket being destroyed and a new
+      // one created with the same file descriptor).
       for (uint64_t key : current_dispatcher_keys_) {
         if (!dispatcher_by_key_.count(key))
           continue;
@@ -1547,11 +1581,11 @@
 
 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
-  int64_t tvWait = -1;
-  int64_t tvStop = -1;
+  int64_t msWait = -1;
+  int64_t msStop = -1;
   if (cmsWait != kForeverMs) {
-    tvWait = cmsWait;
-    tvStop = TimeAfter(cmsWait);
+    msWait = cmsWait;
+    msStop = TimeAfter(cmsWait);
   }
 
   fWait_ = true;
@@ -1561,7 +1595,7 @@
     // 0 means timeout
     // > 0 means count of descriptors ready
     int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
-                       static_cast<int>(tvWait));
+                       static_cast<int>(msWait));
     if (n < 0) {
       if (errno != EINTR) {
         RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
@@ -1595,8 +1629,8 @@
     }
 
     if (cmsWait != kForeverMs) {
-      tvWait = TimeDiff(tvStop, TimeMillis());
-      if (tvWait <= 0) {
+      msWait = TimeDiff(msStop, TimeMillis());
+      if (msWait <= 0) {
         // Return success on timeout.
         return true;
       }
@@ -1606,37 +1640,27 @@
   return true;
 }
 
-bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
+bool PhysicalSocketServer::WaitPollOneDispatcher(int cmsWait,
+                                                 Dispatcher* dispatcher) {
   RTC_DCHECK(dispatcher);
-  int64_t tvWait = -1;
-  int64_t tvStop = -1;
+  int64_t msWait = -1;
+  int64_t msStop = -1;
   if (cmsWait != kForeverMs) {
-    tvWait = cmsWait;
-    tvStop = TimeAfter(cmsWait);
+    msWait = cmsWait;
+    msStop = TimeAfter(cmsWait);
   }
 
   fWait_ = true;
-
-  struct pollfd fds = {0};
-  int fd = dispatcher->GetDescriptor();
-  fds.fd = fd;
+  const int fd = dispatcher->GetDescriptor();
 
   while (fWait_) {
-    uint32_t ff = dispatcher->GetRequestedEvents();
-    fds.events = 0;
-    if (ff & (DE_READ | DE_ACCEPT)) {
-      fds.events |= POLLIN;
-    }
-    if (ff & (DE_WRITE | DE_CONNECT)) {
-      fds.events |= POLLOUT;
-    }
-    fds.revents = 0;
+    auto fds = DispatcherToPollfd(dispatcher);
 
     // Wait then call handlers as appropriate
     // < 0 means error
     // 0 means timeout
     // > 0 means count of descriptors ready
-    int n = poll(&fds, 1, static_cast<int>(tvWait));
+    int n = poll(&fds, 1, static_cast<int>(msWait));
     if (n < 0) {
       if (errno != EINTR) {
         RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
@@ -1653,17 +1677,12 @@
       // We have signaled descriptors (should only be the passed dispatcher).
       RTC_DCHECK_EQ(n, 1);
       RTC_DCHECK_EQ(fds.fd, fd);
-
-      bool readable = (fds.revents & (POLLIN | POLLPRI));
-      bool writable = (fds.revents & POLLOUT);
-      bool error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
-
-      ProcessEvents(dispatcher, readable, writable, error, error);
+      ProcessPollEvents(dispatcher, fds);
     }
 
     if (cmsWait != kForeverMs) {
-      tvWait = TimeDiff(tvStop, TimeMillis());
-      if (tvWait < 0) {
+      msWait = TimeDiff(msStop, TimeMillis());
+      if (msWait < 0) {
         // Return success on timeout.
         return true;
       }
@@ -1673,7 +1692,80 @@
   return true;
 }
 
-#endif  // WEBRTC_USE_EPOLL
+#elif defined(WEBRTC_USE_POLL)
+
+bool PhysicalSocketServer::WaitPoll(int cmsWait, bool process_io) {
+  int64_t msWait = -1;
+  int64_t msStop = -1;
+  if (cmsWait != kForeverMs) {
+    msWait = cmsWait;
+    msStop = TimeAfter(cmsWait);
+  }
+
+  std::vector<pollfd> pollfds;
+  fWait_ = true;
+
+  while (fWait_) {
+    {
+      CritScope cr(&crit_);
+      current_dispatcher_keys_.clear();
+      pollfds.clear();
+      pollfds.reserve(dispatcher_by_key_.size());
+
+      for (auto const& kv : dispatcher_by_key_) {
+        uint64_t key = kv.first;
+        Dispatcher* pdispatcher = kv.second;
+        if (!process_io && (pdispatcher != signal_wakeup_))
+          continue;
+        current_dispatcher_keys_.push_back(key);
+        pollfds.push_back(DispatcherToPollfd(pdispatcher));
+      }
+    }
+
+    // Wait then call handlers as appropriate
+    // < 0 means error
+    // 0 means timeout
+    // > 0 means count of descriptors ready
+    int n = poll(pollfds.data(), pollfds.size(), static_cast<int>(msWait));
+    if (n < 0) {
+      if (errno != EINTR) {
+        RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
+        return false;
+      }
+      // Else ignore the error and keep going. If this EINTR was for one of the
+      // signals managed by this PhysicalSocketServer, the
+      // PosixSignalDeliveryDispatcher will be in the signaled state in the next
+      // iteration.
+    } else if (n == 0) {
+      // If timeout, return success
+      return true;
+    } else {
+      // We have signaled descriptors
+      CritScope cr(&crit_);
+      // Iterate only on the dispatchers whose file descriptors were passed into
+      // poll; this avoids the ABA problem (a socket being destroyed and a new
+      // one created with the same file descriptor).
+      for (size_t i = 0; i < current_dispatcher_keys_.size(); ++i) {
+        uint64_t key = current_dispatcher_keys_[i];
+        if (!dispatcher_by_key_.count(key))
+          continue;
+        ProcessPollEvents(dispatcher_by_key_.at(key), pollfds[i]);
+      }
+    }
+
+    if (cmsWait != kForeverMs) {
+      msWait = TimeDiff(msStop, TimeMillis());
+      if (msWait < 0) {
+        // Return success on timeout.
+        return true;
+      }
+    }
+  }
+
+  return true;
+}
+
+#endif  // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL
 
 #endif  // WEBRTC_POSIX
 
diff --git a/rtc_base/physical_socket_server.h b/rtc_base/physical_socket_server.h
index 7b11780..650db80 100644
--- a/rtc_base/physical_socket_server.h
+++ b/rtc_base/physical_socket_server.h
@@ -12,10 +12,21 @@
 #define RTC_BASE_PHYSICAL_SOCKET_SERVER_H_
 
 #include "api/units/time_delta.h"
-#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
+
+#if defined(WEBRTC_POSIX)
+#if defined(WEBRTC_LINUX)
+// On Linux, use epoll.
 #include <sys/epoll.h>
 #define WEBRTC_USE_EPOLL 1
-#endif
+#elif defined(WEBRTC_FUCHSIA)
+// Fuchsia implements select and poll but not epoll, and testing shows that poll
+// is faster than select.
+#include <poll.h>
+#define WEBRTC_USE_POLL 1
+#else
+// On other POSIX systems, use select by default.
+#endif  // WEBRTC_LINUX, WEBRTC_FUCHSIA
+#endif  // WEBRTC_POSIX
 
 #include <array>
 #include <memory>
@@ -89,15 +100,16 @@
   static constexpr int kForeverMs = -1;
 
   static int ToCmsWait(webrtc::TimeDelta max_wait_duration);
+
 #if defined(WEBRTC_POSIX)
   bool WaitSelect(int cmsWait, bool process_io);
-#endif  // WEBRTC_POSIX
+
 #if defined(WEBRTC_USE_EPOLL)
   void AddEpoll(Dispatcher* dispatcher, uint64_t key);
   void RemoveEpoll(Dispatcher* dispatcher);
   void UpdateEpoll(Dispatcher* dispatcher, uint64_t key);
   bool WaitEpoll(int cmsWait);
-  bool WaitPoll(int cmsWait, Dispatcher* dispatcher);
+  bool WaitPollOneDispatcher(int cmsWait, Dispatcher* dispatcher);
 
   // This array is accessed in isolation by a thread calling into Wait().
   // It's useless to use a SequenceChecker to guard it because a socket
@@ -105,7 +117,16 @@
   // to have to reset the sequence checker on Wait calls.
   std::array<epoll_event, kNumEpollEvents> epoll_events_;
   const int epoll_fd_ = INVALID_SOCKET;
-#endif  // WEBRTC_USE_EPOLL
+
+#elif defined(WEBRTC_USE_POLL)
+  void AddPoll(Dispatcher* dispatcher, uint64_t key);
+  void RemovePoll(Dispatcher* dispatcher);
+  void UpdatePoll(Dispatcher* dispatcher, uint64_t key);
+  bool WaitPoll(int cmsWait, bool process_io);
+
+#endif  // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL
+#endif  // WEBRTC_POSIX
+
   // uint64_t keys are used to uniquely identify a dispatcher in order to avoid
   // the ABA problem during the epoll loop (a dispatcher being destroyed and
   // replaced by one with the same address).
@@ -116,9 +137,9 @@
   std::unordered_map<Dispatcher*, uint64_t> key_by_dispatcher_
       RTC_GUARDED_BY(crit_);
   // A list of dispatcher keys that we're interested in for the current
-  // select() or WSAWaitForMultipleEvents() loop. Again, used to avoid the ABA
-  // problem (a socket being destroyed and a new one created with the same
-  // handle, erroneously receiving the events from the destroyed socket).
+  // select(), poll(), or WSAWaitForMultipleEvents() loop. Again, used to avoid
+  // the ABA problem (a socket being destroyed and a new one created with the
+  // same handle, erroneously receiving the events from the destroyed socket).
   //
   // Kept as a member variable just for efficiency.
   std::vector<uint64_t> current_dispatcher_keys_;