blob: 10bde2091c22db6af11caa6ff4adf0516919c535 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:261/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
andresp@webrtc.orgff689be2015-02-12 11:54:2610#include <algorithm>
11
Mirko Bonadei92ea95e2017-09-15 04:47:3112#include "rtc_base/atomicops.h"
13#include "rtc_base/checks.h"
14#include "rtc_base/logging.h"
15#include "rtc_base/messagequeue.h"
16#include "rtc_base/stringencode.h"
17#include "rtc_base/thread.h"
18#include "rtc_base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:2619
20namespace rtc {
andrespcdf61722016-07-08 09:45:4021namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:2622
Honghai Zhang82d78622016-05-06 18:29:1523const int kMaxMsgLatency = 150; // 150 ms
Taylor Brandstetter5d97a9a2016-06-10 21:17:2724const int kSlowDispatchLoggingThreshold = 50; // 50 ms
henrike@webrtc.orgf0488722014-05-13 18:00:2625
danilchap3c6abd22017-09-06 12:46:2926class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
andrespcdf61722016-07-08 09:45:4027 public:
jbauch5b361732017-07-07 06:51:3728 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
danilchap3c6abd22017-09-06 12:46:2929 RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
jbauch5b361732017-07-07 06:51:3730 : cs_(cs), processing_(processing) {
andrespcdf61722016-07-08 09:45:4031 cs_->Enter();
jbauch5b361732017-07-07 06:51:3732 *processing_ += 1;
andrespcdf61722016-07-08 09:45:4033 }
34
danilchap3c6abd22017-09-06 12:46:2935 ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
jbauch5b361732017-07-07 06:51:3736 *processing_ -= 1;
andrespcdf61722016-07-08 09:45:4037 cs_->Leave();
38 }
39
40 private:
41 const CriticalSection* const cs_;
jbauch5b361732017-07-07 06:51:3742 size_t* processing_;
andrespcdf61722016-07-08 09:45:4043
jbauch5b361732017-07-07 06:51:3744 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
andrespcdf61722016-07-08 09:45:4045};
46} // namespace
47
henrike@webrtc.orgf0488722014-05-13 18:00:2648//------------------------------------------------------------------
49// MessageQueueManager
50
deadbeef37f5ecf2017-02-27 22:06:4151MessageQueueManager* MessageQueueManager::instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:2652
53MessageQueueManager* MessageQueueManager::Instance() {
54 // Note: This is not thread safe, but it is first called before threads are
55 // spawned.
56 if (!instance_)
57 instance_ = new MessageQueueManager;
58 return instance_;
59}
60
61bool MessageQueueManager::IsInitialized() {
deadbeef37f5ecf2017-02-27 22:06:4162 return instance_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:2663}
64
jbauch5b361732017-07-07 06:51:3765MessageQueueManager::MessageQueueManager() : processing_(0) {}
henrike@webrtc.orgf0488722014-05-13 18:00:2666
67MessageQueueManager::~MessageQueueManager() {
68}
69
70void MessageQueueManager::Add(MessageQueue *message_queue) {
71 return Instance()->AddInternal(message_queue);
72}
73void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
jbauch5b361732017-07-07 06:51:3774 CritScope cs(&crit_);
75 // Prevent changes while the list of message queues is processed.
76 RTC_DCHECK_EQ(processing_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:2677 message_queues_.push_back(message_queue);
78}
79
80void MessageQueueManager::Remove(MessageQueue *message_queue) {
81 // If there isn't a message queue manager instance, then there isn't a queue
82 // to remove.
83 if (!instance_) return;
84 return Instance()->RemoveInternal(message_queue);
85}
86void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:2687 // If this is the last MessageQueue, destroy the manager as well so that
88 // we don't leak this object at program shutdown. As mentioned above, this is
89 // not thread-safe, but this should only happen at program termination (when
90 // the ThreadManager is destroyed, and threads are no longer active).
91 bool destroy = false;
92 {
jbauch5b361732017-07-07 06:51:3793 CritScope cs(&crit_);
94 // Prevent changes while the list of message queues is processed.
95 RTC_DCHECK_EQ(processing_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:2696 std::vector<MessageQueue *>::iterator iter;
97 iter = std::find(message_queues_.begin(), message_queues_.end(),
98 message_queue);
99 if (iter != message_queues_.end()) {
100 message_queues_.erase(iter);
101 }
102 destroy = message_queues_.empty();
103 }
104 if (destroy) {
deadbeef37f5ecf2017-02-27 22:06:41105 instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26106 delete this;
107 }
108}
109
110void MessageQueueManager::Clear(MessageHandler *handler) {
111 // If there isn't a message queue manager instance, then there aren't any
112 // queues to remove this handler from.
113 if (!instance_) return;
114 return Instance()->ClearInternal(handler);
115}
116void MessageQueueManager::ClearInternal(MessageHandler *handler) {
jbauch5b361732017-07-07 06:51:37117 // Deleted objects may cause re-entrant calls to ClearInternal. This is
118 // allowed as the list of message queues does not change while queues are
119 // cleared.
120 MarkProcessingCritScope cs(&crit_, &processing_);
jbauch5b361732017-07-07 06:51:37121 for (MessageQueue* queue : message_queues_) {
122 queue->Clear(handler);
123 }
henrike@webrtc.orgf0488722014-05-13 18:00:26124}
125
deadbeeff5f03e82016-06-06 18:16:06126void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 21:15:43127 if (!instance_) {
128 return;
129 }
deadbeeff5f03e82016-06-06 18:16:06130 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 21:15:43131}
132
deadbeeff5f03e82016-06-06 18:16:06133void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterfe7d0912016-09-16 00:47:42134 // This works by posting a delayed message at the current time and waiting
135 // for it to be dispatched on all queues, which will ensure that all messages
136 // that came before it were also dispatched.
137 volatile int queues_not_done = 0;
138
139 // This class is used so that whether the posted message is processed, or the
140 // message queue is simply cleared, queues_not_done gets decremented.
141 class ScopedIncrement : public MessageData {
142 public:
143 ScopedIncrement(volatile int* value) : value_(value) {
144 AtomicOps::Increment(value_);
145 }
146 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
147
148 private:
149 volatile int* value_;
150 };
151
deadbeeff5f03e82016-06-06 18:16:06152 {
jbauch5b361732017-07-07 06:51:37153 MarkProcessingCritScope cs(&crit_, &processing_);
deadbeeff5f03e82016-06-06 18:16:06154 for (MessageQueue* queue : message_queues_) {
pthatcher1749bc32017-02-08 21:18:00155 if (!queue->IsProcessingMessages()) {
156 // If the queue is not processing messages, it can
157 // be ignored. If we tried to post a message to it, it would be dropped
158 // or ignored.
Taylor Brandstetterfe7d0912016-09-16 00:47:42159 continue;
160 }
161 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
162 new ScopedIncrement(&queues_not_done));
deadbeeff5f03e82016-06-06 18:16:06163 }
Taylor Brandstetterb3c68102016-05-27 21:15:43164 }
deadbeeff5f03e82016-06-06 18:16:06165 // Note: One of the message queues may have been on this thread, which is why
166 // we can't synchronously wait for queues_not_done to go to 0; we need to
167 // process messages as well.
168 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
169 rtc::Thread::Current()->ProcessMessages(0);
170 }
Taylor Brandstetterb3c68102016-05-27 21:15:43171}
172
henrike@webrtc.orgf0488722014-05-13 18:00:26173//------------------------------------------------------------------
174// MessageQueue
jbauch25d1f282016-02-05 08:25:02175MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 11:30:05176 : fPeekKeep_(false),
177 dmsgq_next_num_(0),
178 fInitialized_(false),
179 fDestroyed_(false),
180 stop_(0),
181 ss_(ss) {
danilchapbebf54c2016-04-28 08:32:48182 RTC_DCHECK(ss);
183 // Currently, MessageQueue holds a socket server, and is the base class for
184 // Thread. It seems like it makes more sense for Thread to hold the socket
185 // server, and provide it to the MessageQueue, since the Thread controls
186 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
187 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26188 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 08:25:02189 if (init_queue) {
190 DoInit();
191 }
henrike@webrtc.orgf0488722014-05-13 18:00:26192}
193
danilchapbebf54c2016-04-28 08:32:48194MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
195 : MessageQueue(ss.get(), init_queue) {
196 own_ss_ = std::move(ss);
197}
198
henrike@webrtc.orgf0488722014-05-13 18:00:26199MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 08:25:02200 DoDestroy();
201}
202
203void MessageQueue::DoInit() {
204 if (fInitialized_) {
205 return;
206 }
207
208 fInitialized_ = true;
209 MessageQueueManager::Add(this);
210}
211
212void MessageQueue::DoDestroy() {
213 if (fDestroyed_) {
214 return;
215 }
216
217 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26218 // The signal is done from here to ensure
219 // that it always gets called when the queue
220 // is going away.
221 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17222 MessageQueueManager::Remove(this);
deadbeef37f5ecf2017-02-27 22:06:41223 Clear(nullptr);
jbauch9ccedc32016-02-25 09:14:56224
henrike@webrtc.orgf0488722014-05-13 18:00:26225 if (ss_) {
deadbeef37f5ecf2017-02-27 22:06:41226 ss_->SetMessageQueue(nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26227 }
228}
229
jbauch9ccedc32016-02-25 09:14:56230SocketServer* MessageQueue::socketserver() {
jbauch9ccedc32016-02-25 09:14:56231 return ss_;
232}
233
jbauch9ccedc32016-02-25 09:14:56234void MessageQueue::WakeUpSocketServer() {
jbauch9ccedc32016-02-25 09:14:56235 ss_->WakeUp();
236}
237
henrike@webrtc.orgf0488722014-05-13 18:00:26238void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 11:30:05239 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 09:14:56240 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26241}
242
243bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 11:30:05244 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26245}
246
pthatcher1749bc32017-02-08 21:18:00247bool MessageQueue::IsProcessingMessages() {
248 return !IsQuitting();
249}
250
henrike@webrtc.orgf0488722014-05-13 18:00:26251void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 11:30:05252 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26253}
254
255bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
256 if (fPeekKeep_) {
257 *pmsg = msgPeek_;
258 return true;
259 }
260 if (!Get(pmsg, cmsWait))
261 return false;
262 msgPeek_ = *pmsg;
263 fPeekKeep_ = true;
264 return true;
265}
266
267bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
268 // Return and clear peek if present
269 // Always return the peek if it exists so there is Peek/Get symmetry
270
271 if (fPeekKeep_) {
272 *pmsg = msgPeek_;
273 fPeekKeep_ = false;
274 return true;
275 }
276
277 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
278
Honghai Zhang82d78622016-05-06 18:29:15279 int64_t cmsTotal = cmsWait;
280 int64_t cmsElapsed = 0;
281 int64_t msStart = TimeMillis();
282 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26283 while (true) {
284 // Check for sent messages
285 ReceiveSends();
286
287 // Check for posted events
Honghai Zhang82d78622016-05-06 18:29:15288 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26289 bool first_pass = true;
290 while (true) {
291 // All queue operations need to be locked, but nothing else in this loop
292 // (specifically handling disposed message) can happen inside the crit.
293 // Otherwise, disposed MessageHandlers will cause deadlocks.
294 {
295 CritScope cs(&crit_);
296 // On the first pass, check for delayed messages that have been
297 // triggered and calculate the next trigger time.
298 if (first_pass) {
299 first_pass = false;
300 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 18:29:15301 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26302 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
303 break;
304 }
305 msgq_.push_back(dmsgq_.top().msg_);
306 dmsgq_.pop();
307 }
308 }
309 // Pull a message off the message queue, if available.
310 if (msgq_.empty()) {
311 break;
312 } else {
313 *pmsg = msgq_.front();
314 msgq_.pop_front();
315 }
316 } // crit_ is released here.
317
318 // Log a warning for time-sensitive messages that we're late to deliver.
319 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 18:29:15320 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26321 if (delay > 0) {
Mirko Bonadei675513b2017-11-09 10:09:25322 RTC_LOG_F(LS_WARNING)
323 << "id: " << pmsg->message_id
324 << " delay: " << (delay + kMaxMsgLatency) << "ms";
henrike@webrtc.orgf0488722014-05-13 18:00:26325 }
326 }
327 // If this was a dispose message, delete it and skip it.
328 if (MQID_DISPOSE == pmsg->message_id) {
deadbeef37f5ecf2017-02-27 22:06:41329 RTC_DCHECK(nullptr == pmsg->phandler);
henrike@webrtc.orgf0488722014-05-13 18:00:26330 delete pmsg->pdata;
331 *pmsg = Message();
332 continue;
333 }
334 return true;
335 }
336
André Susano Pinto02a57972016-07-22 11:30:05337 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26338 break;
339
340 // Which is shorter, the delay wait or the asked wait?
341
Honghai Zhang82d78622016-05-06 18:29:15342 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26343 if (cmsWait == kForever) {
344 cmsNext = cmsDelayNext;
345 } else {
Honghai Zhang82d78622016-05-06 18:29:15346 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26347 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
348 cmsNext = cmsDelayNext;
349 }
350
jbauch9ccedc32016-02-25 09:14:56351 {
352 // Wait and multiplex in the meantime
Honghai Zhang82d78622016-05-06 18:29:15353 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 09:14:56354 return false;
355 }
henrike@webrtc.orgf0488722014-05-13 18:00:26356
357 // If the specified timeout expired, return
358
Honghai Zhang82d78622016-05-06 18:29:15359 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26360 cmsElapsed = TimeDiff(msCurrent, msStart);
361 if (cmsWait != kForever) {
362 if (cmsElapsed >= cmsWait)
363 return false;
364 }
365 }
366 return false;
367}
368
369void MessageQueue::ReceiveSends() {
370}
371
Taylor Brandstetter5d97a9a2016-06-10 21:17:27372void MessageQueue::Post(const Location& posted_from,
373 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 10:23:21374 uint32_t id,
375 MessageData* pdata,
376 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 11:30:05377 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26378 return;
379
380 // Keep thread safe
381 // Add the message to the end of the queue
382 // Signal for the multiplexer to return
383
jbauch9ccedc32016-02-25 09:14:56384 {
385 CritScope cs(&crit_);
386 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 21:17:27387 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 09:14:56388 msg.phandler = phandler;
389 msg.message_id = id;
390 msg.pdata = pdata;
391 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 18:29:15392 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 09:14:56393 }
394 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26395 }
jbauch9ccedc32016-02-25 09:14:56396 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26397}
398
Taylor Brandstetter5d97a9a2016-06-10 21:17:27399void MessageQueue::PostDelayed(const Location& posted_from,
400 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53401 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 10:23:21402 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53403 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 21:17:27404 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
405 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53406}
407
Taylor Brandstetter5d97a9a2016-06-10 21:17:27408void MessageQueue::PostAt(const Location& posted_from,
409 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53410 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 10:23:21411 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53412 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 18:29:15413 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 21:57:31414 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 21:17:27415 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 18:29:15416}
417
Taylor Brandstetter5d97a9a2016-06-10 21:17:27418void MessageQueue::PostAt(const Location& posted_from,
419 int64_t tstamp,
Honghai Zhang82d78622016-05-06 18:29:15420 MessageHandler* phandler,
421 uint32_t id,
422 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 21:17:27423 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
424 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53425}
426
Taylor Brandstetter5d97a9a2016-06-10 21:17:27427void MessageQueue::DoDelayPost(const Location& posted_from,
428 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 18:29:15429 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 10:23:21430 MessageHandler* phandler,
431 uint32_t id,
432 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 11:30:05433 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26434 return;
Taylor Brandstetter2b3bf6b2016-05-19 21:57:31435 }
henrike@webrtc.orgf0488722014-05-13 18:00:26436
437 // Keep thread safe
438 // Add to the priority queue. Gets sorted soonest first.
439 // Signal for the multiplexer to return.
440
jbauch9ccedc32016-02-25 09:14:56441 {
442 CritScope cs(&crit_);
443 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 21:17:27444 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 09:14:56445 msg.phandler = phandler;
446 msg.message_id = id;
447 msg.pdata = pdata;
448 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
449 dmsgq_.push(dmsg);
450 // If this message queue processes 1 message every millisecond for 50 days,
451 // we will wrap this number. Even then, only messages with identical times
452 // will be misordered, and then only briefly. This is probably ok.
nisse7ce109a2017-01-31 08:57:56453 ++dmsgq_next_num_;
454 RTC_DCHECK_NE(0, dmsgq_next_num_);
jbauch9ccedc32016-02-25 09:14:56455 }
456 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26457}
458
459int MessageQueue::GetDelay() {
460 CritScope cs(&crit_);
461
462 if (!msgq_.empty())
463 return 0;
464
465 if (!dmsgq_.empty()) {
466 int delay = TimeUntil(dmsgq_.top().msTrigger_);
467 if (delay < 0)
468 delay = 0;
469 return delay;
470 }
471
472 return kForever;
473}
474
Peter Boström0c4e06b2015-10-07 10:23:21475void MessageQueue::Clear(MessageHandler* phandler,
476 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26477 MessageList* removed) {
478 CritScope cs(&crit_);
479
480 // Remove messages with phandler
481
482 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
483 if (removed) {
484 removed->push_back(msgPeek_);
485 } else {
486 delete msgPeek_.pdata;
487 }
488 fPeekKeep_ = false;
489 }
490
491 // Remove from ordered message queue
492
493 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
494 if (it->Match(phandler, id)) {
495 if (removed) {
496 removed->push_back(*it);
497 } else {
498 delete it->pdata;
499 }
500 it = msgq_.erase(it);
501 } else {
502 ++it;
503 }
504 }
505
506 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50507
henrike@webrtc.orgf0488722014-05-13 18:00:26508 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
509 for (PriorityQueue::container_type::iterator it = new_end;
510 it != dmsgq_.container().end(); ++it) {
511 if (it->msg_.Match(phandler, id)) {
512 if (removed) {
513 removed->push_back(it->msg_);
514 } else {
515 delete it->msg_.pdata;
516 }
517 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50518 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26519 }
520 }
521 dmsgq_.container().erase(new_end, dmsgq_.container().end());
522 dmsgq_.reheap();
523}
524
525void MessageQueue::Dispatch(Message *pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 21:17:27526 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
527 pmsg->posted_from.file_and_line(), "src_func",
528 pmsg->posted_from.function_name());
529 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26530 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 21:17:27531 int64_t end_time = TimeMillis();
532 int64_t diff = TimeDiff(end_time, start_time);
533 if (diff >= kSlowDispatchLoggingThreshold) {
Mirko Bonadei675513b2017-11-09 10:09:25534 RTC_LOG(LS_INFO) << "Message took " << diff
535 << "ms to dispatch. Posted from: "
536 << pmsg->posted_from.ToString();
Taylor Brandstetter5d97a9a2016-06-10 21:17:27537 }
henrike@webrtc.orgf0488722014-05-13 18:00:26538}
539
henrike@webrtc.orgf0488722014-05-13 18:00:26540} // namespace rtc