Simplifies RtcEventProcessor interface.

Bug: webrtc:10170
Change-Id: Ie643e47c55b8c35ca9b8ef31eda5b1673f19d7b3
Reviewed-on: https://webrtc-review.googlesource.com/c/116066
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Oskar Sundbom <ossu@webrtc.org>
Reviewed-by: Björn Terelius <terelius@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26160}
diff --git a/logging/BUILD.gn b/logging/BUILD.gn
index 18189d5..d450f62 100644
--- a/logging/BUILD.gn
+++ b/logging/BUILD.gn
@@ -293,6 +293,7 @@
       "rtc_event_log/logged_events.h",
       "rtc_event_log/rtc_event_log_parser.cc",
       "rtc_event_log/rtc_event_log_parser.h",
+      "rtc_event_log/rtc_event_processor.cc",
       "rtc_event_log/rtc_event_processor.h",
     ]
 
diff --git a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
index c446212..62d6bfc 100644
--- a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
+++ b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc
@@ -236,21 +236,13 @@
         parsed_stream.GetMediaType(stream.ssrc, webrtc::kIncomingPacket);
     if (ShouldSkipStream(media_type, stream.ssrc, ssrc_filter))
       continue;
-    auto rtp_view = absl::make_unique<
-        webrtc::ProcessableEventList<webrtc::LoggedRtpPacketIncoming>>(
-        stream.incoming_packets.begin(), stream.incoming_packets.end(),
-        handle_rtp);
-    event_processor.AddEvents(std::move(rtp_view));
+    event_processor.AddEvents(stream.incoming_packets, handle_rtp);
   }
   // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain
   // report blocks for many streams, thus several SSRCs and they don't
   // necessarily have to be of the same media type. We therefore don't
   // support filtering of RTCP based on SSRC and media type.
-  auto rtcp_view = absl::make_unique<
-      webrtc::ProcessableEventList<webrtc::LoggedRtcpPacketIncoming>>(
-      parsed_stream.incoming_rtcp_packets().begin(),
-      parsed_stream.incoming_rtcp_packets().end(), handle_rtcp);
-  event_processor.AddEvents(std::move(rtcp_view));
+  event_processor.AddEvents(parsed_stream.incoming_rtcp_packets(), handle_rtcp);
 
   event_processor.ProcessEventsInOrder();
 
diff --git a/logging/rtc_event_log/rtc_event_log_parser.h b/logging/rtc_event_log/rtc_event_log_parser.h
index f7e896d..dc23944 100644
--- a/logging/rtc_event_log/rtc_event_log_parser.h
+++ b/logging/rtc_event_log/rtc_event_log_parser.h
@@ -105,6 +105,9 @@
   T& operator*() { return *reinterpret_cast<T*>(ptr_); }
   const T& operator*() const { return *reinterpret_cast<const T*>(ptr_); }
 
+  T* operator->() { return reinterpret_cast<T*>(ptr_); }
+  const T* operator->() const { return reinterpret_cast<const T*>(ptr_); }
+
  private:
   PacketIterator(typename std::conditional<std::is_const<T>::value,
                                            const void*,
@@ -155,6 +158,10 @@
     return PacketView(ptr, num_elements, offset, sizeof(U));
   }
 
+  using value_type = T;
+  using reference = value_type&;
+  using const_reference = const value_type&;
+
   using iterator = PacketIterator<T>;
   using const_iterator = PacketIterator<const T>;
   using reverse_iterator = std::reverse_iterator<iterator>;
