Fix potential deadlock in TaskQueue's libevent PostTaskAndReply implementation
BUG=webrtc:7188
Review-Url: https://codereview.webrtc.org/2709603002
Cr-Commit-Position: refs/heads/master@{#16786}
diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h
index 5fcf00f..92a1c94 100644
--- a/webrtc/base/task_queue.h
+++ b/webrtc/base/task_queue.h
@@ -27,6 +27,9 @@
#endif
#if defined(WEBRTC_BUILD_LIBEVENT)
+#include "webrtc/base/refcountedobject.h"
+#include "webrtc/base/scoped_ref_ptr.h"
+
struct event_base;
struct event;
#endif
@@ -237,11 +240,13 @@
static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
+ class ReplyTaskOwner;
class PostAndReplyTask;
class SetTimerTask;
- void PrepareReplyTask(PostAndReplyTask* reply_task);
- void ReplyTaskDone(PostAndReplyTask* reply_task);
+ typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
+
+ void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
struct QueueContext;
@@ -252,7 +257,8 @@
PlatformThread thread_;
rtc::CriticalSection pending_lock_;
std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
- std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
+ std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
+ GUARDED_BY(pending_lock_);
#elif defined(WEBRTC_MAC)
struct QueueContext;
struct TaskContext;
diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc
index 2c7d649..d5a4a6c 100644
--- a/webrtc/base/task_queue_gcd.cc
+++ b/webrtc/base/task_queue_gcd.cc
@@ -69,14 +69,13 @@
std::unique_ptr<QueuedTask> second_task)
: TaskContext(second_queue_ctx, std::move(second_task)),
first_queue_ctx(first_queue_ctx),
- first_task(std::move(first_task)) {
+ first_task(std::move(first_task)),
+ reply_queue_(second_queue_ctx->queue->queue_) {
// Retain the reply queue for as long as this object lives.
// If we don't, we may have memory leaks and/or failures.
- dispatch_retain(first_queue_ctx->queue->queue_);
+ dispatch_retain(reply_queue_);
}
- ~PostTaskAndReplyContext() override {
- dispatch_release(first_queue_ctx->queue->queue_);
- }
+ ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); }
static void RunTask(void* context) {
auto* rc = static_cast<PostTaskAndReplyContext*>(context);
@@ -87,11 +86,12 @@
}
// Post the reply task. This hands the work over to the parent struct.
// This task will eventually delete |this|.
- dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask);
+ dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask);
}
QueueContext* const first_queue_ctx;
std::unique_ptr<QueuedTask> first_task;
+ dispatch_queue_t reply_queue_;
};
TaskQueue::TaskQueue(const char* queue_name)
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
index c802588..c6ce5b3 100644
--- a/webrtc/base/task_queue_libevent.cc
+++ b/webrtc/base/task_queue_libevent.cc
@@ -11,6 +11,7 @@
#include "webrtc/base/task_queue.h"
#include <fcntl.h>
+#include <signal.h>
#include <string.h>
#include <unistd.h>
@@ -27,6 +28,28 @@
namespace {
static const char kQuit = 1;
static const char kRunTask = 2;
+static const char kRunReplyTask = 3;
+
+// This ignores the SIGPIPE signal on the calling thread.
+// This signal can be fired when trying to write() to a pipe that's being
+// closed or while closing a pipe that's being written to.
+// We can run into that situation (e.g. reply tasks that don't get a chance to
+// run because the task queue is being deleted) so we ignore this signal and
+// continue as normal.
+// As a side note for this implementation, it would be great if we could safely
+// restore the sigmask, but unfortunately the operation of restoring it, can
+// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
+// The SIGPIPE signal by default causes the process to be terminated, so we
+// don't want to risk that.
+// An alternative to this approach is to ignore the signal for the whole
+// process:
+// signal(SIGPIPE, SIG_IGN);
+void IgnoreSigPipeSignalOnCurrentThread() {
+ sigset_t sigpipe_mask;
+ sigemptyset(&sigpipe_mask);
+ sigaddset(&sigpipe_mask, SIGPIPE);
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
+}
struct TimerEvent {
explicit TimerEvent(std::unique_ptr<QueuedTask> task)
@@ -71,43 +94,88 @@
std::list<TimerEvent*> pending_timers_;
};
+// Posting a reply task is tricky business. This class owns the reply task
+// and a reference to it is held by both the reply queue and the first task.
+// Here's an outline of what happens when dealing with a reply task.
+// * The ReplyTaskOwner owns the |reply_| task.
+// * One ref owned by PostAndReplyTask
+// * One ref owned by the reply TaskQueue
+// * ReplyTaskOwner has a flag |run_task_| initially set to false.
+// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
+// * After successfully running the original |task_|, PostAndReplyTask() calls
+// set_should_run_task(). This sets |run_task_| to true.
+// * In PostAndReplyTask's dtor:
+// * It releases its reference to ReplyTaskOwner (important to do this first).
+// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
+// * PostAndReplyTask doesn't care if write() fails, but when it does:
+// * The reply queue is gone.
+// * ReplyTaskOwner has already been deleted and the reply task too.
+// * If write() succeeds:
+// * ReplyQueue receives the kRunReplyTask message
+// * Goes through all pending tasks, finding the first that HasOneRef()
+// * Calls ReplyTaskOwner::Run()
+// * if set_should_run_task() was called, the reply task will be run
+// * Release the reference to ReplyTaskOwner
+// * ReplyTaskOwner and associated |reply_| are deleted.
+class TaskQueue::ReplyTaskOwner {
+ public:
+ ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
+ : reply_(std::move(reply)) {}
+
+ void Run() {
+ RTC_DCHECK(reply_);
+ if (run_task_) {
+ if (!reply_->Run())
+ reply_.release();
+ }
+ reply_.reset();
+ }
+
+ void set_should_run_task() {
+ RTC_DCHECK(!run_task_);
+ run_task_ = true;
+ }
+
+ private:
+ std::unique_ptr<QueuedTask> reply_;
+ bool run_task_ = false;
+};
+
class TaskQueue::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue)
+ TaskQueue* reply_queue,
+ int reply_pipe)
: task_(std::move(task)),
- reply_(std::move(reply)),
- reply_queue_(reply_queue) {
- reply_queue->PrepareReplyTask(this);
+ reply_pipe_(reply_pipe),
+ reply_task_owner_(
+ new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
+ reply_queue->PrepareReplyTask(reply_task_owner_);
}
~PostAndReplyTask() override {
- CritScope lock(&lock_);
- if (reply_queue_)
- reply_queue_->ReplyTaskDone(this);
- }
-
- void OnReplyQueueGone() {
- CritScope lock(&lock_);
- reply_queue_ = nullptr;
+ reply_task_owner_ = nullptr;
+ IgnoreSigPipeSignalOnCurrentThread();
+ // Send a signal to the reply queue that the reply task can run now.
+ // Depending on whether |set_should_run_task()| was called by the
+ // PostAndReplyTask(), the reply task may or may not actually run.
+ // In either case, it will be deleted.
+ char message = kRunReplyTask;
+ write(reply_pipe_, &message, sizeof(message));
}
private:
bool Run() override {
if (!task_->Run())
task_.release();
-
- CritScope lock(&lock_);
- if (reply_queue_)
- reply_queue_->PostTask(std::move(reply_));
+ reply_task_owner_->set_should_run_task();
return true;
}
- CriticalSection lock_;
std::unique_ptr<QueuedTask> task_;
- std::unique_ptr<QueuedTask> reply_;
- TaskQueue* reply_queue_ GUARDED_BY(lock_);
+ int reply_pipe_;
+ scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
};
class TaskQueue::SetTimerTask : public QueuedTask {
@@ -144,6 +212,7 @@
SetNonBlocking(fds[1]);
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
+
EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this);
event_add(wakeup_event_.get(), 0);
@@ -165,20 +234,14 @@
thread_.Stop();
event_del(wakeup_event_.get());
+
+ IgnoreSigPipeSignalOnCurrentThread();
+
close(wakeup_pipe_in_);
close(wakeup_pipe_out_);
wakeup_pipe_in_ = -1;
wakeup_pipe_out_ = -1;
- {
- // Synchronize against any pending reply tasks that might be running on
- // other queues.
- CritScope lock(&pending_lock_);
- for (auto* reply : pending_replies_)
- reply->OnReplyQueueGone();
- pending_replies_.clear();
- }
-
event_base_free(event_base_);
}
@@ -246,7 +309,8 @@
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+ new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
+ reply_queue->wakeup_pipe_in_));
PostTask(std::move(wrapper_task));
}
@@ -296,6 +360,22 @@
task.release();
break;
}
+ case kRunReplyTask: {
+ scoped_refptr<ReplyTaskOwnerRef> reply_task;
+ {
+ CritScope lock(&ctx->queue->pending_lock_);
+ for (auto it = ctx->queue->pending_replies_.begin();
+ it != ctx->queue->pending_replies_.end(); ++it) {
+ if ((*it)->HasOneRef()) {
+ reply_task = std::move(*it);
+ ctx->queue->pending_replies_.erase(it);
+ break;
+ }
+ }
+ }
+ reply_task->Run();
+ break;
+ }
default:
RTC_NOTREACHED();
break;
@@ -320,15 +400,10 @@
delete timer;
}
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
- pending_replies_.push_back(reply_task);
-}
-
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
- CritScope lock(&pending_lock_);
- pending_replies_.remove(reply_task);
+ pending_replies_.push_back(std::move(reply_task));
}
} // namespace rtc
diff --git a/webrtc/base/task_queue_unittest.cc b/webrtc/base/task_queue_unittest.cc
index 74433b9..4389d35 100644
--- a/webrtc/base/task_queue_unittest.cc
+++ b/webrtc/base/task_queue_unittest.cc
@@ -37,22 +37,21 @@
TEST(TaskQueueTest, PostAndCheckCurrent) {
static const char kQueueName[] = "PostAndCheckCurrent";
+ Event event(false, false);
TaskQueue queue(kQueueName);
// We're not running a task, so there shouldn't be a current queue.
EXPECT_FALSE(queue.IsCurrent());
EXPECT_FALSE(TaskQueue::Current());
- Event event(false, false);
queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue));
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostCustomTask) {
static const char kQueueName[] = "PostCustomImplementation";
- TaskQueue queue(kQueueName);
-
Event event(false, false);
+ TaskQueue queue(kQueueName);
class CustomTask : public QueuedTask {
public:
@@ -74,18 +73,18 @@
TEST(TaskQueueTest, PostLambda) {
static const char kQueueName[] = "PostLambda";
+ Event event(false, false);
TaskQueue queue(kQueueName);
- Event event(false, false);
queue.PostTask([&event]() { event.Set(); });
EXPECT_TRUE(event.Wait(1000));
}
TEST(TaskQueueTest, PostFromQueue) {
static const char kQueueName[] = "PostFromQueue";
+ Event event(false, false);
TaskQueue queue(kQueueName);
- Event event(false, false);
queue.PostTask(
[&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); });
EXPECT_TRUE(event.Wait(1000));
@@ -93,9 +92,9 @@
TEST(TaskQueueTest, PostDelayed) {
static const char kQueueName[] = "PostDelayed";
+ Event event(false, false);
TaskQueue queue(kQueueName);
- Event event(false, false);
uint32_t start = Time();
queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
EXPECT_TRUE(event.Wait(1000));
@@ -135,10 +134,10 @@
TEST(TaskQueueTest, PostAndReply) {
static const char kPostQueue[] = "PostQueue";
static const char kReplyQueue[] = "ReplyQueue";
+ Event event(false, false);
TaskQueue post_queue(kPostQueue);
TaskQueue reply_queue(kReplyQueue);
- Event event(false, false);
post_queue.PostTaskAndReply(
Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue),
Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue);
@@ -148,6 +147,7 @@
TEST(TaskQueueTest, PostAndReuse) {
static const char kPostQueue[] = "PostQueue";
static const char kReplyQueue[] = "ReplyQueue";
+ Event event(false, false);
TaskQueue post_queue(kPostQueue);
TaskQueue reply_queue(kReplyQueue);
@@ -184,7 +184,6 @@
Event* const event_;
};
- Event event(false, false);
std::unique_ptr<QueuedTask> task(
new ReusedTask(&call_count, &reply_queue, &event));
@@ -195,10 +194,10 @@
TEST(TaskQueueTest, PostAndReplyLambda) {
static const char kPostQueue[] = "PostQueue";
static const char kReplyQueue[] = "ReplyQueue";
+ Event event(false, false);
TaskQueue post_queue(kPostQueue);
TaskQueue reply_queue(kReplyQueue);
- Event event(false, false);
bool my_flag = false;
post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; },
[&event]() { event.Set(); }, &reply_queue);
@@ -206,6 +205,21 @@
EXPECT_TRUE(my_flag);
}
+// This test covers a particular bug that we had in the libevent implementation
+// where we could hit a deadlock while trying to post a reply task to a queue
+// that was being deleted. The test isn't guaranteed to hit that case but it's
+// written in a way that makes it likely and by running with --gtest_repeat=1000
+// the bug would occur. Alas, now it should be fixed.
+TEST(TaskQueueTest, PostAndReplyDeadlock) {
+ Event event(false, false);
+ TaskQueue post_queue("PostQueue");
+ TaskQueue reply_queue("ReplyQueue");
+
+ post_queue.PostTaskAndReply([&event]() { event.Set(); }, []() {},
+ &reply_queue);
+ EXPECT_TRUE(event.Wait(1000));
+}
+
void TestPostTaskAndReply(TaskQueue* work_queue,
const char* work_queue_name,
Event* event) {
@@ -220,10 +234,10 @@
TEST(TaskQueueTest, PostAndReply2) {
static const char kQueueName[] = "PostAndReply2";
static const char kWorkQueueName[] = "PostAndReply2_Worker";
+ Event event(false, false);
TaskQueue queue(kQueueName);
TaskQueue work_queue(kWorkQueueName);
- Event event(false, false);
queue.PostTask(
Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event));
EXPECT_TRUE(event.Wait(1000));