Fix potential deadlock in TaskQueue's libevent PostTaskAndReply implementation

BUG=webrtc:7188

Review-Url: https://codereview.webrtc.org/2709603002
Cr-Commit-Position: refs/heads/master@{#16786}
diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h
index 5fcf00f..92a1c94 100644
--- a/webrtc/base/task_queue.h
+++ b/webrtc/base/task_queue.h
@@ -27,6 +27,9 @@
 #endif
 
 #if defined(WEBRTC_BUILD_LIBEVENT)
+#include "webrtc/base/refcountedobject.h"
+#include "webrtc/base/scoped_ref_ptr.h"
+
 struct event_base;
 struct event;
 #endif
@@ -237,11 +240,13 @@
   static void RunTask(int fd, short flags, void* context);       // NOLINT
   static void RunTimer(int fd, short flags, void* context);      // NOLINT
 
+  class ReplyTaskOwner;
   class PostAndReplyTask;
   class SetTimerTask;
 
-  void PrepareReplyTask(PostAndReplyTask* reply_task);
-  void ReplyTaskDone(PostAndReplyTask* reply_task);
+  typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
+
+  void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
 
   struct QueueContext;
 
@@ -252,7 +257,8 @@
   PlatformThread thread_;
   rtc::CriticalSection pending_lock_;
   std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
-  std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
+  std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
+      GUARDED_BY(pending_lock_);
 #elif defined(WEBRTC_MAC)
   struct QueueContext;
   struct TaskContext;
diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc
index 2c7d649..d5a4a6c 100644
--- a/webrtc/base/task_queue_gcd.cc
+++ b/webrtc/base/task_queue_gcd.cc
@@ -69,14 +69,13 @@
                                    std::unique_ptr<QueuedTask> second_task)
       : TaskContext(second_queue_ctx, std::move(second_task)),
         first_queue_ctx(first_queue_ctx),
-        first_task(std::move(first_task)) {
+        first_task(std::move(first_task)),
+        reply_queue_(second_queue_ctx->queue->queue_) {
     // Retain the reply queue for as long as this object lives.
     // If we don't, we may have memory leaks and/or failures.
-    dispatch_retain(first_queue_ctx->queue->queue_);
+    dispatch_retain(reply_queue_);
   }
-  ~PostTaskAndReplyContext() override {
-    dispatch_release(first_queue_ctx->queue->queue_);
-  }
+  ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); }
 
   static void RunTask(void* context) {
     auto* rc = static_cast<PostTaskAndReplyContext*>(context);
@@ -87,11 +86,12 @@
     }
     // Post the reply task.  This hands the work over to the parent struct.
     // This task will eventually delete |this|.
-    dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask);
+    dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask);
   }
 
   QueueContext* const first_queue_ctx;
   std::unique_ptr<QueuedTask> first_task;
+  dispatch_queue_t reply_queue_;
 };
 
 TaskQueue::TaskQueue(const char* queue_name)
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
index c802588..c6ce5b3 100644
--- a/webrtc/base/task_queue_libevent.cc
+++ b/webrtc/base/task_queue_libevent.cc
@@ -11,6 +11,7 @@
 #include "webrtc/base/task_queue.h"
 
 #include <fcntl.h>
+#include <signal.h>
 #include <string.h>
 #include <unistd.h>
 
