blob: b31ea6c1b21fd527334260360947a69d37edb0e5 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:261/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Steve Anton10542f22019-01-11 17:11:0011#include "rtc_base/message_queue.h"
henrike@webrtc.orgf0488722014-05-13 18:00:2612
Taylor Brandstetterfe7d0912016-09-16 00:47:4213#include <functional>
14
Steve Anton10542f22019-01-11 17:11:0015#include "rtc_base/atomic_ops.h"
Mirko Bonadei92ea95e2017-09-15 04:47:3116#include "rtc_base/bind.h"
17#include "rtc_base/event.h"
18#include "rtc_base/gunit.h"
19#include "rtc_base/logging.h"
Steve Anton10542f22019-01-11 17:11:0020#include "rtc_base/null_socket_server.h"
21#include "rtc_base/ref_count.h"
22#include "rtc_base/ref_counted_object.h"
Mirko Bonadei92ea95e2017-09-15 04:47:3123#include "rtc_base/thread.h"
Steve Anton10542f22019-01-11 17:11:0024#include "rtc_base/time_utils.h"
henrike@webrtc.orgf0488722014-05-13 18:00:2625
Mirko Bonadeie10b1632018-12-11 17:43:4026namespace rtc {
27namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:2628
Mirko Bonadei6a489f22019-04-09 13:11:1229class MessageQueueTest : public ::testing::Test, public MessageQueue {
henrike@webrtc.orgf0488722014-05-13 18:00:2630 public:
danilchapbebf54c2016-04-28 08:32:4831 MessageQueueTest() : MessageQueue(SocketServer::CreateDefault(), true) {}
henrike@webrtc.orgf0488722014-05-13 18:00:2632 bool IsLocked_Worker() {
33 if (!crit_.TryEnter()) {
34 return true;
35 }
36 crit_.Leave();
37 return false;
38 }
39 bool IsLocked() {
40 // We have to do this on a worker thread, or else the TryEnter will
41 // succeed, since our critical sections are reentrant.
tommie7251592017-07-14 21:44:4642 std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
43 worker->Start();
44 return worker->Invoke<bool>(
Taylor Brandstetter5d97a9a2016-06-10 21:17:2745 RTC_FROM_HERE, rtc::Bind(&MessageQueueTest::IsLocked_Worker, this));
decurtis@webrtc.org2bffc3c2015-02-21 01:45:0446 }
decurtis@webrtc.org2bffc3c2015-02-21 01:45:0447};
48
henrike@webrtc.orgf0488722014-05-13 18:00:2649struct DeletedLockChecker {
decurtis@webrtc.org2af30572015-02-21 01:59:5050 DeletedLockChecker(MessageQueueTest* test, bool* was_locked, bool* deleted)
Yves Gerey665174f2018-06-19 13:03:0551 : test(test), was_locked(was_locked), deleted(deleted) {}
henrike@webrtc.orgf0488722014-05-13 18:00:2652 ~DeletedLockChecker() {
53 *deleted = true;
decurtis@webrtc.org2af30572015-02-21 01:59:5054 *was_locked = test->IsLocked();
henrike@webrtc.orgf0488722014-05-13 18:00:2655 }
decurtis@webrtc.org2af30572015-02-21 01:59:5056 MessageQueueTest* test;
henrike@webrtc.orgf0488722014-05-13 18:00:2657 bool* was_locked;
58 bool* deleted;
59};
60
61static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
62 MessageQueue* q) {
deadbeef37f5ecf2017-02-27 22:06:4163 EXPECT_TRUE(q != nullptr);
Honghai Zhang82d78622016-05-06 18:29:1564 int64_t now = TimeMillis();
deadbeef37f5ecf2017-02-27 22:06:4165 q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
66 q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
67 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
68 q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
69 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
henrike@webrtc.orgf0488722014-05-13 18:00:2670
71 Message msg;
Yves Gerey665174f2018-06-19 13:03:0572 for (size_t i = 0; i < 5; ++i) {
henrike@webrtc.orgf0488722014-05-13 18:00:2673 memset(&msg, 0, sizeof(msg));
74 EXPECT_TRUE(q->Get(&msg, 0));
75 EXPECT_EQ(i, msg.message_id);
76 }
77
78 EXPECT_FALSE(q->Get(&msg, 0)); // No more messages
79}
80
81TEST_F(MessageQueueTest,
82 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
danilchapbebf54c2016-04-28 08:32:4883 MessageQueue q(SocketServer::CreateDefault(), true);
decurtis@webrtc.org2af30572015-02-21 01:59:5084 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
danilchapbebf54c2016-04-28 08:32:4885
henrike@webrtc.orgf0488722014-05-13 18:00:2686 NullSocketServer nullss;
danilchapbebf54c2016-04-28 08:32:4887 MessageQueue q_nullss(&nullss, true);
henrike@webrtc.orgf0488722014-05-13 18:00:2688 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
89}
90
henrike@webrtc.orgc732a3e2014-10-09 22:08:1591TEST_F(MessageQueueTest, DisposeNotLocked) {
henrike@webrtc.orgf0488722014-05-13 18:00:2692 bool was_locked = true;
93 bool deleted = false;
decurtis@webrtc.org2af30572015-02-21 01:59:5094 DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
95 Dispose(d);
henrike@webrtc.orgf0488722014-05-13 18:00:2696 Message msg;
decurtis@webrtc.org2af30572015-02-21 01:59:5097 EXPECT_FALSE(Get(&msg, 0));
henrike@webrtc.orgf0488722014-05-13 18:00:2698 EXPECT_TRUE(deleted);
99 EXPECT_FALSE(was_locked);
100}
101
102class DeletedMessageHandler : public MessageHandler {
103 public:
Yves Gerey665174f2018-06-19 13:03:05104 explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {}
Steve Anton9de3aac2017-10-24 17:08:26105 ~DeletedMessageHandler() override { *deleted_ = true; }
106 void OnMessage(Message* msg) override {}
107
henrike@webrtc.orgf0488722014-05-13 18:00:26108 private:
109 bool* deleted_;
110};
111
decurtis@webrtc.org2af30572015-02-21 01:59:50112TEST_F(MessageQueueTest, DiposeHandlerWithPostedMessagePending) {
henrike@webrtc.orgf0488722014-05-13 18:00:26113 bool deleted = false;
Yves Gerey665174f2018-06-19 13:03:05114 DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted);
henrike@webrtc.orgf0488722014-05-13 18:00:26115 // First, post a dispose.
decurtis@webrtc.org2af30572015-02-21 01:59:50116 Dispose(handler);
henrike@webrtc.orgf0488722014-05-13 18:00:26117 // Now, post a message, which should *not* be returned by Get().
Taylor Brandstetter5d97a9a2016-06-10 21:17:27118 Post(RTC_FROM_HERE, handler, 1);
henrike@webrtc.orgf0488722014-05-13 18:00:26119 Message msg;
decurtis@webrtc.org2af30572015-02-21 01:59:50120 EXPECT_FALSE(Get(&msg, 0));
henrike@webrtc.orgf0488722014-05-13 18:00:26121 EXPECT_TRUE(deleted);
122}
123
Taylor Brandstetterfe7d0912016-09-16 00:47:42124// Ensure that ProcessAllMessageQueues does its essential function; process
125// all messages (both delayed and non delayed) up until the current time, on
126// all registered message queues.
127TEST(MessageQueueManager, ProcessAllMessageQueues) {
128 Event entered_process_all_message_queues(true, false);
tommie7251592017-07-14 21:44:46129 auto a = Thread::CreateWithSocketServer();
130 auto b = Thread::CreateWithSocketServer();
131 a->Start();
132 b->Start();
Taylor Brandstetterfe7d0912016-09-16 00:47:42133
134 volatile int messages_processed = 0;
135 FunctorMessageHandler<void, std::function<void()>> incrementer(
136 [&messages_processed, &entered_process_all_message_queues] {
137 // Wait for event as a means to ensure Increment doesn't occur outside
138 // of ProcessAllMessageQueues. The event is set by a message posted to
139 // the main thread, which is guaranteed to be handled inside
140 // ProcessAllMessageQueues.
141 entered_process_all_message_queues.Wait(Event::kForever);
142 AtomicOps::Increment(&messages_processed);
143 });
144 FunctorMessageHandler<void, std::function<void()>> event_signaler(
145 [&entered_process_all_message_queues] {
146 entered_process_all_message_queues.Set();
147 });
148
149 // Post messages (both delayed and non delayed) to both threads.
tommie7251592017-07-14 21:44:46150 a->Post(RTC_FROM_HERE, &incrementer);
151 b->Post(RTC_FROM_HERE, &incrementer);
152 a->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
153 b->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
Taylor Brandstetterfe7d0912016-09-16 00:47:42154 rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
155
Niels Möller8909a632018-09-06 06:42:44156 MessageQueueManager::ProcessAllMessageQueuesForTesting();
Taylor Brandstetterfe7d0912016-09-16 00:47:42157 EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
158}
159
160// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
161TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
tommie7251592017-07-14 21:44:46162 auto t = Thread::CreateWithSocketServer();
163 t->Start();
164 t->Quit();
Niels Möller8909a632018-09-06 06:42:44165 MessageQueueManager::ProcessAllMessageQueuesForTesting();
Taylor Brandstetterfe7d0912016-09-16 00:47:42166}
167
168// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
169// messages.
170TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
171 Event entered_process_all_message_queues(true, false);
tommie7251592017-07-14 21:44:46172 auto t = Thread::CreateWithSocketServer();
173 t->Start();
Taylor Brandstetterfe7d0912016-09-16 00:47:42174
175 FunctorMessageHandler<void, std::function<void()>> clearer(
176 [&entered_process_all_message_queues] {
177 // Wait for event as a means to ensure Clear doesn't occur outside of
178 // ProcessAllMessageQueues. The event is set by a message posted to the
179 // main thread, which is guaranteed to be handled inside
180 // ProcessAllMessageQueues.
181 entered_process_all_message_queues.Wait(Event::kForever);
182 rtc::Thread::Current()->Clear(nullptr);
183 });
184 FunctorMessageHandler<void, std::function<void()>> event_signaler(
185 [&entered_process_all_message_queues] {
186 entered_process_all_message_queues.Set();
187 });
188
189 // Post messages (both delayed and non delayed) to both threads.
tommie7251592017-07-14 21:44:46190 t->Post(RTC_FROM_HERE, &clearer);
Taylor Brandstetterfe7d0912016-09-16 00:47:42191 rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
Niels Möller8909a632018-09-06 06:42:44192 MessageQueueManager::ProcessAllMessageQueuesForTesting();
Taylor Brandstetterfe7d0912016-09-16 00:47:42193}
jbauch5b361732017-07-07 06:51:37194
Yves Gerey665174f2018-06-19 13:03:05195class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface {
jbauch5b361732017-07-07 06:51:37196 public:
197 void OnMessage(Message* msg) override {}
198};
199
200class EmptyHandler : public MessageHandler {
201 public:
202 void OnMessage(Message* msg) override {}
203};
204
205TEST(MessageQueueManager, ClearReentrant) {
tommie7251592017-07-14 21:44:46206 std::unique_ptr<Thread> t(Thread::Create());
jbauch5b361732017-07-07 06:51:37207 EmptyHandler handler;
208 RefCountedHandler* inner_handler(
209 new rtc::RefCountedObject<RefCountedHandler>());
210 // When the empty handler is destroyed, it will clear messages queued for
211 // itself. The message to be cleared itself wraps a MessageHandler object
212 // (RefCountedHandler) so this will cause the message queue to be cleared
213 // again in a re-entrant fashion, which previously triggered a DCHECK.
214 // The inner handler will be removed in a re-entrant fashion from the
215 // message queue of the thread while the outer handler is removed, verifying
216 // that the iterator is not invalidated in "MessageQueue::Clear".
tommie7251592017-07-14 21:44:46217 t->Post(RTC_FROM_HERE, inner_handler, 0);
218 t->Post(RTC_FROM_HERE, &handler, 0,
219 new ScopedRefMessageData<RefCountedHandler>(inner_handler));
jbauch5b361732017-07-07 06:51:37220}
Mirko Bonadeie10b1632018-12-11 17:43:40221
222} // namespace
223} // namespace rtc