Add helper class to process RtcEventLog events in order.
Add helper class to process RtcEventLog events in order.
Use helper class to migrate rtc_event_log2rtp_dump.cc
to new parser API.
Bug: webrtc:8111
Change-Id: I7cbc220dad1f50be3a985ed44de27b38e5f20476
Reviewed-on: https://webrtc-review.googlesource.com/98601
Commit-Queue: Björn Terelius <terelius@webrtc.org>
Reviewed-by: Elad Alon <eladalon@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24806}
diff --git a/logging/BUILD.gn b/logging/BUILD.gn
index eefb836..0269485 100644
--- a/logging/BUILD.gn
+++ b/logging/BUILD.gn
@@ -266,6 +266,7 @@
"rtc_event_log/rtc_event_log_parser.h",
"rtc_event_log/rtc_event_log_parser_new.cc",
"rtc_event_log/rtc_event_log_parser_new.h",
+ "rtc_event_log/rtc_event_processor.h",
]
deps = [
@@ -300,6 +301,7 @@
"rtc_event_log/rtc_event_log_unittest.cc",
"rtc_event_log/rtc_event_log_unittest_helper.cc",
"rtc_event_log/rtc_event_log_unittest_helper.h",
+ "rtc_event_log/rtc_event_processor_unittest.cc",
]
deps = [
":ice_log",
@@ -351,6 +353,8 @@
"../system_wrappers:field_trial_default",
"../system_wrappers:metrics_default",
"../test:rtp_test_utils",
+ "//third_party/abseil-cpp/absl/memory:memory",
+ "//third_party/abseil-cpp/absl/types:optional",
]
if (!build_with_chromium && is_clang) {
# Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
diff --git a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
index b62759b..a3d3456 100644
--- a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
+++ b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
@@ -15,9 +15,14 @@
#include <sstream>
#include <string>
+#include "absl/memory/memory.h"
+#include "absl/types/optional.h"
#include "logging/rtc_event_log/rtc_event_log.h"
#include "logging/rtc_event_log/rtc_event_log_parser_new.h"
+#include "logging/rtc_event_log/rtc_event_processor.h"
#include "modules/rtp_rtcp/source/byte_io.h"
+#include "modules/rtp_rtcp/source/rtp_header_extensions.h"
+#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/rtp_rtcp/source/rtp_utility.h"
#include "rtc_base/checks.h"
#include "rtc_base/flags.h"
@@ -59,8 +64,9 @@
// The empty string must be validated as true, because it is the default value
// of the command-line flag. In this case, no value is written to the output
// variable.
-bool ParseSsrc(std::string str, uint32_t* ssrc) {
+absl::optional<uint32_t> ParseSsrc(std::string str) {
// If the input string starts with 0x or 0X it indicates a hexadecimal number.
+ uint32_t ssrc;
auto read_mode = std::dec;
if (str.size() > 2 &&
(str.substr(0, 2) == "0x" || str.substr(0, 2) == "0X")) {
@@ -68,8 +74,79 @@
str = str.substr(2);
}
std::stringstream ss(str);
- ss >> read_mode >> *ssrc;
- return str.empty() || (!ss.fail() && ss.eof());
+ ss >> read_mode >> ssrc;
+ if (str.empty() || (!ss.fail() && ss.eof()))
+ return ssrc;
+ return absl::nullopt;
+}
+
+bool ShouldSkipStream(MediaType media_type,
+ uint32_t ssrc,
+ absl::optional<uint32_t> ssrc_filter) {
+ if (!FLAG_audio && media_type == MediaType::AUDIO)
+ return true;
+ if (!FLAG_video && media_type == MediaType::VIDEO)
+ return true;
+ if (!FLAG_data && media_type == MediaType::DATA)
+ return true;
+ if (ssrc_filter.has_value() && ssrc != *ssrc_filter)
+ return true;
+ return false;
+}
+
+// Convert a LoggedRtpPacketIncoming to a test::RtpPacket. Header extension IDs
+// are allocated according to the provided extension map. This might not match
+// the extension map used in the actual call.
+void ConvertRtpPacket(const webrtc::LoggedRtpPacketIncoming& incoming,
+ const webrtc::RtpHeaderExtensionMap default_extension_map,
+ webrtc::test::RtpPacket* packet) {
+ webrtc::RtpPacket reconstructed_packet(&default_extension_map);
+
+ reconstructed_packet.SetMarker(incoming.rtp.header.markerBit);
+ reconstructed_packet.SetPayloadType(incoming.rtp.header.payloadType);
+ reconstructed_packet.SetSequenceNumber(incoming.rtp.header.sequenceNumber);
+ reconstructed_packet.SetTimestamp(incoming.rtp.header.timestamp);
+ reconstructed_packet.SetSsrc(incoming.rtp.header.ssrc);
+ if (incoming.rtp.header.numCSRCs > 0) {
+ reconstructed_packet.SetCsrcs(rtc::ArrayView<const uint32_t>(
+ incoming.rtp.header.arrOfCSRCs, incoming.rtp.header.numCSRCs));
+ }
+
+ // Set extensions.
+ if (incoming.rtp.header.extension.hasTransmissionTimeOffset)
+ reconstructed_packet.SetExtension<webrtc::TransmissionOffset>(
+ incoming.rtp.header.extension.transmissionTimeOffset);
+ if (incoming.rtp.header.extension.hasAbsoluteSendTime)
+ reconstructed_packet.SetExtension<webrtc::AbsoluteSendTime>(
+ incoming.rtp.header.extension.absoluteSendTime);
+ if (incoming.rtp.header.extension.hasTransportSequenceNumber)
+ reconstructed_packet.SetExtension<webrtc::TransportSequenceNumber>(
+ incoming.rtp.header.extension.transportSequenceNumber);
+ if (incoming.rtp.header.extension.hasAudioLevel)
+ reconstructed_packet.SetExtension<webrtc::AudioLevel>(
+ incoming.rtp.header.extension.voiceActivity,
+ incoming.rtp.header.extension.audioLevel);
+ if (incoming.rtp.header.extension.hasVideoRotation)
+ reconstructed_packet.SetExtension<webrtc::VideoOrientation>(
+ incoming.rtp.header.extension.videoRotation);
+ if (incoming.rtp.header.extension.hasVideoContentType)
+ reconstructed_packet.SetExtension<webrtc::VideoContentTypeExtension>(
+ incoming.rtp.header.extension.videoContentType);
+ if (incoming.rtp.header.extension.has_video_timing)
+ reconstructed_packet.SetExtension<webrtc::VideoTimingExtension>(
+ incoming.rtp.header.extension.video_timing);
+
+ RTC_DCHECK_EQ(reconstructed_packet.size(), incoming.rtp.header_length);
+ RTC_DCHECK_EQ(reconstructed_packet.headers_size(),
+ incoming.rtp.header_length);
+ memcpy(packet->data, reconstructed_packet.data(),
+ reconstructed_packet.headers_size());
+ packet->length = reconstructed_packet.headers_size();
+ packet->original_length = incoming.rtp.total_length;
+ packet->time_ms = incoming.log_time_ms();
+ // Set padding bit.
+ if (incoming.rtp.header.paddingLength > 0)
+ packet->data[0] = packet->data[0] | 0x20;
}
} // namespace
@@ -97,10 +174,11 @@
std::string input_file = argv[1];
std::string output_file = argv[2];
- uint32_t ssrc_filter = 0;
- if (strlen(FLAG_ssrc) > 0)
- RTC_CHECK(ParseSsrc(FLAG_ssrc, &ssrc_filter))
- << "Flag verification has failed.";
+ absl::optional<uint32_t> ssrc_filter;
+ if (strlen(FLAG_ssrc) > 0) {
+ ssrc_filter = ParseSsrc(FLAG_ssrc);
+ RTC_CHECK(ssrc_filter.has_value()) << "Failed to read SSRC filter flag.";
+ }
webrtc::ParsedRtcEventLogNew parsed_stream;
if (!parsed_stream.ParseFile(input_file)) {
@@ -122,84 +200,57 @@
<< " events in the input file." << std::endl;
int rtp_counter = 0, rtcp_counter = 0;
bool header_only = false;
- for (size_t i = 0; i < parsed_stream.GetNumberOfEvents(); i++) {
- // The parsed_stream will assert if the protobuf event is missing
- // some required fields and we attempt to access them. We could consider
- // a softer failure option, but it does not seem useful to generate
- // RTP dumps based on broken event logs.
- if (FLAG_rtp && parsed_stream.GetEventType(i) ==
- webrtc::ParsedRtcEventLogNew::EventType::RTP_EVENT) {
- webrtc::test::RtpPacket packet;
- webrtc::PacketDirection direction;
- parsed_stream.GetRtpHeader(i, &direction, packet.data, &packet.length,
- &packet.original_length, nullptr);
- if (packet.original_length > packet.length)
- header_only = true;
- packet.time_ms = parsed_stream.GetTimestamp(i) / 1000;
- webrtc::RtpUtility::RtpHeaderParser rtp_parser(packet.data,
- packet.length);
+ webrtc::RtpHeaderExtensionMap default_extension_map =
+ webrtc::ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap();
+ auto handle_rtp = [&default_extension_map, &rtp_writer, &rtp_counter](
+ const webrtc::LoggedRtpPacketIncoming& incoming) {
+ webrtc::test::RtpPacket packet;
+ ConvertRtpPacket(incoming, default_extension_map, &packet);
- // TODO(terelius): Maybe add a flag to dump outgoing traffic instead?
- if (direction == webrtc::kOutgoingPacket)
- continue;
+ rtp_writer->WritePacket(&packet);
+ rtp_counter++;
+ };
- webrtc::RTPHeader parsed_header;
- rtp_parser.Parse(&parsed_header);
- MediaType media_type =
- parsed_stream.GetMediaType(parsed_header.ssrc, direction);
- if (!FLAG_audio && media_type == MediaType::AUDIO)
- continue;
- if (!FLAG_video && media_type == MediaType::VIDEO)
- continue;
- if (!FLAG_data && media_type == MediaType::DATA)
- continue;
- if (strlen(FLAG_ssrc) > 0) {
- const uint32_t packet_ssrc =
- webrtc::ByteReader<uint32_t>::ReadBigEndian(
- reinterpret_cast<const uint8_t*>(packet.data + 8));
- if (packet_ssrc != ssrc_filter)
- continue;
- }
+ auto handle_rtcp = [&rtp_writer, &rtcp_counter](
+ const webrtc::LoggedRtcpPacketIncoming& incoming) {
+ webrtc::test::RtpPacket packet;
+ memcpy(packet.data, incoming.rtcp.raw_data.data(),
+ incoming.rtcp.raw_data.size());
+ packet.length = incoming.rtcp.raw_data.size();
+ // For RTCP packets the original_length should be set to 0 in the
+ // RTPdump format.
+ packet.original_length = 0;
+ packet.time_ms = incoming.log_time_ms();
- rtp_writer->WritePacket(&packet);
- rtp_counter++;
- }
- if (FLAG_rtcp && parsed_stream.GetEventType(i) ==
- webrtc::ParsedRtcEventLogNew::EventType::RTCP_EVENT) {
- webrtc::test::RtpPacket packet;
- webrtc::PacketDirection direction;
- parsed_stream.GetRtcpPacket(i, &direction, packet.data, &packet.length);
- // For RTCP packets the original_length should be set to 0 in the
- // RTPdump format.
- packet.original_length = 0;
- packet.time_ms = parsed_stream.GetTimestamp(i) / 1000;
+ rtp_writer->WritePacket(&packet);
+ rtcp_counter++;
+ };
- // TODO(terelius): Maybe add a flag to dump outgoing traffic instead?
- if (direction == webrtc::kOutgoingPacket)
- continue;
-
- // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
- // report blocks for many streams, thus several SSRCs and they doen't
- // necessarily have to be of the same media type.
- const uint32_t packet_ssrc = webrtc::ByteReader<uint32_t>::ReadBigEndian(
- reinterpret_cast<const uint8_t*>(packet.data + 4));
- MediaType media_type = parsed_stream.GetMediaType(packet_ssrc, direction);
- if (!FLAG_audio && media_type == MediaType::AUDIO)
- continue;
- if (!FLAG_video && media_type == MediaType::VIDEO)
- continue;
- if (!FLAG_data && media_type == MediaType::DATA)
- continue;
- if (strlen(FLAG_ssrc) > 0) {
- if (packet_ssrc != ssrc_filter)
- continue;
- }
-
- rtp_writer->WritePacket(&packet);
- rtcp_counter++;
- }
+ webrtc::RtcEventProcessor event_processor;
+ for (const auto& stream : parsed_stream.incoming_rtp_packets_by_ssrc()) {
+ MediaType media_type =
+ parsed_stream.GetMediaType(stream.ssrc, webrtc::kIncomingPacket);
+ if (ShouldSkipStream(media_type, stream.ssrc, ssrc_filter))
+ continue;
+ auto rtp_view = absl::make_unique<
+ webrtc::ProcessableEventList<webrtc::LoggedRtpPacketIncoming>>(
+ stream.incoming_packets.begin(), stream.incoming_packets.end(),
+ handle_rtp);
+ event_processor.AddEvents(std::move(rtp_view));
}
+ // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
+ // report blocks for many streams, thus several SSRCs and they don't
+ // necessarily have to be of the same media type. We therefore don't
+ // support filtering of RTCP based on SSRC and media type.
+ auto rtcp_view = absl::make_unique<
+ webrtc::ProcessableEventList<webrtc::LoggedRtcpPacketIncoming>>(
+ parsed_stream.incoming_rtcp_packets().begin(),
+ parsed_stream.incoming_rtcp_packets().end(), handle_rtcp);
+ event_processor.AddEvents(std::move(rtcp_view));
+
+ event_processor.ProcessEventsInOrder();
+
std::cout << "Wrote " << rtp_counter << (header_only ? " header-only" : "")
<< " RTP packets and " << rtcp_counter << " RTCP packets to the "
<< "output file." << std::endl;
diff --git a/logging/rtc_event_log/rtc_event_log_parser_new.cc b/logging/rtc_event_log/rtc_event_log_parser_new.cc
index 9fbf0d0..4a75dc3 100644
--- a/logging/rtc_event_log/rtc_event_log_parser_new.cc
+++ b/logging/rtc_event_log/rtc_event_log_parser_new.cc
@@ -210,33 +210,6 @@
return IceCandidatePairEventType::kCheckSent;
}
-// Return default values for header extensions, to use on streams without stored
-// mapping data. Currently this only applies to audio streams, since the mapping
-// is not stored in the event log.
-// TODO(ivoc): Remove this once this mapping is stored in the event log for
-// audio streams. Tracking bug: webrtc:6399
-webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap() {
- webrtc::RtpHeaderExtensionMap default_map;
- default_map.Register<AudioLevel>(webrtc::RtpExtension::kAudioLevelDefaultId);
- default_map.Register<TransmissionOffset>(
- webrtc::RtpExtension::kTimestampOffsetDefaultId);
- default_map.Register<AbsoluteSendTime>(
- webrtc::RtpExtension::kAbsSendTimeDefaultId);
- default_map.Register<VideoOrientation>(
- webrtc::RtpExtension::kVideoRotationDefaultId);
- default_map.Register<VideoContentTypeExtension>(
- webrtc::RtpExtension::kVideoContentTypeDefaultId);
- default_map.Register<VideoTimingExtension>(
- webrtc::RtpExtension::kVideoTimingDefaultId);
- default_map.Register<FrameMarkingExtension>(
- webrtc::RtpExtension::kFrameMarkingDefaultId);
- default_map.Register<TransportSequenceNumber>(
- webrtc::RtpExtension::kTransportSequenceNumberDefaultId);
- default_map.Register<PlayoutDelayLimits>(
- webrtc::RtpExtension::kPlayoutDelayDefaultId);
- return default_map;
-}
-
std::pair<uint64_t, bool> ParseVarInt(
std::istream& stream) { // no-presubmit-check TODO(webrtc:8982)
uint64_t varint = 0;
@@ -328,6 +301,32 @@
ParsedRtcEventLogNew::LoggedRtpStreamView::LoggedRtpStreamView(
const LoggedRtpStreamView&) = default;
+// Return default values for header extensions, to use on streams without stored
+// mapping data. Currently this only applies to audio streams, since the mapping
+// is not stored in the event log.
+// TODO(ivoc): Remove this once this mapping is stored in the event log for
+// audio streams. Tracking bug: webrtc:6399
+webrtc::RtpHeaderExtensionMap
+ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap() {
+ webrtc::RtpHeaderExtensionMap default_map;
+ default_map.Register<AudioLevel>(webrtc::RtpExtension::kAudioLevelDefaultId);
+ default_map.Register<TransmissionOffset>(
+ webrtc::RtpExtension::kTimestampOffsetDefaultId);
+ default_map.Register<AbsoluteSendTime>(
+ webrtc::RtpExtension::kAbsSendTimeDefaultId);
+ default_map.Register<VideoOrientation>(
+ webrtc::RtpExtension::kVideoRotationDefaultId);
+ default_map.Register<VideoContentTypeExtension>(
+ webrtc::RtpExtension::kVideoContentTypeDefaultId);
+ default_map.Register<VideoTimingExtension>(
+ webrtc::RtpExtension::kVideoTimingDefaultId);
+ default_map.Register<TransportSequenceNumber>(
+ webrtc::RtpExtension::kTransportSequenceNumberDefaultId);
+ default_map.Register<PlayoutDelayLimits>(
+ webrtc::RtpExtension::kPlayoutDelayDefaultId);
+ return default_map;
+}
+
ParsedRtcEventLogNew::ParsedRtcEventLogNew(
UnconfiguredHeaderExtensions parse_unconfigured_header_extensions)
: parse_unconfigured_header_extensions_(
diff --git a/logging/rtc_event_log/rtc_event_log_parser_new.h b/logging/rtc_event_log/rtc_event_log_parser_new.h
index bf35ae9..8f7cc8f 100644
--- a/logging/rtc_event_log/rtc_event_log_parser_new.h
+++ b/logging/rtc_event_log/rtc_event_log_parser_new.h
@@ -49,6 +49,11 @@
enum class BandwidthUsage;
struct AudioEncoderRuntimeConfig;
+// The different event types are deliberately POD. Analysis of large logs is
+// already resource intensive. The code simplifications that would be possible
+// possible by having a base class (containing e.g. the log time) are not
+// considered to outweigh the added memory and runtime overhead incurred by
+// adding a vptr.
struct LoggedAlrStateEvent {
int64_t timestamp_us;
bool in_alr;
@@ -244,7 +249,7 @@
};
struct LoggedStartEvent {
- explicit LoggedStartEvent(uint64_t timestamp_us)
+ explicit LoggedStartEvent(int64_t timestamp_us)
: timestamp_us(timestamp_us) {}
int64_t timestamp_us;
int64_t log_time_us() const { return timestamp_us; }
@@ -252,8 +257,7 @@
};
struct LoggedStopEvent {
- explicit LoggedStopEvent(uint64_t timestamp_us)
- : timestamp_us(timestamp_us) {}
+ explicit LoggedStopEvent(int64_t timestamp_us) : timestamp_us(timestamp_us) {}
int64_t timestamp_us;
int64_t log_time_us() const { return timestamp_us; }
int64_t log_time_ms() const { return timestamp_us / 1000; }
@@ -531,6 +535,8 @@
PacketView<const LoggedRtpPacket> packet_view;
};
+ static webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap();
+
explicit ParsedRtcEventLogNew(
UnconfiguredHeaderExtensions parse_unconfigured_header_extensions =
UnconfiguredHeaderExtensions::kDontParse);
diff --git a/logging/rtc_event_log/rtc_event_processor.h b/logging/rtc_event_log/rtc_event_processor.h
new file mode 100644
index 0000000..8bd81d3
--- /dev/null
+++ b/logging/rtc_event_log/rtc_event_processor.h
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
+#define LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "rtc_base/function_view.h"
+
+namespace webrtc {
+
+// This file contains helper class used to process the elements of two or more
+// sorted lists in timestamp order. The effect is the same as doing a merge step
+// in the merge-sort algorithm but without copying the elements or modifying the
+// lists.
+
+// Interface to allow "merging" lists of different types. ProcessNext()
+// processes the next unprocesses element in the list. IsEmpty() checks if all
+// elements have been processed. GetNextTime returns the timestamp of the next
+// unprocessed element.
+class ProcessableEventListInterface {
+ public:
+ virtual ~ProcessableEventListInterface() = default;
+ virtual void ProcessNext() = 0;
+ virtual bool IsEmpty() const = 0;
+ virtual int64_t GetNextTime() const = 0;
+};
+
+// ProcessableEventList encapsulates a list of events and a function that will
+// be applied to each element of the list.
+template <typename T>
+class ProcessableEventList : public ProcessableEventListInterface {
+ public:
+ // N.B. |f| is not owned by ProcessableEventList. The caller must ensure that
+ // the function object or lambda outlives ProcessableEventList and
+ // RtcEventProcessor. The same thing applies to the iterators (begin, end);
+ // the vector must outlive ProcessableEventList and must not be modified until
+ // processing has finished.
+ ProcessableEventList(typename std::vector<T>::const_iterator begin,
+ typename std::vector<T>::const_iterator end,
+ rtc::FunctionView<void(const T&)> f)
+ : begin_(begin), end_(end), f_(f) {}
+
+ void ProcessNext() override {
+ RTC_DCHECK(!IsEmpty());
+ f_(*begin_);
+ ++begin_;
+ }
+
+ bool IsEmpty() const override { return begin_ == end_; }
+
+ int64_t GetNextTime() const override {
+ RTC_DCHECK(!IsEmpty());
+ return begin_->log_time_us();
+ }
+
+ private:
+ typename std::vector<T>::const_iterator begin_;
+ typename std::vector<T>::const_iterator end_;
+ rtc::FunctionView<void(const T&)> f_;
+};
+
+// Helper class used to "merge" two or more lists of ordered RtcEventLog events
+// so that they can be treated as a single ordered list. Since the individual
+// lists may have different types, we need to access the lists via pointers to
+// the common base class.
+//
+// Usage example:
+// ParsedRtcEventLogNew log;
+// auto incoming_handler = [] (LoggedRtcpPacketIncoming elem) { ... };
+// auto incoming_rtcp =
+// absl::make_unique<ProcessableEventList<LoggedRtcpPacketIncoming>>(
+// log.incoming_rtcp_packets().begin(),
+// log.incoming_rtcp_packets().end(),
+// incoming_handler);
+// auto outgoing_handler = [] (LoggedRtcpPacketOutgoing elem) { ... };
+// auto outgoing_rtcp =
+// absl::make_unique<ProcessableEventList<LoggedRtcpPacketOutgoing>>(
+// log.outgoing_rtcp_packets().begin(),
+// log.outgoing_rtcp_packets().end(),
+// outgoing_handler);
+//
+// RtcEventProcessor processor;
+// processor.AddEvents(std::move(incoming_rtcp));
+// processor.AddEvents(std::move(outgoing_rtcp));
+// processor.ProcessEventsInOrder();
+class RtcEventProcessor {
+ public:
+ // The elements of each list is processed in the index order. To process all
+ // elements in all lists in timestamp order, each lists need to be sorted in
+ // timestamp order prior to insertion. Otherwise,
+ void AddEvents(std::unique_ptr<ProcessableEventListInterface> events) {
+ if (!events->IsEmpty()) {
+ event_lists_.push_back(std::move(events));
+ std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+ }
+ }
+
+ void ProcessEventsInOrder() {
+ // |event_lists_| is a min-heap of lists ordered by the timestamp of the
+ // first element in the list. We therefore process the first element of the
+ // first list, then reinsert the remainder of that list into the heap
+ // if the list still contains unprocessed elements.
+ while (!event_lists_.empty()) {
+ event_lists_.front()->ProcessNext();
+ std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+ if (event_lists_.back()->IsEmpty()) {
+ event_lists_.pop_back();
+ } else {
+ std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+ }
+ }
+ }
+
+ private:
+ using ListPtrType = std::unique_ptr<ProcessableEventListInterface>;
+ std::vector<ListPtrType> event_lists_;
+ // Comparison function to make |event_lists_| into a min heap.
+ static bool Cmp(const ListPtrType& a, const ListPtrType& b) {
+ return a->GetNextTime() > b->GetNextTime();
+ }
+};
+
+} // namespace webrtc
+
+#endif // LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
diff --git a/logging/rtc_event_log/rtc_event_processor_unittest.cc b/logging/rtc_event_log/rtc_event_processor_unittest.cc
new file mode 100644
index 0000000..f2764e7
--- /dev/null
+++ b/logging/rtc_event_log/rtc_event_processor_unittest.cc
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "logging/rtc_event_log/rtc_event_processor.h"
+
+#include <initializer_list>
+#include <numeric>
+
+#include "absl/memory/memory.h"
+#include "logging/rtc_event_log/rtc_event_log_parser_new.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/random.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+
+namespace {
+std::vector<LoggedStartEvent> CreateEventList(
+ std::initializer_list<int64_t> timestamp_list) {
+ std::vector<LoggedStartEvent> v;
+ for (int64_t timestamp_ms : timestamp_list) {
+ v.emplace_back(timestamp_ms * 1000); // Convert ms to us.
+ }
+ return v;
+}
+
+using OrderedEventView = ProcessableEventList<LoggedStartEvent>;
+
+std::vector<std::vector<LoggedStartEvent>>
+CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) {
+ Random prng(seed);
+ std::vector<std::vector<LoggedStartEvent>> lists(num_lists);
+ for (size_t elem = 0; elem < num_elements; elem++) {
+ uint32_t i = prng.Rand(0u, num_lists - 1);
+ int64_t timestamp_ms = elem;
+ lists[i].emplace_back(timestamp_ms * 1000);
+ }
+ return lists;
+}
+} // namespace
+
+TEST(RtcEventProcessor, NoList) {
+ RtcEventProcessor processor;
+ processor.ProcessEventsInOrder(); // Don't crash but do nothing.
+}
+
+TEST(RtcEventProcessor, EmptyList) {
+ auto not_called = [](LoggedStartEvent /*elem*/) { EXPECT_TRUE(false); };
+ std::vector<LoggedStartEvent> events;
+ RtcEventProcessor processor;
+
+ processor.AddEvents(absl::make_unique<OrderedEventView>(
+ events.begin(), events.end(), not_called));
+ processor.ProcessEventsInOrder(); // Don't crash but do nothing.
+}
+
+TEST(RtcEventProcessor, OneList) {
+ std::vector<LoggedStartEvent> result;
+ auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+ std::vector<LoggedStartEvent> events(CreateEventList({1, 2, 3, 4}));
+ RtcEventProcessor processor;
+ processor.AddEvents(
+ absl::make_unique<OrderedEventView>(events.begin(), events.end(), f));
+ processor.ProcessEventsInOrder();
+
+ std::vector<int64_t> expected_results{1, 2, 3, 4};
+ ASSERT_EQ(result.size(), expected_results.size());
+ for (size_t i = 0; i < expected_results.size(); i++) {
+ EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+ }
+}
+
+TEST(RtcEventProcessor, MergeTwoLists) {
+ std::vector<LoggedStartEvent> result;
+ auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+ std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 4, 7, 8, 9}));
+ std::vector<LoggedStartEvent> events2(CreateEventList({3, 5, 6, 10}));
+ RtcEventProcessor processor;
+ processor.AddEvents(
+ absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
+ processor.AddEvents(
+ absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
+ processor.ProcessEventsInOrder();
+
+ std::vector<int64_t> expected_results{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ ASSERT_EQ(result.size(), expected_results.size());
+ for (size_t i = 0; i < expected_results.size(); i++) {
+ EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+ }
+}
+
+TEST(RtcEventProcessor, MergeTwoListsWithDuplicatedElements) {
+ std::vector<LoggedStartEvent> result;
+ auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+ std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 2, 3, 5, 5}));
+ std::vector<LoggedStartEvent> events2(CreateEventList({1, 3, 4, 4}));
+ RtcEventProcessor processor;
+ processor.AddEvents(
+ absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
+ processor.AddEvents(
+ absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
+ processor.ProcessEventsInOrder();
+
+ std::vector<int64_t> expected_results{1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
+ ASSERT_EQ(result.size(), expected_results.size());
+ for (size_t i = 0; i < expected_results.size(); i++) {
+ EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+ }
+}
+
+TEST(RtcEventProcessor, MergeManyLists) {
+ std::vector<LoggedStartEvent> result;
+ auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+ constexpr size_t kNumLists = 5;
+ constexpr size_t kNumElems = 30;
+ constexpr uint64_t kSeed = 0xF3C6B91F;
+ std::vector<std::vector<LoggedStartEvent>> lists(
+ CreateRandomEventLists(kNumLists, kNumElems, kSeed));
+ RTC_DCHECK_EQ(lists.size(), kNumLists);
+ RtcEventProcessor processor;
+ for (const auto& list : lists) {
+ processor.AddEvents(
+ absl::make_unique<OrderedEventView>(list.begin(), list.end(), f));
+ }
+ processor.ProcessEventsInOrder();
+
+ std::vector<int64_t> expected_results(kNumElems);
+ std::iota(expected_results.begin(), expected_results.end(), 0);
+ ASSERT_EQ(result.size(), expected_results.size());
+ for (size_t i = 0; i < expected_results.size(); i++) {
+ EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+ }
+}
+
+TEST(RtcEventProcessor, DifferentTypes) {
+ std::vector<int64_t> result;
+ auto f1 = [&result](LoggedStartEvent elem) {
+ result.push_back(elem.log_time_ms());
+ };
+ auto f2 = [&result](LoggedStopEvent elem) {
+ result.push_back(elem.log_time_ms());
+ };
+
+ std::vector<LoggedStartEvent> events1{LoggedStartEvent(2000)};
+ std::vector<LoggedStopEvent> events2{LoggedStopEvent(1000)};
+ RtcEventProcessor processor;
+ processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStartEvent>>(
+ events1.begin(), events1.end(), f1));
+ processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStopEvent>>(
+ events2.begin(), events2.end(), f2));
+ processor.ProcessEventsInOrder();
+
+ std::vector<int64_t> expected_results{1, 2};
+ ASSERT_EQ(result.size(), expected_results.size());
+ for (size_t i = 0; i < expected_results.size(); i++) {
+ EXPECT_EQ(result[i], expected_results[i]);
+ }
+}
+
+} // namespace webrtc