@@ -27,6 +28,28 @@
 namespace {
 static const char kQuit = 1;
 static const char kRunTask = 2;
+static const char kRunReplyTask = 3;
+
+// This blocks the SIGPIPE signal on the calling thread (see pthread_sigmask).
+// This signal can be fired when trying to write() to a pipe that's being
+// closed or while closing a pipe that's being written to.
+// We can run into that situation (e.g. reply tasks that don't get a chance to
+// run because the task queue is being deleted) so we ignore this signal and
+// continue as normal.
+// As a side note for this implementation, it would be great if we could safely
+// restore the sigmask, but unfortunately the operation of restoring it can
+// itself cause SIGPIPE to be signaled :-| (e.g. on macOS).
+// The SIGPIPE signal by default causes the process to be terminated, so we
+// don't want to risk that.
+// An alternative to this approach is to ignore the signal for the whole
+// process:
+//   signal(SIGPIPE, SIG_IGN);
+void IgnoreSigPipeSignalOnCurrentThread() {
+  sigset_t sigpipe_mask;
+  sigemptyset(&sigpipe_mask);
+  sigaddset(&sigpipe_mask, SIGPIPE);
+  pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
+}
 
 struct TimerEvent {
   explicit TimerEvent(std::unique_ptr<QueuedTask> task)
@@ -71,43 +94,88 @@
   std::list<TimerEvent*> pending_timers_;
 };
 
+// Posting a reply task is tricky business. This class owns the reply task
+// and a reference to it is held by both the reply queue and the first task.
+// Here's an outline of what happens when dealing with a reply task.
+// * The ReplyTaskOwner owns the |reply_| task.
+// * One ref owned by PostAndReplyTask
+// * One ref owned by the reply TaskQueue
+// * ReplyTaskOwner has a flag |run_task_| initially set to false.
+// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
+// * After successfully running the original |task_|, PostAndReplyTask::Run()
+//   calls set_should_run_task(). This sets |run_task_| to true.
+// * In PostAndReplyTask's dtor:
+//   * It releases its reference to ReplyTaskOwner (important to do this first).
+//   * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
+// * PostAndReplyTask doesn't care if write() fails, but when it does:
+//   * The reply queue is gone.
+//   * ReplyTaskOwner has already been deleted and the reply task too.
+// * If write() succeeds:
+//   * ReplyQueue receives the kRunReplyTask message
+//   * Goes through all pending tasks, finding the first that HasOneRef()
+//   * Calls ReplyTaskOwner::Run()
+//     * if set_should_run_task() was called, the reply task will be run
+//   * Release the reference to ReplyTaskOwner
+//   * ReplyTaskOwner and associated |reply_| are deleted.
+class TaskQueue::ReplyTaskOwner {
+ public:
+  ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
+      : reply_(std::move(reply)) {}
+
+  void Run() {
+    RTC_DCHECK(reply_);
+    if (run_task_) {
+      if (!reply_->Run())
+        reply_.release();
+    }
+    reply_.reset();
+  }
+
+  void set_should_run_task() {
+    RTC_DCHECK(!run_task_);
+    run_task_ = true;
+  }
+
+ private:
+  std::unique_ptr<QueuedTask> reply_;
+  bool run_task_ = false;
+};
+
 class TaskQueue::PostAndReplyTask : public QueuedTask {
  public:
   PostAndReplyTask(std::unique_ptr<QueuedTask> task,
                    std::unique_ptr<QueuedTask> reply,
-                   TaskQueue* reply_queue)
+                   TaskQueue* reply_queue,
+                   int reply_pipe)
       : task_(std::move(task)),
-        reply_(std::move(reply)),
-        reply_queue_(reply_queue) {
-    reply_queue->PrepareReplyTask(this);
+        reply_pipe_(reply_pipe),
+        reply_task_owner_(
+            new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
+    reply_queue->PrepareReplyTask(reply_task_owner_);
   }
 
   ~PostAndReplyTask() override {
-    CritScope lock(&lock_);
-    if (reply_queue_)
-      reply_queue_->ReplyTaskDone(this);
-  }
-
-  void OnReplyQueueGone() {
-    CritScope lock(&lock_);
-    reply_queue_ = nullptr;
+    reply_task_owner_ = nullptr;
+    IgnoreSigPipeSignalOnCurrentThread();
+    // Send a signal to the reply queue that the reply task can run now.
+    // Depending on whether |set_should_run_task()| was called by
+    // PostAndReplyTask::Run(), the reply task may or may not actually run.
+    // In either case, it will be deleted.
+    char message = kRunReplyTask;
+    write(reply_pipe_, &message, sizeof(message));
   }
 
  private:
   bool Run() override {
     if (!task_->Run())
       task_.release();
-
-    CritScope lock(&lock_);
-    if (reply_queue_)
-      reply_queue_->PostTask(std::move(reply_));
+    reply_task_owner_->set_should_run_task();
     return true;
   }
 
-  CriticalSection lock_;
   std::unique_ptr<QueuedTask> task_;
-  std::unique_ptr<QueuedTask> reply_;
-  TaskQueue* reply_queue_ GUARDED_BY(lock_);
+  int reply_pipe_;
+  scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
 };
 
 class TaskQueue::SetTimerTask : public QueuedTask {
@@ -144,6 +212,7 @@
   SetNonBlocking(fds[1]);
   wakeup_pipe_out_ = fds[0];
   wakeup_pipe_in_ = fds[1];
+
   EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
               EV_READ | EV_PERSIST, OnWakeup, this);
   event_add(wakeup_event_.get(), 0);
@@ -165,20 +234,14 @@
   thread_.Stop();
 
   event_del(wakeup_event_.get());
+
+  IgnoreSigPipeSignalOnCurrentThread();
+
   close(wakeup_pipe_in_);
   close(wakeup_pipe_out_);
   wakeup_pipe_in_ = -1;
   wakeup_pipe_out_ = -1;
 
-  {
-    // Synchronize against any pending reply tasks that might be running on
-    // other queues.
-    CritScope lock(&pending_lock_);
-    for (auto* reply : pending_replies_)
-      reply->OnReplyQueueGone();
-    pending_replies_.clear();
-  }
-
   event_base_free(event_base_);
 }
 
