blob: e27c6eff9fc74bda0a103f62244b9cbad36e6149 [file] [log] [blame]
tommic06b1332016-05-14 18:31:401/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Danil Chapovaloveb175242019-02-12 09:44:3811#include "rtc_base/task_queue_libevent.h"
tommic06b1332016-05-14 18:31:4012
Yves Gerey988cc082018-10-23 10:03:0113#include <errno.h>
tommic06b1332016-05-14 18:31:4014#include <fcntl.h>
Yves Gerey988cc082018-10-23 10:03:0115#include <pthread.h>
tommi8c80c6e2017-02-23 08:34:5216#include <signal.h>
Yves Gerey988cc082018-10-23 10:03:0117#include <stdint.h>
18#include <time.h>
tommic06b1332016-05-14 18:31:4019#include <unistd.h>
Jonas Olssona4d87372019-07-05 17:08:3320
Danil Chapovalov02fddf62018-02-12 11:41:1621#include <list>
Yves Gerey988cc082018-10-23 10:03:0122#include <memory>
23#include <type_traits>
24#include <utility>
tommic06b1332016-05-14 18:31:4025
Steve Anton9a83dd72020-01-09 19:03:2526#include "absl/container/inlined_vector.h"
Danil Chapovalov30c2a312022-07-19 12:12:4327#include "absl/functional/any_invocable.h"
Danil Chapovaloveb175242019-02-12 09:44:3828#include "absl/strings/string_view.h"
Danil Chapovaloveb175242019-02-12 09:44:3829#include "api/task_queue/task_queue_base.h"
Danil Chapovalov30c2a312022-07-19 12:12:4330#include "api/units/time_delta.h"
Mirko Bonadei92ea95e2017-09-15 04:47:3131#include "rtc_base/checks.h"
32#include "rtc_base/logging.h"
Karl Wiberge40468b2017-11-22 09:42:2633#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 04:47:3134#include "rtc_base/platform_thread.h"
Yves Gerey988cc082018-10-23 10:03:0135#include "rtc_base/platform_thread_types.h"
Markus Handell18523c32020-07-08 15:55:5836#include "rtc_base/synchronization/mutex.h"
Yves Gerey988cc082018-10-23 10:03:0137#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 17:11:0038#include "rtc_base/time_utils.h"
Byoungchan Leed69a7262022-06-23 13:06:0039#include "third_party/libevent/event.h"
tommic06b1332016-05-14 18:31:4040
Danil Chapovaloveb175242019-02-12 09:44:3841namespace webrtc {
tommic06b1332016-05-14 18:31:4042namespace {
// Single-byte commands written to the wakeup pipe and decoded by OnWakeup().
// kQuit asks the event loop to stop; kRunTasks asks it to run the tasks that
// have been queued in `pending_`.
constexpr char kQuit = 1;
constexpr char kRunTasks = 2;
tommi8c80c6e2017-02-23 08:34:5245
Danil Chapovaloveb175242019-02-12 09:44:3846using Priority = TaskQueueFactory::Priority;
tommic9bb7912017-02-24 18:42:1447
// Blocks delivery of SIGPIPE to the calling thread.
//
// SIGPIPE can be raised when trying to write() to a pipe that's being closed,
// or while closing a pipe that's being written to. We can run into that
// situation, and since the default action for SIGPIPE is to terminate the
// process, the signal is masked so that we can continue as normal.
//
// The previous signal mask is deliberately not restored afterwards: it would
// be great if we could do so safely, but restoring it can itself cause
// SIGPIPE to be signaled (e.g. on MacOS), and we don't want to risk that.
//
// An alternative to this approach is to ignore the signal for the whole
// process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  // SIG_BLOCK adds SIGPIPE to the thread's current mask; the old mask is not
  // requested because it is never restored.
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
tommic06b1332016-05-14 18:31:4067
tommic06b1332016-05-14 18:31:4068bool SetNonBlocking(int fd) {
69 const int flags = fcntl(fd, F_GETFL);
70 RTC_CHECK(flags != -1);
71 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72}
tommi1666b612016-07-13 17:58:1273
74// TODO(tommi): This is a hack to support two versions of libevent that we're
75// compatible with. The method we really want to call is event_assign(),
76// since event_set() has been marked as deprecated (and doesn't accept
77// passing event_base__ as a parameter). However, the version of libevent
78// that we have in Chromium, doesn't have event_assign(), so we need to call
79// event_set() there.
80void EventAssign(struct event* ev,
81 struct event_base* base,
82 int fd,
83 short events,
84 void (*callback)(int, short, void*),
85 void* arg) {
86#if defined(_EVENT2_EVENT_H_)
87 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88#else
89 event_set(ev, fd, events, callback, arg);
90 RTC_CHECK_EQ(0, event_base_set(base, ev));
91#endif
92}
tommic9bb7912017-02-24 18:42:1493
Danil Chapovaloveb175242019-02-12 09:44:3894rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
tommic9bb7912017-02-24 18:42:1495 switch (priority) {
96 case Priority::HIGH:
Markus Handellad5037b2021-05-07 13:02:3697 return rtc::ThreadPriority::kRealtime;
tommic9bb7912017-02-24 18:42:1498 case Priority::LOW:
Markus Handellad5037b2021-05-07 13:02:3699 return rtc::ThreadPriority::kLow;
tommic9bb7912017-02-24 18:42:14100 case Priority::NORMAL:
Markus Handellad5037b2021-05-07 13:02:36101 return rtc::ThreadPriority::kNormal;
tommic9bb7912017-02-24 18:42:14102 }
tommic9bb7912017-02-24 18:42:14103}
tommic06b1332016-05-14 18:31:40104
Danil Chapovaloveb175242019-02-12 09:44:38105class TaskQueueLibevent final : public TaskQueueBase {
perkj650fdae2017-08-25 12:00:11106 public:
Danil Chapovaloveb175242019-02-12 09:44:38107 TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
perkj650fdae2017-08-25 12:00:11108
Danil Chapovaloveb175242019-02-12 09:44:38109 void Delete() override;
Markus Handell2a256c82023-02-27 11:41:39110
111 protected:
112 void PostTaskImpl(absl::AnyInvocable<void() &&> task,
113 const PostTaskTraits& traits,
114 const Location& location) override;
115 void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
116 TimeDelta delay,
117 const PostDelayedTaskTraits& traits,
118 const Location& location) override;
perkj650fdae2017-08-25 12:00:11119
120 private:
Danil Chapovaloveb175242019-02-12 09:44:38121 struct TimerEvent;
122
Danil Chapovalov30c2a312022-07-19 12:12:43123 void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
124 TimeDelta delay);
125
Danil Chapovaloveb175242019-02-12 09:44:38126 ~TaskQueueLibevent() override = default;
127
perkj650fdae2017-08-25 12:00:11128 static void OnWakeup(int socket, short flags, void* context); // NOLINT
perkj650fdae2017-08-25 12:00:11129 static void RunTimer(int fd, short flags, void* context); // NOLINT
130
Danil Chapovaloveb175242019-02-12 09:44:38131 bool is_active_ = true;
perkj650fdae2017-08-25 12:00:11132 int wakeup_pipe_in_ = -1;
133 int wakeup_pipe_out_ = -1;
134 event_base* event_base_;
Danil Chapovaloveb175242019-02-12 09:44:38135 event wakeup_event_;
136 rtc::PlatformThread thread_;
Markus Handell18523c32020-07-08 15:55:58137 Mutex pending_lock_;
Danil Chapovalov30c2a312022-07-19 12:12:43138 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
Steve Anton9a83dd72020-01-09 19:03:25139 RTC_GUARDED_BY(pending_lock_);
tommic06b1332016-05-14 18:31:40140 // Holds a list of events pending timers for cleanup when the loop exits.
141 std::list<TimerEvent*> pending_timers_;
142};
143
Danil Chapovaloveb175242019-02-12 09:44:38144struct TaskQueueLibevent::TimerEvent {
Danil Chapovalov30c2a312022-07-19 12:12:43145 TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
Danil Chapovaloveb175242019-02-12 09:44:38146 : task_queue(task_queue), task(std::move(task)) {}
147 ~TimerEvent() { event_del(&ev); }
148
149 event ev;
150 TaskQueueLibevent* task_queue;
Danil Chapovalov30c2a312022-07-19 12:12:43151 absl::AnyInvocable<void() &&> task;
tommic06b1332016-05-14 18:31:40152};
153
Danil Chapovaloveb175242019-02-12 09:44:38154TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
155 rtc::ThreadPriority priority)
Markus Handellad5037b2021-05-07 13:02:36156 : event_base_(event_base_new()) {
tommic06b1332016-05-14 18:31:40157 int fds[2];
158 RTC_CHECK(pipe(fds) == 0);
159 SetNonBlocking(fds[0]);
160 SetNonBlocking(fds[1]);
161 wakeup_pipe_out_ = fds[0];
162 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 08:34:52163
Danil Chapovaloveb175242019-02-12 09:44:38164 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
tommi1666b612016-07-13 17:58:12165 EV_READ | EV_PERSIST, OnWakeup, this);
Danil Chapovaloveb175242019-02-12 09:44:38166 event_add(&wakeup_event_, 0);
Markus Handellad5037b2021-05-07 13:02:36167 thread_ = rtc::PlatformThread::SpawnJoinable(
168 [this] {
169 {
170 CurrentTaskQueueSetter set_current(this);
171 while (is_active_)
172 event_base_loop(event_base_, 0);
Markus Handellad5037b2021-05-07 13:02:36173
Markus Handell82da9322022-12-16 14:50:24174 // Ensure remaining deleted tasks are destroyed with Current() set up
175 // to this task queue.
176 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending;
177 MutexLock lock(&pending_lock_);
178 pending_.swap(pending);
179 }
Markus Handellad5037b2021-05-07 13:02:36180 for (TimerEvent* timer : pending_timers_)
181 delete timer;
Markus Handell82da9322022-12-16 14:50:24182
183#if RTC_DCHECK_IS_ON
184 MutexLock lock(&pending_lock_);
185 RTC_DCHECK(pending_.empty());
186#endif
Markus Handellad5037b2021-05-07 13:02:36187 },
188 queue_name, rtc::ThreadAttributes().SetPriority(priority));
tommic06b1332016-05-14 18:31:40189}
190
Danil Chapovaloveb175242019-02-12 09:44:38191void TaskQueueLibevent::Delete() {
tommic06b1332016-05-14 18:31:40192 RTC_DCHECK(!IsCurrent());
193 struct timespec ts;
194 char message = kQuit;
195 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
196 // The queue is full, so we have no choice but to wait and retry.
197 RTC_CHECK_EQ(EAGAIN, errno);
198 ts.tv_sec = 0;
199 ts.tv_nsec = 1000000;
200 nanosleep(&ts, nullptr);
201 }
202
Markus Handellad5037b2021-05-07 13:02:36203 thread_.Finalize();
tommic06b1332016-05-14 18:31:40204
Danil Chapovaloveb175242019-02-12 09:44:38205 event_del(&wakeup_event_);
tommi8c80c6e2017-02-23 08:34:52206
207 IgnoreSigPipeSignalOnCurrentThread();
208
tommic06b1332016-05-14 18:31:40209 close(wakeup_pipe_in_);
210 close(wakeup_pipe_out_);
211 wakeup_pipe_in_ = -1;
212 wakeup_pipe_out_ = -1;
213
tommic06b1332016-05-14 18:31:40214 event_base_free(event_base_);
Danil Chapovaloveb175242019-02-12 09:44:38215 delete this;
tommic06b1332016-05-14 18:31:40216}
217
Markus Handell2a256c82023-02-27 11:41:39218void TaskQueueLibevent::PostTaskImpl(absl::AnyInvocable<void() &&> task,
219 const PostTaskTraits& traits,
220 const Location& location) {
Danil Chapovalov00e71ef2019-06-11 16:01:56221 {
Markus Handell18523c32020-07-08 15:55:58222 MutexLock lock(&pending_lock_);
Steve Anton9a83dd72020-01-09 19:03:25223 bool had_pending_tasks = !pending_.empty();
Danil Chapovalov00e71ef2019-06-11 16:01:56224 pending_.push_back(std::move(task));
Steve Anton9a83dd72020-01-09 19:03:25225
226 // Only write to the pipe if there were no pending tasks before this one
227 // since the thread could be sleeping. If there were already pending tasks
228 // then we know there's either a pending write in the pipe or the thread has
229 // not yet processed the pending tasks. In either case, the thread will
230 // eventually wake up and process all pending tasks including this one.
231 if (had_pending_tasks) {
232 return;
233 }
Danil Chapovalov00e71ef2019-06-11 16:01:56234 }
Steve Anton9a83dd72020-01-09 19:03:25235
236 // Note: This behvior outlined above ensures we never fill up the pipe write
237 // buffer since there will only ever be 1 byte pending.
238 char message = kRunTasks;
239 RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
240 sizeof(message));
tommic06b1332016-05-14 18:31:40241}
242
Danil Chapovalov30c2a312022-07-19 12:12:43243void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
244 absl::AnyInvocable<void() &&> task,
245 TimeDelta delay) {
246 // libevent api is not thread safe by default, thus event_add need to be
247 // called on the `thread_`.
248 RTC_DCHECK(IsCurrent());
249
250 TimerEvent* timer = new TimerEvent(this, std::move(task));
251 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
252 timer);
253 pending_timers_.push_back(timer);
254 timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
255 .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
256 event_add(&timer->ev, &tv);
257}
258
Markus Handell2a256c82023-02-27 11:41:39259void TaskQueueLibevent::PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
260 TimeDelta delay,
261 const PostDelayedTaskTraits& traits,
262 const Location& location) {
tommic06b1332016-05-14 18:31:40263 if (IsCurrent()) {
Danil Chapovalov30c2a312022-07-19 12:12:43264 PostDelayedTaskOnTaskQueue(std::move(task), delay);
tommic06b1332016-05-14 18:31:40265 } else {
Danil Chapovalov30c2a312022-07-19 12:12:43266 int64_t posted_us = rtc::TimeMicros();
267 PostTask([posted_us, delay, task = std::move(task), this]() mutable {
268 // Compensate for the time that has passed since the posting.
269 TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
270 PostDelayedTaskOnTaskQueue(
271 std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
272 });
tommic06b1332016-05-14 18:31:40273 }
274}
275
tommic06b1332016-05-14 18:31:40276// static
Danil Chapovaloveb175242019-02-12 09:44:38277void TaskQueueLibevent::OnWakeup(int socket,
278 short flags, // NOLINT
279 void* context) {
280 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
281 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
tommic06b1332016-05-14 18:31:40282 char buf;
283 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
284 switch (buf) {
285 case kQuit:
Danil Chapovaloveb175242019-02-12 09:44:38286 me->is_active_ = false;
287 event_base_loopbreak(me->event_base_);
tommic06b1332016-05-14 18:31:40288 break;
Steve Anton9a83dd72020-01-09 19:03:25289 case kRunTasks: {
Danil Chapovalov30c2a312022-07-19 12:12:43290 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
tommic06b1332016-05-14 18:31:40291 {
Markus Handell18523c32020-07-08 15:55:58292 MutexLock lock(&me->pending_lock_);
Steve Anton9a83dd72020-01-09 19:03:25293 tasks.swap(me->pending_);
tommic06b1332016-05-14 18:31:40294 }
Steve Anton9a83dd72020-01-09 19:03:25295 RTC_DCHECK(!tasks.empty());
296 for (auto& task : tasks) {
Danil Chapovalov30c2a312022-07-19 12:12:43297 std::move(task)();
298 // Prefer to delete the `task` before running the next one.
299 task = nullptr;
Steve Anton9a83dd72020-01-09 19:03:25300 }
tommic06b1332016-05-14 18:31:40301 break;
302 }
303 default:
Artem Titovd3251962021-11-15 15:57:07304 RTC_DCHECK_NOTREACHED();
tommic06b1332016-05-14 18:31:40305 break;
306 }
307}
308
309// static
Danil Chapovaloveb175242019-02-12 09:44:38310void TaskQueueLibevent::RunTimer(int fd,
311 short flags, // NOLINT
312 void* context) {
tommic06b1332016-05-14 18:31:40313 TimerEvent* timer = static_cast<TimerEvent*>(context);
Danil Chapovalov30c2a312022-07-19 12:12:43314 std::move(timer->task)();
Danil Chapovaloveb175242019-02-12 09:44:38315 timer->task_queue->pending_timers_.remove(timer);
tommic06b1332016-05-14 18:31:40316 delete timer;
317}
318
Danil Chapovaloveb175242019-02-12 09:44:38319class TaskQueueLibeventFactory final : public TaskQueueFactory {
320 public:
321 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
322 absl::string_view name,
323 Priority priority) const override {
324 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
325 new TaskQueueLibevent(name,
326 TaskQueuePriorityToThreadPriority(priority)));
327 }
328};
329
330} // namespace
331
332std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
Mirko Bonadei317a1f02019-09-17 15:06:18333 return std::make_unique<TaskQueueLibeventFactory>();
perkj650fdae2017-08-25 12:00:11334}
335
Danil Chapovaloveb175242019-02-12 09:44:38336} // namespace webrtc