Add helper class to process RtcEventLog events in order.

Add helper class to process RtcEventLog events in order.
Use helper class to migrate rtc_event_log2rtp_dump.cc
to new parser API.

Bug: webrtc:8111
Change-Id: I7cbc220dad1f50be3a985ed44de27b38e5f20476
Reviewed-on: https://webrtc-review.googlesource.com/98601
Commit-Queue: Björn Terelius <terelius@webrtc.org>
Reviewed-by: Elad Alon <eladalon@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24806}
diff --git a/logging/BUILD.gn b/logging/BUILD.gn
index eefb836..0269485 100644
--- a/logging/BUILD.gn
+++ b/logging/BUILD.gn
@@ -266,6 +266,7 @@
       "rtc_event_log/rtc_event_log_parser.h",
       "rtc_event_log/rtc_event_log_parser_new.cc",
       "rtc_event_log/rtc_event_log_parser_new.h",
+      "rtc_event_log/rtc_event_processor.h",
     ]
 
     deps = [
@@ -300,6 +301,7 @@
         "rtc_event_log/rtc_event_log_unittest.cc",
         "rtc_event_log/rtc_event_log_unittest_helper.cc",
         "rtc_event_log/rtc_event_log_unittest_helper.h",
+        "rtc_event_log/rtc_event_processor_unittest.cc",
       ]
       deps = [
         ":ice_log",
@@ -351,6 +353,8 @@
         "../system_wrappers:field_trial_default",
         "../system_wrappers:metrics_default",
         "../test:rtp_test_utils",
+        "//third_party/abseil-cpp/absl/memory:memory",
+        "//third_party/abseil-cpp/absl/types:optional",
       ]
       if (!build_with_chromium && is_clang) {
         # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
diff --git a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
index b62759b..a3d3456 100644
--- a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
+++ b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
@@ -15,9 +15,14 @@
 #include <sstream>
 #include <string>
 
+#include "absl/memory/memory.h"
+#include "absl/types/optional.h"
 #include "logging/rtc_event_log/rtc_event_log.h"
 #include "logging/rtc_event_log/rtc_event_log_parser_new.h"
+#include "logging/rtc_event_log/rtc_event_processor.h"
 #include "modules/rtp_rtcp/source/byte_io.h"
+#include "modules/rtp_rtcp/source/rtp_header_extensions.h"
+#include "modules/rtp_rtcp/source/rtp_packet.h"
 #include "modules/rtp_rtcp/source/rtp_utility.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/flags.h"
@@ -59,8 +64,9 @@
 // The empty string must be validated as true, because it is the default value
 // of the command-line flag. In this case, no value is written to the output
 // variable.
-bool ParseSsrc(std::string str, uint32_t* ssrc) {
+absl::optional<uint32_t> ParseSsrc(std::string str) {
   // If the input string starts with 0x or 0X it indicates a hexadecimal number.
+  uint32_t ssrc;
   auto read_mode = std::dec;
   if (str.size() > 2 &&
       (str.substr(0, 2) == "0x" || str.substr(0, 2) == "0X")) {
@@ -68,8 +74,79 @@
     str = str.substr(2);
   }
   std::stringstream ss(str);
-  ss >> read_mode >> *ssrc;
-  return str.empty() || (!ss.fail() && ss.eof());
+  ss >> read_mode >> ssrc;
+  if (str.empty() || (!ss.fail() && ss.eof()))
+    return ssrc;
+  return absl::nullopt;
+}
+
+bool ShouldSkipStream(MediaType media_type,
+                      uint32_t ssrc,
+                      absl::optional<uint32_t> ssrc_filter) {
+  if (!FLAG_audio && media_type == MediaType::AUDIO)
+    return true;
+  if (!FLAG_video && media_type == MediaType::VIDEO)
+    return true;
+  if (!FLAG_data && media_type == MediaType::DATA)
+    return true;
+  if (ssrc_filter.has_value() && ssrc != *ssrc_filter)
+    return true;
+  return false;
+}
+
+// Convert a LoggedRtpPacketIncoming to a test::RtpPacket. Header extension IDs
+// are allocated according to the provided extension map. This might not match
+// the extension map used in the actual call.
+void ConvertRtpPacket(const webrtc::LoggedRtpPacketIncoming& incoming,
+                      const webrtc::RtpHeaderExtensionMap default_extension_map,
+                      webrtc::test::RtpPacket* packet) {
+  webrtc::RtpPacket reconstructed_packet(&default_extension_map);
+
+  reconstructed_packet.SetMarker(incoming.rtp.header.markerBit);
+  reconstructed_packet.SetPayloadType(incoming.rtp.header.payloadType);
+  reconstructed_packet.SetSequenceNumber(incoming.rtp.header.sequenceNumber);
+  reconstructed_packet.SetTimestamp(incoming.rtp.header.timestamp);
+  reconstructed_packet.SetSsrc(incoming.rtp.header.ssrc);
+  if (incoming.rtp.header.numCSRCs > 0) {
+    reconstructed_packet.SetCsrcs(rtc::ArrayView<const uint32_t>(
+        incoming.rtp.header.arrOfCSRCs, incoming.rtp.header.numCSRCs));
+  }
+
+  // Set extensions.
+  if (incoming.rtp.header.extension.hasTransmissionTimeOffset)
+    reconstructed_packet.SetExtension<webrtc::TransmissionOffset>(
+        incoming.rtp.header.extension.transmissionTimeOffset);
+  if (incoming.rtp.header.extension.hasAbsoluteSendTime)
+    reconstructed_packet.SetExtension<webrtc::AbsoluteSendTime>(
+        incoming.rtp.header.extension.absoluteSendTime);
+  if (incoming.rtp.header.extension.hasTransportSequenceNumber)
+    reconstructed_packet.SetExtension<webrtc::TransportSequenceNumber>(
+        incoming.rtp.header.extension.transportSequenceNumber);
+  if (incoming.rtp.header.extension.hasAudioLevel)
+    reconstructed_packet.SetExtension<webrtc::AudioLevel>(
+        incoming.rtp.header.extension.voiceActivity,
+        incoming.rtp.header.extension.audioLevel);
+  if (incoming.rtp.header.extension.hasVideoRotation)
+    reconstructed_packet.SetExtension<webrtc::VideoOrientation>(
+        incoming.rtp.header.extension.videoRotation);
+  if (incoming.rtp.header.extension.hasVideoContentType)
+    reconstructed_packet.SetExtension<webrtc::VideoContentTypeExtension>(
+        incoming.rtp.header.extension.videoContentType);
+  if (incoming.rtp.header.extension.has_video_timing)
+    reconstructed_packet.SetExtension<webrtc::VideoTimingExtension>(
+        incoming.rtp.header.extension.video_timing);
+
+  RTC_DCHECK_EQ(reconstructed_packet.size(), incoming.rtp.header_length);
+  RTC_DCHECK_EQ(reconstructed_packet.headers_size(),
+                incoming.rtp.header_length);
+  memcpy(packet->data, reconstructed_packet.data(),
+         reconstructed_packet.headers_size());
+  packet->length = reconstructed_packet.headers_size();
+  packet->original_length = incoming.rtp.total_length;
+  packet->time_ms = incoming.log_time_ms();
+  // Set padding bit.
+  if (incoming.rtp.header.paddingLength > 0)
+    packet->data[0] = packet->data[0] | 0x20;
 }
 
 }  // namespace
@@ -97,10 +174,11 @@
   std::string input_file = argv[1];
   std::string output_file = argv[2];
 
-  uint32_t ssrc_filter = 0;
-  if (strlen(FLAG_ssrc) > 0)
-    RTC_CHECK(ParseSsrc(FLAG_ssrc, &ssrc_filter))
-        << "Flag verification has failed.";
+  absl::optional<uint32_t> ssrc_filter;
+  if (strlen(FLAG_ssrc) > 0) {
+    ssrc_filter = ParseSsrc(FLAG_ssrc);
+    RTC_CHECK(ssrc_filter.has_value()) << "Failed to read SSRC filter flag.";
+  }
 
   webrtc::ParsedRtcEventLogNew parsed_stream;
   if (!parsed_stream.ParseFile(input_file)) {
@@ -122,84 +200,57 @@
             << " events in the input file." << std::endl;
   int rtp_counter = 0, rtcp_counter = 0;
   bool header_only = false;
-  for (size_t i = 0; i < parsed_stream.GetNumberOfEvents(); i++) {
-    // The parsed_stream will assert if the protobuf event is missing
-    // some required fields and we attempt to access them. We could consider
-    // a softer failure option, but it does not seem useful to generate
-    // RTP dumps based on broken event logs.
-    if (FLAG_rtp && parsed_stream.GetEventType(i) ==
-                        webrtc::ParsedRtcEventLogNew::EventType::RTP_EVENT) {
-      webrtc::test::RtpPacket packet;
-      webrtc::PacketDirection direction;
-      parsed_stream.GetRtpHeader(i, &direction, packet.data, &packet.length,
-                                 &packet.original_length, nullptr);
-      if (packet.original_length > packet.length)
-        header_only = true;
-      packet.time_ms = parsed_stream.GetTimestamp(i) / 1000;
 
-      webrtc::RtpUtility::RtpHeaderParser rtp_parser(packet.data,
-                                                     packet.length);
+  webrtc::RtpHeaderExtensionMap default_extension_map =
+      webrtc::ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap();
+  auto handle_rtp = [&default_extension_map, &rtp_writer, &rtp_counter](
+                        const webrtc::LoggedRtpPacketIncoming& incoming) {
+    webrtc::test::RtpPacket packet;
+    ConvertRtpPacket(incoming, default_extension_map, &packet);
 
-      // TODO(terelius): Maybe add a flag to dump outgoing traffic instead?
-      if (direction == webrtc::kOutgoingPacket)
-        continue;
+    rtp_writer->WritePacket(&packet);
+    rtp_counter++;
+  };
 
-      webrtc::RTPHeader parsed_header;
-      rtp_parser.Parse(&parsed_header);
-      MediaType media_type =
-          parsed_stream.GetMediaType(parsed_header.ssrc, direction);
-      if (!FLAG_audio && media_type == MediaType::AUDIO)
-        continue;
-      if (!FLAG_video && media_type == MediaType::VIDEO)
-        continue;
-      if (!FLAG_data && media_type == MediaType::DATA)
-        continue;
-      if (strlen(FLAG_ssrc) > 0) {
-        const uint32_t packet_ssrc =
-            webrtc::ByteReader<uint32_t>::ReadBigEndian(
-                reinterpret_cast<const uint8_t*>(packet.data + 8));
-        if (packet_ssrc != ssrc_filter)
-          continue;
-      }
+  auto handle_rtcp = [&rtp_writer, &rtcp_counter](
+                         const webrtc::LoggedRtcpPacketIncoming& incoming) {
+    webrtc::test::RtpPacket packet;
+    memcpy(packet.data, incoming.rtcp.raw_data.data(),
+           incoming.rtcp.raw_data.size());
+    packet.length = incoming.rtcp.raw_data.size();
+    // For RTCP packets the original_length should be set to 0 in the
+    // RTPdump format.
+    packet.original_length = 0;
+    packet.time_ms = incoming.log_time_ms();
 
-      rtp_writer->WritePacket(&packet);
-      rtp_counter++;
-    }
-    if (FLAG_rtcp && parsed_stream.GetEventType(i) ==
-                         webrtc::ParsedRtcEventLogNew::EventType::RTCP_EVENT) {
-      webrtc::test::RtpPacket packet;
-      webrtc::PacketDirection direction;
-      parsed_stream.GetRtcpPacket(i, &direction, packet.data, &packet.length);
-      // For RTCP packets the original_length should be set to 0 in the
-      // RTPdump format.
-      packet.original_length = 0;
-      packet.time_ms = parsed_stream.GetTimestamp(i) / 1000;
+    rtp_writer->WritePacket(&packet);
+    rtcp_counter++;
+  };
 
-      // TODO(terelius): Maybe add a flag to dump outgoing traffic instead?
-      if (direction == webrtc::kOutgoingPacket)
-        continue;
-
-      // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
-      // report blocks for many streams, thus several SSRCs and they doen't
-      // necessarily have to be of the same media type.
-      const uint32_t packet_ssrc = webrtc::ByteReader<uint32_t>::ReadBigEndian(
-          reinterpret_cast<const uint8_t*>(packet.data + 4));
-      MediaType media_type = parsed_stream.GetMediaType(packet_ssrc, direction);
-      if (!FLAG_audio && media_type == MediaType::AUDIO)
-        continue;
-      if (!FLAG_video && media_type == MediaType::VIDEO)
-        continue;
-      if (!FLAG_data && media_type == MediaType::DATA)
-        continue;
-      if (strlen(FLAG_ssrc) > 0) {
-        if (packet_ssrc != ssrc_filter)
-          continue;
-      }
-
-      rtp_writer->WritePacket(&packet);
-      rtcp_counter++;
-    }
+  webrtc::RtcEventProcessor event_processor;
+  for (const auto& stream : parsed_stream.incoming_rtp_packets_by_ssrc()) {
+    MediaType media_type =
+        parsed_stream.GetMediaType(stream.ssrc, webrtc::kIncomingPacket);
+    if (ShouldSkipStream(media_type, stream.ssrc, ssrc_filter))
+      continue;
+    auto rtp_view = absl::make_unique<
+        webrtc::ProcessableEventList<webrtc::LoggedRtpPacketIncoming>>(
+        stream.incoming_packets.begin(), stream.incoming_packets.end(),
+        handle_rtp);
+    event_processor.AddEvents(std::move(rtp_view));
   }
+  // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
+  // report blocks for many streams, thus several SSRCs and they don't
+  // necessarily have to be of the same media type. We therefore don't
+  // support filtering of RTCP based on SSRC and media type.
+  auto rtcp_view = absl::make_unique<
+      webrtc::ProcessableEventList<webrtc::LoggedRtcpPacketIncoming>>(
+      parsed_stream.incoming_rtcp_packets().begin(),
+      parsed_stream.incoming_rtcp_packets().end(), handle_rtcp);
+  event_processor.AddEvents(std::move(rtcp_view));
+
+  event_processor.ProcessEventsInOrder();
+
   std::cout << "Wrote " << rtp_counter << (header_only ? " header-only" : "")
             << " RTP packets and " << rtcp_counter << " RTCP packets to the "
             << "output file." << std::endl;
diff --git a/logging/rtc_event_log/rtc_event_log_parser_new.cc b/logging/rtc_event_log/rtc_event_log_parser_new.cc
index 9fbf0d0..4a75dc3 100644
--- a/logging/rtc_event_log/rtc_event_log_parser_new.cc
+++ b/logging/rtc_event_log/rtc_event_log_parser_new.cc
@@ -210,33 +210,6 @@
   return IceCandidatePairEventType::kCheckSent;
 }
 
-// Return default values for header extensions, to use on streams without stored
-// mapping data. Currently this only applies to audio streams, since the mapping
-// is not stored in the event log.
-// TODO(ivoc): Remove this once this mapping is stored in the event log for
-//             audio streams. Tracking bug: webrtc:6399
-webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap() {
-  webrtc::RtpHeaderExtensionMap default_map;
-  default_map.Register<AudioLevel>(webrtc::RtpExtension::kAudioLevelDefaultId);
-  default_map.Register<TransmissionOffset>(
-      webrtc::RtpExtension::kTimestampOffsetDefaultId);
-  default_map.Register<AbsoluteSendTime>(
-      webrtc::RtpExtension::kAbsSendTimeDefaultId);
-  default_map.Register<VideoOrientation>(
-      webrtc::RtpExtension::kVideoRotationDefaultId);
-  default_map.Register<VideoContentTypeExtension>(
-      webrtc::RtpExtension::kVideoContentTypeDefaultId);
-  default_map.Register<VideoTimingExtension>(
-      webrtc::RtpExtension::kVideoTimingDefaultId);
-  default_map.Register<FrameMarkingExtension>(
-      webrtc::RtpExtension::kFrameMarkingDefaultId);
-  default_map.Register<TransportSequenceNumber>(
-      webrtc::RtpExtension::kTransportSequenceNumberDefaultId);
-  default_map.Register<PlayoutDelayLimits>(
-      webrtc::RtpExtension::kPlayoutDelayDefaultId);
-  return default_map;
-}
-
 std::pair<uint64_t, bool> ParseVarInt(
     std::istream& stream) {  // no-presubmit-check TODO(webrtc:8982)
   uint64_t varint = 0;
@@ -328,6 +301,32 @@
 ParsedRtcEventLogNew::LoggedRtpStreamView::LoggedRtpStreamView(
     const LoggedRtpStreamView&) = default;
 
+// Return default values for header extensions, to use on streams without stored
+// mapping data. Currently this only applies to audio streams, since the mapping
+// is not stored in the event log.
+// TODO(ivoc): Remove this once this mapping is stored in the event log for
+//             audio streams. Tracking bug: webrtc:6399
+webrtc::RtpHeaderExtensionMap
+ParsedRtcEventLogNew::GetDefaultHeaderExtensionMap() {
+  webrtc::RtpHeaderExtensionMap default_map;
+  default_map.Register<AudioLevel>(webrtc::RtpExtension::kAudioLevelDefaultId);
+  default_map.Register<TransmissionOffset>(
+      webrtc::RtpExtension::kTimestampOffsetDefaultId);
+  default_map.Register<AbsoluteSendTime>(
+      webrtc::RtpExtension::kAbsSendTimeDefaultId);
+  default_map.Register<VideoOrientation>(
+      webrtc::RtpExtension::kVideoRotationDefaultId);
+  default_map.Register<VideoContentTypeExtension>(
+      webrtc::RtpExtension::kVideoContentTypeDefaultId);
+  default_map.Register<VideoTimingExtension>(
+      webrtc::RtpExtension::kVideoTimingDefaultId);
+  default_map.Register<TransportSequenceNumber>(
+      webrtc::RtpExtension::kTransportSequenceNumberDefaultId);
+  default_map.Register<PlayoutDelayLimits>(
+      webrtc::RtpExtension::kPlayoutDelayDefaultId);
+  return default_map;
+}
+
 ParsedRtcEventLogNew::ParsedRtcEventLogNew(
     UnconfiguredHeaderExtensions parse_unconfigured_header_extensions)
     : parse_unconfigured_header_extensions_(
diff --git a/logging/rtc_event_log/rtc_event_log_parser_new.h b/logging/rtc_event_log/rtc_event_log_parser_new.h
index bf35ae9..8f7cc8f 100644
--- a/logging/rtc_event_log/rtc_event_log_parser_new.h
+++ b/logging/rtc_event_log/rtc_event_log_parser_new.h
@@ -49,6 +49,11 @@
 enum class BandwidthUsage;
 struct AudioEncoderRuntimeConfig;
 
+// The different event types are deliberately POD. Analysis of large logs is
+// already resource intensive. The code simplifications that would be possible
+// possible by having a base class (containing e.g. the log time) are not
+// considered to outweigh the added memory and runtime overhead incurred by
+// adding a vptr.
 struct LoggedAlrStateEvent {
   int64_t timestamp_us;
   bool in_alr;
@@ -244,7 +249,7 @@
 };
 
 struct LoggedStartEvent {
-  explicit LoggedStartEvent(uint64_t timestamp_us)
+  explicit LoggedStartEvent(int64_t timestamp_us)
       : timestamp_us(timestamp_us) {}
   int64_t timestamp_us;
   int64_t log_time_us() const { return timestamp_us; }
@@ -252,8 +257,7 @@
 };
 
 struct LoggedStopEvent {
-  explicit LoggedStopEvent(uint64_t timestamp_us)
-      : timestamp_us(timestamp_us) {}
+  explicit LoggedStopEvent(int64_t timestamp_us) : timestamp_us(timestamp_us) {}
   int64_t timestamp_us;
   int64_t log_time_us() const { return timestamp_us; }
   int64_t log_time_ms() const { return timestamp_us / 1000; }
@@ -531,6 +535,8 @@
     PacketView<const LoggedRtpPacket> packet_view;
   };
 
+  static webrtc::RtpHeaderExtensionMap GetDefaultHeaderExtensionMap();
+
   explicit ParsedRtcEventLogNew(
       UnconfiguredHeaderExtensions parse_unconfigured_header_extensions =
           UnconfiguredHeaderExtensions::kDontParse);
diff --git a/logging/rtc_event_log/rtc_event_processor.h b/logging/rtc_event_log/rtc_event_processor.h
new file mode 100644
index 0000000..8bd81d3
--- /dev/null
+++ b/logging/rtc_event_log/rtc_event_processor.h
@@ -0,0 +1,137 @@
+/*
+ *  Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
+#define LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "rtc_base/function_view.h"
+
+namespace webrtc {
+
+// This file contains helper class used to process the elements of two or more
+// sorted lists in timestamp order. The effect is the same as doing a merge step
+// in the merge-sort algorithm but without copying the elements or modifying the
+// lists.
+
+// Interface to allow "merging" lists of different types. ProcessNext()
+// processes the next unprocesses element in the list. IsEmpty() checks if all
+// elements have been processed. GetNextTime returns the timestamp of the next
+// unprocessed element.
+class ProcessableEventListInterface {
+ public:
+  virtual ~ProcessableEventListInterface() = default;
+  virtual void ProcessNext() = 0;
+  virtual bool IsEmpty() const = 0;
+  virtual int64_t GetNextTime() const = 0;
+};
+
+// ProcessableEventList encapsulates a list of events and a function that will
+// be applied to each element of the list.
+template <typename T>
+class ProcessableEventList : public ProcessableEventListInterface {
+ public:
+  // N.B. |f| is not owned by ProcessableEventList. The caller must ensure that
+  // the function object or lambda outlives ProcessableEventList and
+  // RtcEventProcessor. The same thing applies to the iterators (begin, end);
+  // the vector must outlive ProcessableEventList and must not be modified until
+  // processing has finished.
+  ProcessableEventList(typename std::vector<T>::const_iterator begin,
+                       typename std::vector<T>::const_iterator end,
+                       rtc::FunctionView<void(const T&)> f)
+      : begin_(begin), end_(end), f_(f) {}
+
+  void ProcessNext() override {
+    RTC_DCHECK(!IsEmpty());
+    f_(*begin_);
+    ++begin_;
+  }
+
+  bool IsEmpty() const override { return begin_ == end_; }
+
+  int64_t GetNextTime() const override {
+    RTC_DCHECK(!IsEmpty());
+    return begin_->log_time_us();
+  }
+
+ private:
+  typename std::vector<T>::const_iterator begin_;
+  typename std::vector<T>::const_iterator end_;
+  rtc::FunctionView<void(const T&)> f_;
+};
+
+// Helper class used to "merge" two or more lists of ordered RtcEventLog events
+// so that they can be treated as a single ordered list. Since the individual
+// lists may have different types, we need to access the lists via pointers to
+// the common base class.
+//
+// Usage example:
+// ParsedRtcEventLogNew log;
+// auto incoming_handler = [] (LoggedRtcpPacketIncoming elem) { ... };
+// auto incoming_rtcp =
+//     absl::make_unique<ProcessableEventList<LoggedRtcpPacketIncoming>>(
+//         log.incoming_rtcp_packets().begin(),
+//         log.incoming_rtcp_packets().end(),
+//         incoming_handler);
+// auto outgoing_handler = [] (LoggedRtcpPacketOutgoing elem) { ... };
+// auto outgoing_rtcp =
+//     absl::make_unique<ProcessableEventList<LoggedRtcpPacketOutgoing>>(
+//         log.outgoing_rtcp_packets().begin(),
+//         log.outgoing_rtcp_packets().end(),
+//         outgoing_handler);
+//
+// RtcEventProcessor processor;
+// processor.AddEvents(std::move(incoming_rtcp));
+// processor.AddEvents(std::move(outgoing_rtcp));
+// processor.ProcessEventsInOrder();
+class RtcEventProcessor {
+ public:
+  // The elements of each list is processed in the index order. To process all
+  // elements in all lists in timestamp order, each lists need to be sorted in
+  // timestamp order prior to insertion. Otherwise,
+  void AddEvents(std::unique_ptr<ProcessableEventListInterface> events) {
+    if (!events->IsEmpty()) {
+      event_lists_.push_back(std::move(events));
+      std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+    }
+  }
+
+  void ProcessEventsInOrder() {
+    // |event_lists_| is a min-heap of lists ordered by the timestamp of the
+    // first element in the list. We therefore process the first element of the
+    // first list, then reinsert the remainder of that list into the heap
+    // if the list still contains unprocessed elements.
+    while (!event_lists_.empty()) {
+      event_lists_.front()->ProcessNext();
+      std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+      if (event_lists_.back()->IsEmpty()) {
+        event_lists_.pop_back();
+      } else {
+        std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+      }
+    }
+  }
+
+ private:
+  using ListPtrType = std::unique_ptr<ProcessableEventListInterface>;
+  std::vector<ListPtrType> event_lists_;
+  // Comparison function to make |event_lists_| into a min heap.
+  static bool Cmp(const ListPtrType& a, const ListPtrType& b) {
+    return a->GetNextTime() > b->GetNextTime();
+  }
+};
+
+}  // namespace webrtc
+
+#endif  // LOGGING_RTC_EVENT_LOG_RTC_EVENT_PROCESSOR_H_
diff --git a/logging/rtc_event_log/rtc_event_processor_unittest.cc b/logging/rtc_event_log/rtc_event_processor_unittest.cc
new file mode 100644
index 0000000..f2764e7
--- /dev/null
+++ b/logging/rtc_event_log/rtc_event_processor_unittest.cc
@@ -0,0 +1,171 @@
+/*
+ *  Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "logging/rtc_event_log/rtc_event_processor.h"
+
+#include <initializer_list>
+#include <numeric>
+
+#include "absl/memory/memory.h"
+#include "logging/rtc_event_log/rtc_event_log_parser_new.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/random.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+
+namespace {
+std::vector<LoggedStartEvent> CreateEventList(
+    std::initializer_list<int64_t> timestamp_list) {
+  std::vector<LoggedStartEvent> v;
+  for (int64_t timestamp_ms : timestamp_list) {
+    v.emplace_back(timestamp_ms * 1000);  // Convert ms to us.
+  }
+  return v;
+}
+
+using OrderedEventView = ProcessableEventList<LoggedStartEvent>;
+
+std::vector<std::vector<LoggedStartEvent>>
+CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) {
+  Random prng(seed);
+  std::vector<std::vector<LoggedStartEvent>> lists(num_lists);
+  for (size_t elem = 0; elem < num_elements; elem++) {
+    uint32_t i = prng.Rand(0u, num_lists - 1);
+    int64_t timestamp_ms = elem;
+    lists[i].emplace_back(timestamp_ms * 1000);
+  }
+  return lists;
+}
+}  // namespace
+
+TEST(RtcEventProcessor, NoList) {
+  RtcEventProcessor processor;
+  processor.ProcessEventsInOrder();  // Don't crash but do nothing.
+}
+
+TEST(RtcEventProcessor, EmptyList) {
+  auto not_called = [](LoggedStartEvent /*elem*/) { EXPECT_TRUE(false); };
+  std::vector<LoggedStartEvent> events;
+  RtcEventProcessor processor;
+
+  processor.AddEvents(absl::make_unique<OrderedEventView>(
+      events.begin(), events.end(), not_called));
+  processor.ProcessEventsInOrder();  // Don't crash but do nothing.
+}
+
+TEST(RtcEventProcessor, OneList) {
+  std::vector<LoggedStartEvent> result;
+  auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+  std::vector<LoggedStartEvent> events(CreateEventList({1, 2, 3, 4}));
+  RtcEventProcessor processor;
+  processor.AddEvents(
+      absl::make_unique<OrderedEventView>(events.begin(), events.end(), f));
+  processor.ProcessEventsInOrder();
+
+  std::vector<int64_t> expected_results{1, 2, 3, 4};
+  ASSERT_EQ(result.size(), expected_results.size());
+  for (size_t i = 0; i < expected_results.size(); i++) {
+    EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+  }
+}
+
+TEST(RtcEventProcessor, MergeTwoLists) {
+  std::vector<LoggedStartEvent> result;
+  auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+  std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 4, 7, 8, 9}));
+  std::vector<LoggedStartEvent> events2(CreateEventList({3, 5, 6, 10}));
+  RtcEventProcessor processor;
+  processor.AddEvents(
+      absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
+  processor.AddEvents(
+      absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
+  processor.ProcessEventsInOrder();
+
+  std::vector<int64_t> expected_results{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+  ASSERT_EQ(result.size(), expected_results.size());
+  for (size_t i = 0; i < expected_results.size(); i++) {
+    EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+  }
+}
+
+TEST(RtcEventProcessor, MergeTwoListsWithDuplicatedElements) {
+  std::vector<LoggedStartEvent> result;
+  auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+  std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 2, 3, 5, 5}));
+  std::vector<LoggedStartEvent> events2(CreateEventList({1, 3, 4, 4}));
+  RtcEventProcessor processor;
+  processor.AddEvents(
+      absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
+  processor.AddEvents(
+      absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
+  processor.ProcessEventsInOrder();
+
+  std::vector<int64_t> expected_results{1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
+  ASSERT_EQ(result.size(), expected_results.size());
+  for (size_t i = 0; i < expected_results.size(); i++) {
+    EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+  }
+}
+
+TEST(RtcEventProcessor, MergeManyLists) {
+  std::vector<LoggedStartEvent> result;
+  auto f = [&result](LoggedStartEvent elem) { result.push_back(elem); };
+
+  constexpr size_t kNumLists = 5;
+  constexpr size_t kNumElems = 30;
+  constexpr uint64_t kSeed = 0xF3C6B91F;
+  std::vector<std::vector<LoggedStartEvent>> lists(
+      CreateRandomEventLists(kNumLists, kNumElems, kSeed));
+  RTC_DCHECK_EQ(lists.size(), kNumLists);
+  RtcEventProcessor processor;
+  for (const auto& list : lists) {
+    processor.AddEvents(
+        absl::make_unique<OrderedEventView>(list.begin(), list.end(), f));
+  }
+  processor.ProcessEventsInOrder();
+
+  std::vector<int64_t> expected_results(kNumElems);
+  std::iota(expected_results.begin(), expected_results.end(), 0);
+  ASSERT_EQ(result.size(), expected_results.size());
+  for (size_t i = 0; i < expected_results.size(); i++) {
+    EXPECT_EQ(result[i].log_time_ms(), expected_results[i]);
+  }
+}
+
+TEST(RtcEventProcessor, DifferentTypes) {
+  std::vector<int64_t> result;
+  auto f1 = [&result](LoggedStartEvent elem) {
+    result.push_back(elem.log_time_ms());
+  };
+  auto f2 = [&result](LoggedStopEvent elem) {
+    result.push_back(elem.log_time_ms());
+  };
+
+  std::vector<LoggedStartEvent> events1{LoggedStartEvent(2000)};
+  std::vector<LoggedStopEvent> events2{LoggedStopEvent(1000)};
+  RtcEventProcessor processor;
+  processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStartEvent>>(
+      events1.begin(), events1.end(), f1));
+  processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStopEvent>>(
+      events2.begin(), events2.end(), f2));
+  processor.ProcessEventsInOrder();
+
+  std::vector<int64_t> expected_results{1, 2};
+  ASSERT_EQ(result.size(), expected_results.size());
+  for (size_t i = 0; i < expected_results.size(); i++) {
+    EXPECT_EQ(result[i], expected_results[i]);
+  }
+}
+
+}  // namespace webrtc