diff --git a/logging/rtc_event_log/rtc_event_processor.cc b/logging/rtc_event_log/rtc_event_processor.cc
new file mode 100644
index 0000000..804e2838
--- /dev/null
+++ b/logging/rtc_event_log/rtc_event_processor.cc
@@ -0,0 +1,41 @@
+/*
+ *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "logging/rtc_event_log/rtc_event_processor.h"
+
+namespace webrtc {
+
+RtcEventProcessor::RtcEventProcessor() = default;
+
+RtcEventProcessor::~RtcEventProcessor() = default;
+void RtcEventProcessor::ProcessEventsInOrder() {
+  // |event_lists_| is a min-heap of lists ordered by the timestamp of the
+  // first element in the list. We therefore process the first element of the
+  // first list, then reinsert the remainder of that list into the heap
+  // if the list still contains unprocessed elements.
+  while (!event_lists_.empty()) {
+    event_lists_.front()->ProcessNext();
+    std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+    if (event_lists_.back()->IsEmpty()) {
+      event_lists_.pop_back();
+    } else {
+      std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
+    }
+  }
+}
+
+bool RtcEventProcessor::Cmp(const RtcEventProcessor::ListPtrType& a,
+                            const RtcEventProcessor::ListPtrType& b) {
+  int64_t time_diff = a->GetNextTime() - b->GetNextTime();
+  if (time_diff == 0)
+    return a->GetTieBreaker() > b->GetTieBreaker();
+  return time_diff > 0;
+}
+
+}  // namespace webrtc
diff --git a/logging/rtc_event_log/rtc_event_processor.h b/logging/rtc_event_log/rtc_event_processor.h
index 9eee481..dbbdff6 100644
--- a/logging/rtc_event_log/rtc_event_processor.h
+++ b/logging/rtc_event_log/rtc_event_processor.h
@@ -17,6 +17,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/memory/memory.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/function_view.h"
 
@@ -27,6 +28,7 @@
 // in the merge-sort algorithm but without copying the elements or modifying the
 // lists.
 
+namespace event_processor_impl {
 // Interface to allow "merging" lists of different types. ProcessNext()
 // processes the next unprocesses element in the list. IsEmpty() checks if all
 // elements have been processed. GetNextTime returns the timestamp of the next
@@ -37,22 +39,19 @@
   virtual void ProcessNext() = 0;
   virtual bool IsEmpty() const = 0;
   virtual int64_t GetNextTime() const = 0;
+  virtual int GetTieBreaker() const = 0;
 };
 
 // ProcessableEventList encapsulates a list of events and a function that will
 // be applied to each element of the list.
-template <typename T>
+template <typename Iterator, typename T>
 class ProcessableEventList : public ProcessableEventListInterface {
  public:
-  // N.B. |f| is not owned by ProcessableEventList. The caller must ensure that
-  // the function object or lambda outlives ProcessableEventList and
-  // RtcEventProcessor. The same thing applies to the iterators (begin, end);
-  // the vector must outlive ProcessableEventList and must not be modified until
-  // processing has finished.
-  ProcessableEventList(typename std::vector<T>::const_iterator begin,
-                       typename std::vector<T>::const_iterator end,
-                       rtc::FunctionView<void(const T&)> f)
-      : begin_(begin), end_(end), f_(f) {}
+  ProcessableEventList(Iterator begin,
+                       Iterator end,
+                       std::function<void(const T&)> f,
+                       int tie_breaker)
+      : begin_(begin), end_(end), f_(f), tie_breaker_(tie_breaker) {}
 
   void ProcessNext() override {
     RTC_DCHECK(!IsEmpty());
@@ -66,12 +65,15 @@
     RTC_DCHECK(!IsEmpty());
     return begin_->log_time_us();
   }
+  int GetTieBreaker() const override { return tie_breaker_; }
 
  private:
-  typename std::vector<T>::const_iterator begin_;
-  typename std::vector<T>::const_iterator end_;
-  rtc::FunctionView<void(const T&)> f_;
+  Iterator begin_;
+  Iterator end_;
+  std::function<void(const T&)> f_;
+  int tie_breaker_;
 };
+}  // namespace event_processor_impl
 
 // Helper class used to "merge" two or more lists of ordered RtcEventLog events
 // so that they can be treated as a single ordered list. Since the individual
@@ -81,57 +83,47 @@
 // Usage example:
 // ParsedRtcEventLogNew log;
 // auto incoming_handler = [] (LoggedRtcpPacketIncoming elem) { ... };
-// auto incoming_rtcp =
-//     absl::make_unique<ProcessableEventList<LoggedRtcpPacketIncoming>>(
-//         log.incoming_rtcp_packets().begin(),
-//         log.incoming_rtcp_packets().end(),
-//         incoming_handler);
 // auto outgoing_handler = [] (LoggedRtcpPacketOutgoing elem) { ... };
-// auto outgoing_rtcp =
-//     absl::make_unique<ProcessableEventList<LoggedRtcpPacketOutgoing>>(
-//         log.outgoing_rtcp_packets().begin(),
-//         log.outgoing_rtcp_packets().end(),
-//         outgoing_handler);
 //
 // RtcEventProcessor processor;
-// processor.AddEvents(std::move(incoming_rtcp));
-// processor.AddEvents(std::move(outgoing_rtcp));
+// processor.AddEvents(log.incoming_rtcp_packets(),
+//                     incoming_handler);
+// processor.AddEvents(log.outgoing_rtcp_packets(),
+//                     outgoing_handler);
 // processor.ProcessEventsInOrder();
 class RtcEventProcessor {
  public:
+  RtcEventProcessor();
+  ~RtcEventProcessor();
   // The elements of each list is processed in the index order. To process all
-  // elements in all lists in timestamp order, each lists need to be sorted in
-  // timestamp order prior to insertion. Otherwise,
-  void AddEvents(std::unique_ptr<ProcessableEventListInterface> events) {
-    if (!events->IsEmpty()) {
-      event_lists_.push_back(std::move(events));
-      std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
-    }
+  // elements in all lists in timestamp order, each list needs to be sorted in
+  // timestamp order prior to insertion.
+  // N.B. |iterable| is not owned by RtcEventProcessor. The caller must ensure
+  // that the iterable outlives RtcEventProcessor and it must not be modified
+  // until processing has finished.
+  template <typename Iterable>
+  void AddEvents(
+      const Iterable& iterable,
+      std::function<void(const typename Iterable::value_type&)> handler) {
+    if (iterable.begin() == iterable.end())
+      return;
+    event_lists_.push_back(
+        absl::make_unique<event_processor_impl::ProcessableEventList<
+            typename Iterable::const_iterator, typename Iterable::value_type>>(
+            iterable.begin(), iterable.end(), handler,
+            insertion_order_index_++));
+    std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
   }
 
-  void ProcessEventsInOrder() {
-    // |event_lists_| is a min-heap of lists ordered by the timestamp of the
-    // first element in the list. We therefore process the first element of the
-    // first list, then reinsert the remainder of that list into the heap
-    // if the list still contains unprocessed elements.
-    while (!event_lists_.empty()) {
-      event_lists_.front()->ProcessNext();
-      std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp);
-      if (event_lists_.back()->IsEmpty()) {
-        event_lists_.pop_back();
-      } else {
-        std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp);
-      }
-    }
-  }
+  void ProcessEventsInOrder();
 
  private:
-  using ListPtrType = std::unique_ptr<ProcessableEventListInterface>;
+  using ListPtrType =
+      std::unique_ptr<event_processor_impl::ProcessableEventListInterface>;
+  int insertion_order_index_ = 0;
   std::vector<ListPtrType> event_lists_;
   // Comparison function to make |event_lists_| into a min heap.
-  static bool Cmp(const ListPtrType& a, const ListPtrType& b) {
-    return a->GetNextTime() > b->GetNextTime();
-  }
+  static bool Cmp(const ListPtrType& a, const ListPtrType& b);
 };
 
 }  // namespace webrtc
diff --git a/logging/rtc_event_log/rtc_event_processor_unittest.cc b/logging/rtc_event_log/rtc_event_processor_unittest.cc
index dac3bb6..9f33540 100644
--- a/logging/rtc_event_log/rtc_event_processor_unittest.cc
+++ b/logging/rtc_event_log/rtc_event_processor_unittest.cc
@@ -33,8 +33,6 @@
   return v;
 }
 
-using OrderedEventView = ProcessableEventList<LoggedStartEvent>;
-
 std::vector<std::vector<LoggedStartEvent>>
 CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) {
   Random prng(seed);
@@ -58,8 +56,7 @@
   std::vector<LoggedStartEvent> events;
   RtcEventProcessor processor;
 
-  processor.AddEvents(absl::make_unique<OrderedEventView>(
-      events.begin(), events.end(), not_called));
+  processor.AddEvents(events, not_called);
   processor.ProcessEventsInOrder();  // Don't crash but do nothing.
 }
 
@@ -69,8 +66,7 @@
 
   std::vector<LoggedStartEvent> events(CreateEventList({1, 2, 3, 4}));
   RtcEventProcessor processor;
-  processor.AddEvents(
-      absl::make_unique<OrderedEventView>(events.begin(), events.end(), f));
+  processor.AddEvents(events, f);
   processor.ProcessEventsInOrder();
 
   std::vector<int64_t> expected_results{1, 2, 3, 4};
@@ -87,10 +83,8 @@
   std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 4, 7, 8, 9}));
   std::vector<LoggedStartEvent> events2(CreateEventList({3, 5, 6, 10}));
   RtcEventProcessor processor;
-  processor.AddEvents(
-      absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
-  processor.AddEvents(
-      absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
+  processor.AddEvents(events1, f);
+  processor.AddEvents(events2, f);
   processor.ProcessEventsInOrder();
 
   std::vector<int64_t> expected_results{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
@@ -107,10 +101,8 @@
   std::vector<LoggedStartEvent> events1(CreateEventList({1, 2, 2, 3, 5, 5}));
   std::vector<LoggedStartEvent> events2(CreateEventList({1, 3, 4, 4}));
   RtcEventProcessor processor;
-  processor.AddEvents(
-      absl::make_unique<OrderedEventView>(events1.begin(), events1.end(), f));
-  processor.AddEvents(
-      absl::make_unique<OrderedEventView>(events2.begin(), events2.end(), f));
+  processor.AddEvents(events1, f);
+  processor.AddEvents(events2, f);
   processor.ProcessEventsInOrder();
 
   std::vector<int64_t> expected_results{1, 1, 2, 2, 3, 3, 4, 4, 5, 5};
@@ -132,8 +124,7 @@
   RTC_DCHECK_EQ(lists.size(), kNumLists);
   RtcEventProcessor processor;
   for (const auto& list : lists) {
-    processor.AddEvents(
-        absl::make_unique<OrderedEventView>(list.begin(), list.end(), f));
+    processor.AddEvents(list, f);
   }
   processor.ProcessEventsInOrder();
 
@@ -157,10 +148,8 @@
   std::vector<LoggedStartEvent> events1{LoggedStartEvent(2000)};
   std::vector<LoggedStopEvent> events2{LoggedStopEvent(1000)};
   RtcEventProcessor processor;
-  processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStartEvent>>(
-      events1.begin(), events1.end(), f1));
-  processor.AddEvents(absl::make_unique<ProcessableEventList<LoggedStopEvent>>(
-      events2.begin(), events2.end(), f2));
+  processor.AddEvents(events1, f1);
+  processor.AddEvents(events2, f2);
   processor.ProcessEventsInOrder();
 
   std::vector<int64_t> expected_results{1, 2};
diff --git a/modules/audio_coding/neteq/tools/rtc_event_log_source.cc b/modules/audio_coding/neteq/tools/rtc_event_log_source.cc
index 446a1a9..d72f9cb 100644
--- a/modules/audio_coding/neteq/tools/rtc_event_log_source.cc
+++ b/modules/audio_coding/neteq/tools/rtc_event_log_source.cc
@@ -109,21 +109,13 @@
     if (ShouldSkipStream(media_type, rtp_packets.ssrc, ssrc_filter)) {
       continue;
     }
-    auto rtp_view = absl::make_unique<
-        webrtc::ProcessableEventList<webrtc::LoggedRtpPacketIncoming>>(
-        rtp_packets.incoming_packets.begin(),
-        rtp_packets.incoming_packets.end(), handle_rtp_packet);
-    event_processor.AddEvents(std::move(rtp_view));
+    event_processor.AddEvents(rtp_packets.incoming_packets, handle_rtp_packet);
   }
 
   for (const auto& audio_playouts : parsed_log.audio_playout_events()) {
     if (ssrc_filter.has_value() && audio_playouts.first != *ssrc_filter)
       continue;
-    auto audio_view = absl::make_unique<
-        webrtc::ProcessableEventList<webrtc::LoggedAudioPlayoutEvent>>(
-        audio_playouts.second.begin(), audio_playouts.second.end(),
-        handle_audio_playout);
-    event_processor.AddEvents(std::move(audio_view));
+    event_processor.AddEvents(audio_playouts.second, handle_audio_playout);
   }
 
   // Fills in rtp_packets_ and audio_outputs_.