blob: d8e8b1173e617534f958a62458491ae64ecb25c1 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:261/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Mirko Bonadei92ea95e2017-09-15 04:47:3111#include "rtc_base/messagequeue.h"
henrike@webrtc.orgf0488722014-05-13 18:00:2612
Taylor Brandstetterfe7d0912016-09-16 00:47:4213#include <functional>
14
Mirko Bonadei92ea95e2017-09-15 04:47:3115#include "rtc_base/atomicops.h"
16#include "rtc_base/bind.h"
17#include "rtc_base/event.h"
18#include "rtc_base/gunit.h"
19#include "rtc_base/logging.h"
20#include "rtc_base/nullsocketserver.h"
21#include "rtc_base/refcount.h"
22#include "rtc_base/refcountedobject.h"
23#include "rtc_base/thread.h"
24#include "rtc_base/timeutils.h"
henrike@webrtc.orgf0488722014-05-13 18:00:2625
26using namespace rtc;
27
Yves Gerey665174f2018-06-19 13:03:0528class MessageQueueTest : public testing::Test, public MessageQueue {
henrike@webrtc.orgf0488722014-05-13 18:00:2629 public:
danilchapbebf54c2016-04-28 08:32:4830 MessageQueueTest() : MessageQueue(SocketServer::CreateDefault(), true) {}
henrike@webrtc.orgf0488722014-05-13 18:00:2631 bool IsLocked_Worker() {
32 if (!crit_.TryEnter()) {
33 return true;
34 }
35 crit_.Leave();
36 return false;
37 }
38 bool IsLocked() {
39 // We have to do this on a worker thread, or else the TryEnter will
40 // succeed, since our critical sections are reentrant.
tommie7251592017-07-14 21:44:4641 std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
42 worker->Start();
43 return worker->Invoke<bool>(
Taylor Brandstetter5d97a9a2016-06-10 21:17:2744 RTC_FROM_HERE, rtc::Bind(&MessageQueueTest::IsLocked_Worker, this));
decurtis@webrtc.org2bffc3c2015-02-21 01:45:0445 }
decurtis@webrtc.org2bffc3c2015-02-21 01:45:0446};
47
henrike@webrtc.orgf0488722014-05-13 18:00:2648struct DeletedLockChecker {
decurtis@webrtc.org2af30572015-02-21 01:59:5049 DeletedLockChecker(MessageQueueTest* test, bool* was_locked, bool* deleted)
Yves Gerey665174f2018-06-19 13:03:0550 : test(test), was_locked(was_locked), deleted(deleted) {}
henrike@webrtc.orgf0488722014-05-13 18:00:2651 ~DeletedLockChecker() {
52 *deleted = true;
decurtis@webrtc.org2af30572015-02-21 01:59:5053 *was_locked = test->IsLocked();
henrike@webrtc.orgf0488722014-05-13 18:00:2654 }
decurtis@webrtc.org2af30572015-02-21 01:59:5055 MessageQueueTest* test;
henrike@webrtc.orgf0488722014-05-13 18:00:2656 bool* was_locked;
57 bool* deleted;
58};
59
60static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
61 MessageQueue* q) {
deadbeef37f5ecf2017-02-27 22:06:4162 EXPECT_TRUE(q != nullptr);
Honghai Zhang82d78622016-05-06 18:29:1563 int64_t now = TimeMillis();
deadbeef37f5ecf2017-02-27 22:06:4164 q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
65 q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
66 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
67 q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
68 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
henrike@webrtc.orgf0488722014-05-13 18:00:2669
70 Message msg;
Yves Gerey665174f2018-06-19 13:03:0571 for (size_t i = 0; i < 5; ++i) {
henrike@webrtc.orgf0488722014-05-13 18:00:2672 memset(&msg, 0, sizeof(msg));
73 EXPECT_TRUE(q->Get(&msg, 0));
74 EXPECT_EQ(i, msg.message_id);
75 }
76
77 EXPECT_FALSE(q->Get(&msg, 0)); // No more messages
78}
79
80TEST_F(MessageQueueTest,
81 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
danilchapbebf54c2016-04-28 08:32:4882 MessageQueue q(SocketServer::CreateDefault(), true);
decurtis@webrtc.org2af30572015-02-21 01:59:5083 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
danilchapbebf54c2016-04-28 08:32:4884
henrike@webrtc.orgf0488722014-05-13 18:00:2685 NullSocketServer nullss;
danilchapbebf54c2016-04-28 08:32:4886 MessageQueue q_nullss(&nullss, true);
henrike@webrtc.orgf0488722014-05-13 18:00:2687 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
88}
89
henrike@webrtc.orgc732a3e2014-10-09 22:08:1590TEST_F(MessageQueueTest, DisposeNotLocked) {
henrike@webrtc.orgf0488722014-05-13 18:00:2691 bool was_locked = true;
92 bool deleted = false;
decurtis@webrtc.org2af30572015-02-21 01:59:5093 DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
94 Dispose(d);
henrike@webrtc.orgf0488722014-05-13 18:00:2695 Message msg;
decurtis@webrtc.org2af30572015-02-21 01:59:5096 EXPECT_FALSE(Get(&msg, 0));
henrike@webrtc.orgf0488722014-05-13 18:00:2697 EXPECT_TRUE(deleted);
98 EXPECT_FALSE(was_locked);
99}
100
101class DeletedMessageHandler : public MessageHandler {
102 public:
Yves Gerey665174f2018-06-19 13:03:05103 explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {}
Steve Anton9de3aac2017-10-24 17:08:26104 ~DeletedMessageHandler() override { *deleted_ = true; }
105 void OnMessage(Message* msg) override {}
106
henrike@webrtc.orgf0488722014-05-13 18:00:26107 private:
108 bool* deleted_;
109};
110
decurtis@webrtc.org2af30572015-02-21 01:59:50111TEST_F(MessageQueueTest, DiposeHandlerWithPostedMessagePending) {
henrike@webrtc.orgf0488722014-05-13 18:00:26112 bool deleted = false;
Yves Gerey665174f2018-06-19 13:03:05113 DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted);
henrike@webrtc.orgf0488722014-05-13 18:00:26114 // First, post a dispose.
decurtis@webrtc.org2af30572015-02-21 01:59:50115 Dispose(handler);
henrike@webrtc.orgf0488722014-05-13 18:00:26116 // Now, post a message, which should *not* be returned by Get().
Taylor Brandstetter5d97a9a2016-06-10 21:17:27117 Post(RTC_FROM_HERE, handler, 1);
henrike@webrtc.orgf0488722014-05-13 18:00:26118 Message msg;
decurtis@webrtc.org2af30572015-02-21 01:59:50119 EXPECT_FALSE(Get(&msg, 0));
henrike@webrtc.orgf0488722014-05-13 18:00:26120 EXPECT_TRUE(deleted);
121}
122
123struct UnwrapMainThreadScope {
deadbeef37f5ecf2017-02-27 22:06:41124 UnwrapMainThreadScope() : rewrap_(Thread::Current() != nullptr) {
Yves Gerey665174f2018-06-19 13:03:05125 if (rewrap_)
126 ThreadManager::Instance()->UnwrapCurrentThread();
henrike@webrtc.orgf0488722014-05-13 18:00:26127 }
128 ~UnwrapMainThreadScope() {
Yves Gerey665174f2018-06-19 13:03:05129 if (rewrap_)
130 ThreadManager::Instance()->WrapCurrentThread();
henrike@webrtc.orgf0488722014-05-13 18:00:26131 }
Yves Gerey665174f2018-06-19 13:03:05132
henrike@webrtc.orgf0488722014-05-13 18:00:26133 private:
134 bool rewrap_;
135};
136
Taylor Brandstetterfe7d0912016-09-16 00:47:42137// Ensure that ProcessAllMessageQueues does its essential function; process
138// all messages (both delayed and non delayed) up until the current time, on
139// all registered message queues.
140TEST(MessageQueueManager, ProcessAllMessageQueues) {
141 Event entered_process_all_message_queues(true, false);
tommie7251592017-07-14 21:44:46142 auto a = Thread::CreateWithSocketServer();
143 auto b = Thread::CreateWithSocketServer();
144 a->Start();
145 b->Start();
Taylor Brandstetterfe7d0912016-09-16 00:47:42146
147 volatile int messages_processed = 0;
148 FunctorMessageHandler<void, std::function<void()>> incrementer(
149 [&messages_processed, &entered_process_all_message_queues] {
150 // Wait for event as a means to ensure Increment doesn't occur outside
151 // of ProcessAllMessageQueues. The event is set by a message posted to
152 // the main thread, which is guaranteed to be handled inside
153 // ProcessAllMessageQueues.
154 entered_process_all_message_queues.Wait(Event::kForever);
155 AtomicOps::Increment(&messages_processed);
156 });
157 FunctorMessageHandler<void, std::function<void()>> event_signaler(
158 [&entered_process_all_message_queues] {
159 entered_process_all_message_queues.Set();
160 });
161
162 // Post messages (both delayed and non delayed) to both threads.
tommie7251592017-07-14 21:44:46163 a->Post(RTC_FROM_HERE, &incrementer);
164 b->Post(RTC_FROM_HERE, &incrementer);
165 a->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
166 b->PostDelayed(RTC_FROM_HERE, 0, &incrementer);
Taylor Brandstetterfe7d0912016-09-16 00:47:42167 rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
168
Niels Möller8909a632018-09-06 06:42:44169 MessageQueueManager::ProcessAllMessageQueuesForTesting();
Taylor Brandstetterfe7d0912016-09-16 00:47:42170 EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
171}
172
173// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
174TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
tommie7251592017-07-14 21:44:46175 auto t = Thread::CreateWithSocketServer();
176 t->Start();
177 t->Quit();
Niels Möller8909a632018-09-06 06:42:44178 MessageQueueManager::ProcessAllMessageQueuesForTesting();
Taylor Brandstetterfe7d0912016-09-16 00:47:42179}
180
181// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
182// messages.
183TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
184 Event entered_process_all_message_queues(true, false);
tommie7251592017-07-14 21:44:46185 auto t = Thread::CreateWithSocketServer();
186 t->Start();
Taylor Brandstetterfe7d0912016-09-16 00:47:42187
188 FunctorMessageHandler<void, std::function<void()>> clearer(
189 [&entered_process_all_message_queues] {
190 // Wait for event as a means to ensure Clear doesn't occur outside of
191 // ProcessAllMessageQueues. The event is set by a message posted to the
192 // main thread, which is guaranteed to be handled inside
193 // ProcessAllMessageQueues.
194 entered_process_all_message_queues.Wait(Event::kForever);
195 rtc::Thread::Current()->Clear(nullptr);
196 });
197 FunctorMessageHandler<void, std::function<void()>> event_signaler(
198 [&entered_process_all_message_queues] {
199 entered_process_all_message_queues.Set();
200 });
201
202 // Post messages (both delayed and non delayed) to both threads.
tommie7251592017-07-14 21:44:46203 t->Post(RTC_FROM_HERE, &clearer);
Taylor Brandstetterfe7d0912016-09-16 00:47:42204 rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
Niels Möller8909a632018-09-06 06:42:44205 MessageQueueManager::ProcessAllMessageQueuesForTesting();
Taylor Brandstetterfe7d0912016-09-16 00:47:42206}
jbauch5b361732017-07-07 06:51:37207
Yves Gerey665174f2018-06-19 13:03:05208class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface {
jbauch5b361732017-07-07 06:51:37209 public:
210 void OnMessage(Message* msg) override {}
211};
212
213class EmptyHandler : public MessageHandler {
214 public:
215 void OnMessage(Message* msg) override {}
216};
217
218TEST(MessageQueueManager, ClearReentrant) {
tommie7251592017-07-14 21:44:46219 std::unique_ptr<Thread> t(Thread::Create());
jbauch5b361732017-07-07 06:51:37220 EmptyHandler handler;
221 RefCountedHandler* inner_handler(
222 new rtc::RefCountedObject<RefCountedHandler>());
223 // When the empty handler is destroyed, it will clear messages queued for
224 // itself. The message to be cleared itself wraps a MessageHandler object
225 // (RefCountedHandler) so this will cause the message queue to be cleared
226 // again in a re-entrant fashion, which previously triggered a DCHECK.
227 // The inner handler will be removed in a re-entrant fashion from the
228 // message queue of the thread while the outer handler is removed, verifying
229 // that the iterator is not invalidated in "MessageQueue::Clear".
tommie7251592017-07-14 21:44:46230 t->Post(RTC_FROM_HERE, inner_handler, 0);
231 t->Post(RTC_FROM_HERE, &handler, 0,
232 new ScopedRefMessageData<RefCountedHandler>(inner_handler));
jbauch5b361732017-07-07 06:51:37233}