@@ -246,7 +309,8 @@
                                  std::unique_ptr<QueuedTask> reply,
                                  TaskQueue* reply_queue) {
   std::unique_ptr<QueuedTask> wrapper_task(
-      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
+                           reply_queue->wakeup_pipe_in_));
   PostTask(std::move(wrapper_task));
 }
 
@@ -296,6 +360,22 @@
         task.release();
       break;
     }
+    case kRunReplyTask: {
+      scoped_refptr<ReplyTaskOwnerRef> reply_task;
+      {
+        CritScope lock(&ctx->queue->pending_lock_);
+        for (auto it = ctx->queue->pending_replies_.begin();
+             it != ctx->queue->pending_replies_.end(); ++it) {
+          if ((*it)->HasOneRef()) {
+            reply_task = std::move(*it);
+            ctx->queue->pending_replies_.erase(it);
+            break;
+          }
+        }
+      }
+      reply_task->Run();
+      break;
+    }
     default:
       RTC_NOTREACHED();
       break;
@@ -320,15 +400,10 @@
   delete timer;
 }
 
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
   RTC_DCHECK(reply_task);
   CritScope lock(&pending_lock_);
-  pending_replies_.push_back(reply_task);
-}
-
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
-  CritScope lock(&pending_lock_);
-  pending_replies_.remove(reply_task);
+  pending_replies_.push_back(std::move(reply_task));
 }
 
 }  // namespace rtc
