Cleanup of rtc::Thread.
* Updates variable names to be more descriptive.
* Removes unused sensitive delay timing functionality.
* Removes deprecated PostAt() overload.
Bug: webrtc:9883
Change-Id: I68e8072fab345c5b169cbe5602a0a252eb71b5ec
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165393
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30323}
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index b20ec2d..00a582c 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -67,7 +67,6 @@
namespace rtc {
namespace {
-const int kMaxMsgLatency = 150; // 150 ms
const int kSlowDispatchLoggingThreshold = 50; // 50 ms
class MessageHandlerWithTask final : public MessageHandler {
@@ -305,7 +304,7 @@
Thread::Thread(SocketServer* ss, bool do_init)
: fPeekKeep_(false),
- dmsgq_next_num_(0),
+ delayed_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
@@ -406,7 +405,7 @@
int64_t msCurrent = msStart;
while (true) {
// Check for sent messages
- ReceiveSends();
+ ReceiveSendsFromThread(nullptr);
// Check for posted events
int64_t cmsDelayNext = kForever;
@@ -421,33 +420,25 @@
// triggered and calculate the next trigger time.
if (first_pass) {
first_pass = false;
- while (!dmsgq_.empty()) {
- if (msCurrent < dmsgq_.top().msTrigger_) {
- cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
+ while (!delayed_messages_.empty()) {
+ if (msCurrent < delayed_messages_.top().run_time_ms_) {
+ cmsDelayNext =
+ TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
- msgq_.push_back(dmsgq_.top().msg_);
- dmsgq_.pop();
+ messages_.push_back(delayed_messages_.top().msg_);
+ delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
- if (msgq_.empty()) {
+ if (messages_.empty()) {
break;
} else {
- *pmsg = msgq_.front();
- msgq_.pop_front();
+ *pmsg = messages_.front();
+ messages_.pop_front();
}
} // crit_ is released here.
- // Log a warning for time-sensitive messages that we're late to deliver.
- if (pmsg->ts_sensitive) {
- int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
- if (delay > 0) {
- RTC_LOG_F(LS_WARNING)
- << "id: " << pmsg->message_id
- << " delay: " << (delay + kMaxMsgLatency) << "ms";
- }
- }
// If this was a dispose message, delete it and skip it.
if (MQID_DISPOSE == pmsg->message_id) {
RTC_DCHECK(nullptr == pmsg->phandler);
@@ -495,6 +486,7 @@
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
+ RTC_DCHECK(!time_sensitive);
if (IsQuitting()) {
delete pdata;
return;
@@ -511,45 +503,32 @@
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
- if (time_sensitive) {
- msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
- }
- msgq_.push_back(msg);
+ messages_.push_back(msg);
}
WakeUpSocketServer();
}
void Thread::PostDelayed(const Location& posted_from,
- int cmsDelay,
+ int delay_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
- return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
+ return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id,
pdata);
}
void Thread::PostAt(const Location& posted_from,
- uint32_t tstamp,
+ int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
- // This should work even if it is used (unexpectedly).
- int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
- return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
-}
-
-void Thread::PostAt(const Location& posted_from,
- int64_t tstamp,
- MessageHandler* phandler,
- uint32_t id,
- MessageData* pdata) {
- return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
+ return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id,
pdata);
}
void Thread::DoDelayPost(const Location& posted_from,
- int64_t cmsDelay,
- int64_t tstamp,
+ int64_t delay_ms,
+ int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
@@ -569,13 +548,13 @@
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
- DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
- dmsgq_.push(dmsg);
+ DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg);
+ delayed_messages_.push(delayed);
// If this message queue processes 1 message every millisecond for 50 days,
// we will wrap this number. Even then, only messages with identical times
// will be misordered, and then only briefly. This is probably ok.
- ++dmsgq_next_num_;
- RTC_DCHECK_NE(0, dmsgq_next_num_);
+ ++delayed_next_num_;
+ RTC_DCHECK_NE(0, delayed_next_num_);
}
WakeUpSocketServer();
}
@@ -583,11 +562,11 @@
int Thread::GetDelay() {
CritScope cs(&crit_);
- if (!msgq_.empty())
+ if (!messages_.empty())
return 0;
- if (!dmsgq_.empty()) {
- int delay = TimeUntil(dmsgq_.top().msTrigger_);
+ if (!delayed_messages_.empty()) {
+ int delay = TimeUntil(delayed_messages_.top().run_time_ms_);
if (delay < 0)
delay = 0;
return delay;
@@ -612,14 +591,14 @@
// Remove from ordered message queue
- for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
+ for (auto it = messages_.begin(); it != messages_.end();) {
if (it->Match(phandler, id)) {
if (removed) {
removed->push_back(*it);
} else {
delete it->pdata;
}
- it = msgq_.erase(it);
+ it = messages_.erase(it);
} else {
++it;
}
@@ -627,9 +606,8 @@
// Remove from priority queue. Not directly iterable, so use this approach
- PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
- for (PriorityQueue::container_type::iterator it = new_end;
- it != dmsgq_.container().end(); ++it) {
+ auto new_end = delayed_messages_.container().begin();
+ for (auto it = new_end; it != delayed_messages_.container().end(); ++it) {
if (it->msg_.Match(phandler, id)) {
if (removed) {
removed->push_back(it->msg_);
@@ -640,8 +618,9 @@
*new_end++ = *it;
}
}
- dmsgq_.container().erase(new_end, dmsgq_.container().end());
- dmsgq_.reheap();
+ delayed_messages_.container().erase(new_end,
+ delayed_messages_.container().end());
+ delayed_messages_.reheap();
}
void Thread::Dispatch(Message* pmsg) {
@@ -909,10 +888,6 @@
}
}
-void Thread::ReceiveSends() {
- ReceiveSendsFromThread(nullptr);
-}
-
void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
@@ -935,8 +910,7 @@
}
bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
- for (std::list<_SendMessage>::iterator it = sendlist_.begin();
- it != sendlist_.end(); ++it) {
+ for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) {
if (it->thread == source || source == nullptr) {
*msg = *it;
sendlist_.erase(it);
@@ -1011,9 +985,7 @@
// Remove messages on sendlist_ with phandler
// Object target cleared: remove from send list, wakeup/set ready
// if sender not null.
-
- std::list<_SendMessage>::iterator iter = sendlist_.begin();
- while (iter != sendlist_.end()) {
+ for (auto iter = sendlist_.begin(); iter != sendlist_.end();) {
_SendMessage smsg = *iter;
if (smsg.msg.Match(phandler, id)) {
if (removed) {
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 8b853a8..77aff61 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -228,24 +228,19 @@
int cmsWait = kForever,
bool process_io = true);
virtual bool Peek(Message* pmsg, int cmsWait = 0);
+ // |time_sensitive| is deprecated and should always be false.
virtual void Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr,
bool time_sensitive = false);
virtual void PostDelayed(const Location& posted_from,
- int cmsDelay,
+ int delay_ms,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
virtual void PostAt(const Location& posted_from,
- int64_t tstamp,
- MessageHandler* phandler,
- uint32_t id = 0,
- MessageData* pdata = nullptr);
- // TODO(honghaiz): Remove this when all the dependencies are removed.
- virtual void PostAt(const Location& posted_from,
- uint32_t tstamp,
+ int64_t run_at_ms,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
@@ -253,15 +248,14 @@
uint32_t id = MQID_ANY,
MessageList* removed = nullptr);
virtual void Dispatch(Message* pmsg);
- virtual void ReceiveSends();
// Amount of time until the next message can be retrieved
virtual int GetDelay();
bool empty() const { return size() == 0u; }
size_t size() const {
- CritScope cs(&crit_); // msgq_.size() is not thread safe.
- return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
+ CritScope cs(&crit_);
+ return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u);
}
// Internally posts a message which causes the doomed object to be deleted
@@ -431,6 +425,33 @@
#endif
protected:
+ // DelayedMessage goes into a priority queue, sorted by trigger time. Messages
+ // with the same trigger time are processed in num_ (FIFO) order.
+ class DelayedMessage {
+ public:
+ DelayedMessage(int64_t delay,
+ int64_t run_time_ms,
+ uint32_t num,
+ const Message& msg)
+ : delay_ms_(delay),
+ run_time_ms_(run_time_ms),
+ message_number_(num),
+ msg_(msg) {}
+
+ bool operator<(const DelayedMessage& dmsg) const {
+ return (dmsg.run_time_ms_ < run_time_ms_) ||
+ ((dmsg.run_time_ms_ == run_time_ms_) &&
+ (dmsg.message_number_ < message_number_));
+ }
+
+ int64_t delay_ms_; // for debugging
+ int64_t run_time_ms_;
+ // Monotonicaly incrementing number used for ordering of messages
+ // targeted to execute at the same time.
+ uint32_t message_number_;
+ Message msg_;
+ };
+
class PriorityQueue : public std::priority_queue<DelayedMessage> {
public:
container_type& container() { return c; }
@@ -520,9 +541,9 @@
bool fPeekKeep_;
Message msgPeek_;
- MessageList msgq_ RTC_GUARDED_BY(crit_);
- PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
- uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
+ MessageList messages_ RTC_GUARDED_BY(crit_);
+ PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
+ uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
CriticalSection crit_;
bool fInitialized_;
bool fDestroyed_;
diff --git a/rtc_base/thread_message.h b/rtc_base/thread_message.h
index 1f6af1a..80824e2 100644
--- a/rtc_base/thread_message.h
+++ b/rtc_base/thread_message.h
@@ -101,8 +101,7 @@
// No destructor
struct Message {
- Message()
- : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
+ Message() : phandler(nullptr), message_id(0), pdata(nullptr) {}
inline bool Match(MessageHandler* handler, uint32_t id) const {
return (handler == nullptr || handler == phandler) &&
(id == MQID_ANY || id == message_id);
@@ -111,31 +110,8 @@
MessageHandler* phandler;
uint32_t message_id;
MessageData* pdata;
- int64_t ts_sensitive;
};
typedef std::list<Message> MessageList;
-
-// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
-// with the same trigger time are processed in num_ (FIFO) order.
-
-class DelayedMessage {
- public:
- DelayedMessage(int64_t delay,
- int64_t trigger,
- uint32_t num,
- const Message& msg)
- : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
-
- bool operator<(const DelayedMessage& dmsg) const {
- return (dmsg.msTrigger_ < msTrigger_) ||
- ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
- }
-
- int64_t cmsDelay_; // for debugging
- int64_t msTrigger_;
- uint32_t num_;
- Message msg_;
-};
} // namespace rtc
#endif // RTC_BASE_THREAD_MESSAGE_H_