blob: 4f56400741e7a28c036a47d0f975e9e7ce51d940 [file] [log] [blame]
tommic06b1332016-05-14 18:31:401/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Danil Chapovaloveb175242019-02-12 09:44:3811#include "rtc_base/task_queue_libevent.h"
tommic06b1332016-05-14 18:31:4012
Yves Gerey988cc082018-10-23 10:03:0113#include <errno.h>
tommic06b1332016-05-14 18:31:4014#include <fcntl.h>
Yves Gerey988cc082018-10-23 10:03:0115#include <pthread.h>
tommi8c80c6e2017-02-23 08:34:5216#include <signal.h>
Yves Gerey988cc082018-10-23 10:03:0117#include <stdint.h>
18#include <time.h>
tommic06b1332016-05-14 18:31:4019#include <unistd.h>
Jonas Olssona4d87372019-07-05 17:08:3320
Danil Chapovalov02fddf62018-02-12 11:41:1621#include <list>
Yves Gerey988cc082018-10-23 10:03:0122#include <memory>
23#include <type_traits>
24#include <utility>
tommic06b1332016-05-14 18:31:4025
Steve Anton9a83dd72020-01-09 19:03:2526#include "absl/container/inlined_vector.h"
Danil Chapovaloveb175242019-02-12 09:44:3827#include "absl/strings/string_view.h"
28#include "api/task_queue/queued_task.h"
29#include "api/task_queue/task_queue_base.h"
tommic06b1332016-05-14 18:31:4030#include "base/third_party/libevent/event.h"
Mirko Bonadei92ea95e2017-09-15 04:47:3131#include "rtc_base/checks.h"
32#include "rtc_base/logging.h"
Karl Wiberge40468b2017-11-22 09:42:2633#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 04:47:3134#include "rtc_base/platform_thread.h"
Yves Gerey988cc082018-10-23 10:03:0135#include "rtc_base/platform_thread_types.h"
Markus Handell18523c32020-07-08 15:55:5836#include "rtc_base/synchronization/mutex.h"
Yves Gerey988cc082018-10-23 10:03:0137#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 17:11:0038#include "rtc_base/time_utils.h"
tommic06b1332016-05-14 18:31:4039
Danil Chapovaloveb175242019-02-12 09:44:3840namespace webrtc {
tommic06b1332016-05-14 18:31:4041namespace {
// One-byte commands written to the wakeup pipe and decoded by OnWakeup():
// kQuit stops the event loop, kRunTasks runs everything queued in `pending_`.
constexpr char kQuit = 1;
constexpr char kRunTasks = 2;
tommi8c80c6e2017-02-23 08:34:5244
Danil Chapovaloveb175242019-02-12 09:44:3845using Priority = TaskQueueFactory::Priority;
tommic9bb7912017-02-24 18:42:1446
// Blocks the SIGPIPE signal on the calling thread.
//
// SIGPIPE can be raised when write() is called on a pipe that is being closed,
// or when a pipe that is still being written to gets closed. We can run into
// that situation, and since the default action for SIGPIPE is to terminate the
// process, the signal is masked so that we can continue as normal.
//
// It would be great to restore the previous signal mask afterwards, but the
// act of restoring it can itself cause SIGPIPE to be signaled (e.g. on MacOS),
// so the mask is deliberately left in place.
//
// An alternative to this approach is to ignore the signal for the whole
// process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
tommic06b1332016-05-14 18:31:4066
tommic06b1332016-05-14 18:31:4067bool SetNonBlocking(int fd) {
68 const int flags = fcntl(fd, F_GETFL);
69 RTC_CHECK(flags != -1);
70 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
71}
tommi1666b612016-07-13 17:58:1272
73// TODO(tommi): This is a hack to support two versions of libevent that we're
74// compatible with. The method we really want to call is event_assign(),
75// since event_set() has been marked as deprecated (and doesn't accept
76// passing event_base__ as a parameter). However, the version of libevent
77// that we have in Chromium, doesn't have event_assign(), so we need to call
78// event_set() there.
79void EventAssign(struct event* ev,
80 struct event_base* base,
81 int fd,
82 short events,
83 void (*callback)(int, short, void*),
84 void* arg) {
85#if defined(_EVENT2_EVENT_H_)
86 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
87#else
88 event_set(ev, fd, events, callback, arg);
89 RTC_CHECK_EQ(0, event_base_set(base, ev));
90#endif
91}
tommic9bb7912017-02-24 18:42:1492
Danil Chapovaloveb175242019-02-12 09:44:3893rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
tommic9bb7912017-02-24 18:42:1494 switch (priority) {
95 case Priority::HIGH:
Markus Handellad5037b2021-05-07 13:02:3696 return rtc::ThreadPriority::kRealtime;
tommic9bb7912017-02-24 18:42:1497 case Priority::LOW:
Markus Handellad5037b2021-05-07 13:02:3698 return rtc::ThreadPriority::kLow;
tommic9bb7912017-02-24 18:42:1499 case Priority::NORMAL:
Markus Handellad5037b2021-05-07 13:02:36100 return rtc::ThreadPriority::kNormal;
tommic9bb7912017-02-24 18:42:14101 }
tommic9bb7912017-02-24 18:42:14102}
tommic06b1332016-05-14 18:31:40103
Danil Chapovaloveb175242019-02-12 09:44:38104class TaskQueueLibevent final : public TaskQueueBase {
perkj650fdae2017-08-25 12:00:11105 public:
Danil Chapovaloveb175242019-02-12 09:44:38106 TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
perkj650fdae2017-08-25 12:00:11107
Danil Chapovaloveb175242019-02-12 09:44:38108 void Delete() override;
109 void PostTask(std::unique_ptr<QueuedTask> task) override;
110 void PostDelayedTask(std::unique_ptr<QueuedTask> task,
111 uint32_t milliseconds) override;
perkj650fdae2017-08-25 12:00:11112
113 private:
Danil Chapovaloveb175242019-02-12 09:44:38114 class SetTimerTask;
115 struct TimerEvent;
116
117 ~TaskQueueLibevent() override = default;
118
perkj650fdae2017-08-25 12:00:11119 static void OnWakeup(int socket, short flags, void* context); // NOLINT
perkj650fdae2017-08-25 12:00:11120 static void RunTimer(int fd, short flags, void* context); // NOLINT
121
Danil Chapovaloveb175242019-02-12 09:44:38122 bool is_active_ = true;
perkj650fdae2017-08-25 12:00:11123 int wakeup_pipe_in_ = -1;
124 int wakeup_pipe_out_ = -1;
125 event_base* event_base_;
Danil Chapovaloveb175242019-02-12 09:44:38126 event wakeup_event_;
127 rtc::PlatformThread thread_;
Markus Handell18523c32020-07-08 15:55:58128 Mutex pending_lock_;
Steve Anton9a83dd72020-01-09 19:03:25129 absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
130 RTC_GUARDED_BY(pending_lock_);
tommic06b1332016-05-14 18:31:40131 // Holds a list of events pending timers for cleanup when the loop exits.
132 std::list<TimerEvent*> pending_timers_;
133};
134
Danil Chapovaloveb175242019-02-12 09:44:38135struct TaskQueueLibevent::TimerEvent {
136 TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
137 : task_queue(task_queue), task(std::move(task)) {}
138 ~TimerEvent() { event_del(&ev); }
139
140 event ev;
141 TaskQueueLibevent* task_queue;
142 std::unique_ptr<QueuedTask> task;
143};
144
145class TaskQueueLibevent::SetTimerTask : public QueuedTask {
tommic06b1332016-05-14 18:31:40146 public:
147 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
148 : task_(std::move(task)),
149 milliseconds_(milliseconds),
Danil Chapovaloveb175242019-02-12 09:44:38150 posted_(rtc::Time32()) {}
tommic06b1332016-05-14 18:31:40151
152 private:
153 bool Run() override {
154 // Compensate for the time that has passed since construction
155 // and until we got here.
Danil Chapovaloveb175242019-02-12 09:44:38156 uint32_t post_time = rtc::Time32() - posted_;
157 TaskQueueLibevent::Current()->PostDelayedTask(
tommic06b1332016-05-14 18:31:40158 std::move(task_),
159 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
160 return true;
161 }
162
163 std::unique_ptr<QueuedTask> task_;
164 const uint32_t milliseconds_;
165 const uint32_t posted_;
166};
167
Danil Chapovaloveb175242019-02-12 09:44:38168TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
169 rtc::ThreadPriority priority)
Markus Handellad5037b2021-05-07 13:02:36170 : event_base_(event_base_new()) {
tommic06b1332016-05-14 18:31:40171 int fds[2];
172 RTC_CHECK(pipe(fds) == 0);
173 SetNonBlocking(fds[0]);
174 SetNonBlocking(fds[1]);
175 wakeup_pipe_out_ = fds[0];
176 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 08:34:52177
Danil Chapovaloveb175242019-02-12 09:44:38178 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
tommi1666b612016-07-13 17:58:12179 EV_READ | EV_PERSIST, OnWakeup, this);
Danil Chapovaloveb175242019-02-12 09:44:38180 event_add(&wakeup_event_, 0);
Markus Handellad5037b2021-05-07 13:02:36181 thread_ = rtc::PlatformThread::SpawnJoinable(
182 [this] {
183 {
184 CurrentTaskQueueSetter set_current(this);
185 while (is_active_)
186 event_base_loop(event_base_, 0);
187 }
188
189 for (TimerEvent* timer : pending_timers_)
190 delete timer;
191 },
192 queue_name, rtc::ThreadAttributes().SetPriority(priority));
tommic06b1332016-05-14 18:31:40193}
194
Danil Chapovaloveb175242019-02-12 09:44:38195void TaskQueueLibevent::Delete() {
tommic06b1332016-05-14 18:31:40196 RTC_DCHECK(!IsCurrent());
197 struct timespec ts;
198 char message = kQuit;
199 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
200 // The queue is full, so we have no choice but to wait and retry.
201 RTC_CHECK_EQ(EAGAIN, errno);
202 ts.tv_sec = 0;
203 ts.tv_nsec = 1000000;
204 nanosleep(&ts, nullptr);
205 }
206
Markus Handellad5037b2021-05-07 13:02:36207 thread_.Finalize();
tommic06b1332016-05-14 18:31:40208
Danil Chapovaloveb175242019-02-12 09:44:38209 event_del(&wakeup_event_);
tommi8c80c6e2017-02-23 08:34:52210
211 IgnoreSigPipeSignalOnCurrentThread();
212
tommic06b1332016-05-14 18:31:40213 close(wakeup_pipe_in_);
214 close(wakeup_pipe_out_);
215 wakeup_pipe_in_ = -1;
216 wakeup_pipe_out_ = -1;
217
tommic06b1332016-05-14 18:31:40218 event_base_free(event_base_);
Danil Chapovaloveb175242019-02-12 09:44:38219 delete this;
tommic06b1332016-05-14 18:31:40220}
221
Danil Chapovaloveb175242019-02-12 09:44:38222void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
Danil Chapovalov00e71ef2019-06-11 16:01:56223 {
Markus Handell18523c32020-07-08 15:55:58224 MutexLock lock(&pending_lock_);
Steve Anton9a83dd72020-01-09 19:03:25225 bool had_pending_tasks = !pending_.empty();
Danil Chapovalov00e71ef2019-06-11 16:01:56226 pending_.push_back(std::move(task));
Steve Anton9a83dd72020-01-09 19:03:25227
228 // Only write to the pipe if there were no pending tasks before this one
229 // since the thread could be sleeping. If there were already pending tasks
230 // then we know there's either a pending write in the pipe or the thread has
231 // not yet processed the pending tasks. In either case, the thread will
232 // eventually wake up and process all pending tasks including this one.
233 if (had_pending_tasks) {
234 return;
235 }
Danil Chapovalov00e71ef2019-06-11 16:01:56236 }
Steve Anton9a83dd72020-01-09 19:03:25237
238 // Note: This behvior outlined above ensures we never fill up the pipe write
239 // buffer since there will only ever be 1 byte pending.
240 char message = kRunTasks;
241 RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
242 sizeof(message));
tommic06b1332016-05-14 18:31:40243}
244
Danil Chapovaloveb175242019-02-12 09:44:38245void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
246 uint32_t milliseconds) {
tommic06b1332016-05-14 18:31:40247 if (IsCurrent()) {
Danil Chapovaloveb175242019-02-12 09:44:38248 TimerEvent* timer = new TimerEvent(this, std::move(task));
249 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
perkj650fdae2017-08-25 12:00:11250 timer);
Danil Chapovaloveb175242019-02-12 09:44:38251 pending_timers_.push_back(timer);
kwiberg5b9746e2017-08-16 11:52:35252 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
253 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
tommic06b1332016-05-14 18:31:40254 event_add(&timer->ev, &tv);
255 } else {
Mirko Bonadei317a1f02019-09-17 15:06:18256 PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
tommic06b1332016-05-14 18:31:40257 }
258}
259
tommic06b1332016-05-14 18:31:40260// static
Danil Chapovaloveb175242019-02-12 09:44:38261void TaskQueueLibevent::OnWakeup(int socket,
262 short flags, // NOLINT
263 void* context) {
264 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
265 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
tommic06b1332016-05-14 18:31:40266 char buf;
267 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
268 switch (buf) {
269 case kQuit:
Danil Chapovaloveb175242019-02-12 09:44:38270 me->is_active_ = false;
271 event_base_loopbreak(me->event_base_);
tommic06b1332016-05-14 18:31:40272 break;
Steve Anton9a83dd72020-01-09 19:03:25273 case kRunTasks: {
274 absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
tommic06b1332016-05-14 18:31:40275 {
Markus Handell18523c32020-07-08 15:55:58276 MutexLock lock(&me->pending_lock_);
Steve Anton9a83dd72020-01-09 19:03:25277 tasks.swap(me->pending_);
tommic06b1332016-05-14 18:31:40278 }
Steve Anton9a83dd72020-01-09 19:03:25279 RTC_DCHECK(!tasks.empty());
280 for (auto& task : tasks) {
281 if (task->Run()) {
282 task.reset();
283 } else {
Artem Titov96e3b992021-07-26 14:03:14284 // `false` means the task should *not* be deleted.
Steve Anton9a83dd72020-01-09 19:03:25285 task.release();
286 }
287 }
tommic06b1332016-05-14 18:31:40288 break;
289 }
290 default:
Artem Titovd3251962021-11-15 15:57:07291 RTC_DCHECK_NOTREACHED();
tommic06b1332016-05-14 18:31:40292 break;
293 }
294}
295
296// static
Danil Chapovaloveb175242019-02-12 09:44:38297void TaskQueueLibevent::RunTimer(int fd,
298 short flags, // NOLINT
299 void* context) {
tommic06b1332016-05-14 18:31:40300 TimerEvent* timer = static_cast<TimerEvent*>(context);
301 if (!timer->task->Run())
302 timer->task.release();
Danil Chapovaloveb175242019-02-12 09:44:38303 timer->task_queue->pending_timers_.remove(timer);
tommic06b1332016-05-14 18:31:40304 delete timer;
305}
306
Danil Chapovaloveb175242019-02-12 09:44:38307class TaskQueueLibeventFactory final : public TaskQueueFactory {
308 public:
309 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
310 absl::string_view name,
311 Priority priority) const override {
312 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
313 new TaskQueueLibevent(name,
314 TaskQueuePriorityToThreadPriority(priority)));
315 }
316};
317
318} // namespace
319
320std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
Mirko Bonadei317a1f02019-09-17 15:06:18321 return std::make_unique<TaskQueueLibeventFactory>();
perkj650fdae2017-08-25 12:00:11322}
323
Danil Chapovaloveb175242019-02-12 09:44:38324} // namespace webrtc