diff --git a/webrtc/base/task_queue_unittest.cc b/webrtc/base/task_queue_unittest.cc
index 74433b9..4389d35 100644
--- a/webrtc/base/task_queue_unittest.cc
+++ b/webrtc/base/task_queue_unittest.cc
@@ -37,22 +37,21 @@
 
 TEST(TaskQueueTest, PostAndCheckCurrent) {
   static const char kQueueName[] = "PostAndCheckCurrent";
+  Event event(false, false);
   TaskQueue queue(kQueueName);
 
   // We're not running a task, so there shouldn't be a current queue.
   EXPECT_FALSE(queue.IsCurrent());
   EXPECT_FALSE(TaskQueue::Current());
 
-  Event event(false, false);
   queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue));
   EXPECT_TRUE(event.Wait(1000));
 }
 
 TEST(TaskQueueTest, PostCustomTask) {
   static const char kQueueName[] = "PostCustomImplementation";
-  TaskQueue queue(kQueueName);
-
   Event event(false, false);
+  TaskQueue queue(kQueueName);
 
   class CustomTask : public QueuedTask {
    public:
@@ -74,18 +73,18 @@
 
 TEST(TaskQueueTest, PostLambda) {
   static const char kQueueName[] = "PostLambda";
+  Event event(false, false);
   TaskQueue queue(kQueueName);
 
-  Event event(false, false);
   queue.PostTask([&event]() { event.Set(); });
   EXPECT_TRUE(event.Wait(1000));
 }
 
 TEST(TaskQueueTest, PostFromQueue) {
   static const char kQueueName[] = "PostFromQueue";
+  Event event(false, false);
   TaskQueue queue(kQueueName);
 
-  Event event(false, false);
   queue.PostTask(
       [&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); });
   EXPECT_TRUE(event.Wait(1000));
@@ -93,9 +92,9 @@
 
 TEST(TaskQueueTest, PostDelayed) {
   static const char kQueueName[] = "PostDelayed";
+  Event event(false, false);
   TaskQueue queue(kQueueName);
 
-  Event event(false, false);
   uint32_t start = Time();
   queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
   EXPECT_TRUE(event.Wait(1000));
@@ -135,10 +134,10 @@
 TEST(TaskQueueTest, PostAndReply) {
   static const char kPostQueue[] = "PostQueue";
   static const char kReplyQueue[] = "ReplyQueue";
+  Event event(false, false);
   TaskQueue post_queue(kPostQueue);
   TaskQueue reply_queue(kReplyQueue);
 
-  Event event(false, false);
   post_queue.PostTaskAndReply(
       Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue),
       Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue);
@@ -148,6 +147,7 @@
 TEST(TaskQueueTest, PostAndReuse) {
   static const char kPostQueue[] = "PostQueue";
   static const char kReplyQueue[] = "ReplyQueue";
+  Event event(false, false);
   TaskQueue post_queue(kPostQueue);
   TaskQueue reply_queue(kReplyQueue);
 
@@ -184,7 +184,6 @@
     Event* const event_;
   };
 
-  Event event(false, false);
   std::unique_ptr<QueuedTask> task(
       new ReusedTask(&call_count, &reply_queue, &event));
 
@@ -195,10 +194,10 @@
 TEST(TaskQueueTest, PostAndReplyLambda) {
   static const char kPostQueue[] = "PostQueue";
   static const char kReplyQueue[] = "ReplyQueue";
+  Event event(false, false);
   TaskQueue post_queue(kPostQueue);
   TaskQueue reply_queue(kReplyQueue);
 
-  Event event(false, false);
   bool my_flag = false;
   post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; },
                               [&event]() { event.Set(); }, &reply_queue);
@@ -206,6 +205,21 @@
   EXPECT_TRUE(my_flag);
 }
 
+// This test covers a particular bug that we had in the libevent implementation
+// where we could hit a deadlock while trying to post a reply task to a queue
+// that was being deleted.  The test isn't guaranteed to hit that case but it's
+// written in a way that makes it likely and by running with --gtest_repeat=1000
+// the bug would occur. It should now be fixed.
+TEST(TaskQueueTest, PostAndReplyDeadlock) {
+  Event event(false, false);
+  TaskQueue post_queue("PostQueue");
+  TaskQueue reply_queue("ReplyQueue");
+
+  post_queue.PostTaskAndReply([&event]() { event.Set(); }, []() {},
+                              &reply_queue);
+  EXPECT_TRUE(event.Wait(1000));
+}
+
 void TestPostTaskAndReply(TaskQueue* work_queue,
                           const char* work_queue_name,
                           Event* event) {
@@ -220,10 +234,10 @@
 TEST(TaskQueueTest, PostAndReply2) {
   static const char kQueueName[] = "PostAndReply2";
   static const char kWorkQueueName[] = "PostAndReply2_Worker";
+  Event event(false, false);
   TaskQueue queue(kQueueName);
   TaskQueue work_queue(kWorkQueueName);
 
-  Event event(false, false);
   queue.PostTask(
       Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event));
   EXPECT_TRUE(event.Wait(1000));