Remove the PendingTaskSafetyFlag::Pointer type add ScopedTaskSafety.

ScopedTaskSafety simplifies usage of PendingTaskSafetyFlag,
so this CL also includes ToQueuedTask support for ScopedTaskSafety
and test updates.

This is following up on feedback in the following CL:
https://webrtc-review.googlesource.com/c/src/+/174262

Change-Id: Idd38dfc1914b24a05fdc4ad256b409dcf1795fc0
Bug: none
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174740
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31241}
diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc
index 307d2d5..4be2131 100644
--- a/rtc_base/task_utils/pending_task_safety_flag.cc
+++ b/rtc_base/task_utils/pending_task_safety_flag.cc
@@ -15,7 +15,7 @@
 namespace webrtc {
 
 // static
-PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
+rtc::scoped_refptr<PendingTaskSafetyFlag> PendingTaskSafetyFlag::Create() {
   return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
 }
 
diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h
index 1b301c8..580fb3f 100644
--- a/rtc_base/task_utils/pending_task_safety_flag.h
+++ b/rtc_base/task_utils/pending_task_safety_flag.h
@@ -36,12 +36,17 @@
 //          MyMethod();
 //        }));
 //
+// Or implicitly by letting ToQueuedTask do the checking:
+//
+//    // Running outside of the main thread.
+//    my_task_queue_->PostTask(ToQueuedTask(pending_task_safety_flag_,
+//        [this]() { MyMethod(); }));
+//
 // Note that checking the state only works on the construction/destruction
 // thread of the ReceiveStatisticsProxy instance.
 class PendingTaskSafetyFlag : public rtc::RefCountInterface {
  public:
-  using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
-  static Pointer Create();
+  static rtc::scoped_refptr<PendingTaskSafetyFlag> Create();
 
   ~PendingTaskSafetyFlag() = default;
 
@@ -56,6 +61,25 @@
   SequenceChecker main_sequence_;
 };
 
+// Makes using PendingTaskSafetyFlag very simple. Automatic PTSF creation
+// and signalling of destruction when the ScopedTaskSafety instance goes out
+// of scope.
+// Should be used by the class that wants tasks dropped after destruction.
+// Requirements are that the instance be constructed and destructed on
+// the same thread as the potentially dropped tasks would be running on.
+class ScopedTaskSafety {
+ public:
+  ScopedTaskSafety() = default;
+  ~ScopedTaskSafety() { flag_->SetNotAlive(); }
+
+  // Returns a new reference to the safety flag.
+  rtc::scoped_refptr<PendingTaskSafetyFlag> flag() const { return flag_; }
+
+ private:
+  rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ =
+      PendingTaskSafetyFlag::Create();
+};
+
 }  // namespace webrtc
 
 #endif  // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
index 0c1c3c8..6df2fe2 100644
--- a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
+++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
@@ -29,7 +29,7 @@
 }  // namespace
 
 TEST(PendingTaskSafetyFlagTest, Basic) {
-  PendingTaskSafetyFlag::Pointer safety_flag;
+  rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag;
   {
     // Scope for the |owner| instance.
     class Owner {
@@ -37,12 +37,27 @@
       Owner() = default;
       ~Owner() { flag_->SetNotAlive(); }
 
-      PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+      rtc::scoped_refptr<PendingTaskSafetyFlag> flag_ =
+          PendingTaskSafetyFlag::Create();
     } owner;
     EXPECT_TRUE(owner.flag_->alive());
     safety_flag = owner.flag_;
     EXPECT_TRUE(safety_flag->alive());
   }
+  // |owner| now out of scope.
+  EXPECT_FALSE(safety_flag->alive());
+}
+
+TEST(PendingTaskSafetyFlagTest, BasicScoped) {
+  rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag;
+  {
+    struct Owner {
+      ScopedTaskSafety safety;
+    } owner;
+    safety_flag = owner.safety.flag();
+    EXPECT_TRUE(safety_flag->alive());
+  }
+  // |owner| now out of scope.
   EXPECT_FALSE(safety_flag->alive());
 }
 
@@ -72,7 +87,8 @@
    private:
     TaskQueueBase* const tq_main_;
     bool stuff_done_ = false;
-    PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+    rtc::scoped_refptr<PendingTaskSafetyFlag> flag_{
+        PendingTaskSafetyFlag::Create()};
   };
 
   std::unique_ptr<Owner> owner;
@@ -106,22 +122,18 @@
     }
     ~Owner() {
       RTC_DCHECK(tq_main_->IsCurrent());
-      flag_->SetNotAlive();
     }
 
     void DoStuff() {
       RTC_DCHECK(!tq_main_->IsCurrent());
-      tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
-        if (!safe->alive())
-          return;
-        *stuff_done_ = true;
-      }));
+      tq_main_->PostTask(
+          ToQueuedTask(safety_, [this]() { *stuff_done_ = true; }));
     }
 
    private:
     TaskQueueBase* const tq_main_;
     bool* const stuff_done_;
-    PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+    ScopedTaskSafety safety_;
   };
 
   std::unique_ptr<Owner> owner;
diff --git a/rtc_base/task_utils/to_queued_task.h b/rtc_base/task_utils/to_queued_task.h
index cc9325e..07ab0eb 100644
--- a/rtc_base/task_utils/to_queued_task.h
+++ b/rtc_base/task_utils/to_queued_task.h
@@ -39,7 +39,7 @@
 template <typename Closure>
 class SafetyClosureTask : public QueuedTask {
  public:
-  explicit SafetyClosureTask(PendingTaskSafetyFlag::Pointer safety,
+  explicit SafetyClosureTask(rtc::scoped_refptr<PendingTaskSafetyFlag> safety,
                              Closure&& closure)
       : closure_(std::forward<Closure>(closure)),
         safety_flag_(std::move(safety)) {}
@@ -52,7 +52,7 @@
   }
 
   typename std::decay<Closure>::type closure_;
-  PendingTaskSafetyFlag::Pointer safety_flag_;
+  rtc::scoped_refptr<PendingTaskSafetyFlag> safety_flag_;
 };
 
 // Extends ClosureTask to also allow specifying cleanup code.
@@ -81,13 +81,25 @@
 }
 
 template <typename Closure>
-std::unique_ptr<QueuedTask> ToQueuedTask(PendingTaskSafetyFlag::Pointer safety,
-                                         Closure&& closure) {
+std::unique_ptr<QueuedTask> ToQueuedTask(
+    rtc::scoped_refptr<PendingTaskSafetyFlag> safety,
+    Closure&& closure) {
   return std::make_unique<webrtc_new_closure_impl::SafetyClosureTask<Closure>>(
       std::move(safety), std::forward<Closure>(closure));
 }
 
-template <typename Closure, typename Cleanup>
+template <typename Closure>
+std::unique_ptr<QueuedTask> ToQueuedTask(const ScopedTaskSafety& safety,
+                                         Closure&& closure) {
+  return ToQueuedTask(safety.flag(), std::forward<Closure>(closure));
+}
+
+template <typename Closure,
+          typename Cleanup,
+          typename std::enable_if<!std::is_same<
+              typename std::remove_const<
+                  typename std::remove_reference<Closure>::type>::type,
+              ScopedTaskSafety>::value>::type* = nullptr>
 std::unique_ptr<QueuedTask> ToQueuedTask(Closure&& closure, Cleanup&& cleanup) {
   return std::make_unique<
       webrtc_new_closure_impl::ClosureTaskWithCleanup<Closure, Cleanup>>(
diff --git a/rtc_base/task_utils/to_queued_task_unittest.cc b/rtc_base/task_utils/to_queued_task_unittest.cc
index e98c81e..261b9e8 100644
--- a/rtc_base/task_utils/to_queued_task_unittest.cc
+++ b/rtc_base/task_utils/to_queued_task_unittest.cc
@@ -127,7 +127,8 @@
 }
 
 TEST(ToQueuedTaskTest, PendingTaskSafetyFlag) {
-  PendingTaskSafetyFlag::Pointer flag(PendingTaskSafetyFlag::Create());
+  rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
+      PendingTaskSafetyFlag::Create();
 
   int count = 0;
   // Create two identical tasks that increment the |count|.
diff --git a/video/call_stats2.cc b/video/call_stats2.cc
index af0da0f..ce68127 100644
--- a/video/call_stats2.cc
+++ b/video/call_stats2.cc
@@ -76,16 +76,13 @@
   RTC_DCHECK(task_queue_);
   process_thread_checker_.Detach();
   task_queue_->PostDelayedTask(
-      ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }),
-      kUpdateIntervalMs);
+      ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs);
 }
 
 CallStats::~CallStats() {
   RTC_DCHECK_RUN_ON(&construction_thread_checker_);
   RTC_DCHECK(observers_.empty());
 
-  task_safety_flag_->SetNotAlive();
-
   UpdateHistograms();
 }
 
@@ -98,7 +95,7 @@
       last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds();
 
   task_queue_->PostDelayedTask(
-      ToQueuedTask(task_safety_flag_, [this]() { RunTimer(); }), interval);
+      ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval);
 }
 
 void CallStats::UpdateAndReport() {
@@ -156,7 +153,7 @@
   RTC_DCHECK_RUN_ON(&process_thread_checker_);
 
   int64_t now_ms = clock_->TimeInMilliseconds();
-  task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this, rtt, now_ms]() {
+  task_queue_->PostTask(ToQueuedTask(task_safety_, [this, rtt, now_ms]() {
     RTC_DCHECK_RUN_ON(&construction_thread_checker_);
     reports_.push_back(RttTime(rtt, now_ms));
     if (time_of_first_rtt_ms_ == -1)
diff --git a/video/call_stats2.h b/video/call_stats2.h
index f06d33d..49d2db7 100644
--- a/video/call_stats2.h
+++ b/video/call_stats2.h
@@ -139,8 +139,7 @@
   TaskQueueBase* const task_queue_;
 
   // Used to signal destruction to potentially pending tasks.
-  PendingTaskSafetyFlag::Pointer task_safety_flag_ =
-      PendingTaskSafetyFlag::Create();
+  ScopedTaskSafety task_safety_;
 
   RTC_DISALLOW_COPY_AND_ASSIGN(CallStats);
 };
diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc
index b818eae..79684f2 100644
--- a/video/receive_statistics_proxy2.cc
+++ b/video/receive_statistics_proxy2.cc
@@ -129,7 +129,6 @@
 
 ReceiveStatisticsProxy::~ReceiveStatisticsProxy() {
   RTC_DCHECK_RUN_ON(&main_thread_);
-  task_safety_flag_->SetNotAlive();
 }
 
 void ReceiveStatisticsProxy::UpdateHistograms(
@@ -689,18 +688,17 @@
 
 void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  worker_thread_->PostTask(
-      ToQueuedTask(task_safety_flag_, [payload_type, this]() {
-        RTC_DCHECK_RUN_ON(&main_thread_);
-        stats_.current_payload_type = payload_type;
-      }));
+  worker_thread_->PostTask(ToQueuedTask(task_safety_, [payload_type, this]() {
+    RTC_DCHECK_RUN_ON(&main_thread_);
+    stats_.current_payload_type = payload_type;
+  }));
 }
 
 void ReceiveStatisticsProxy::OnDecoderImplementationName(
     const char* implementation_name) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
   worker_thread_->PostTask(ToQueuedTask(
-      task_safety_flag_, [name = std::string(implementation_name), this]() {
+      task_safety_, [name = std::string(implementation_name), this]() {
         RTC_DCHECK_RUN_ON(&main_thread_);
         stats_.decoder_implementation_name = name;
       }));
@@ -715,7 +713,7 @@
     int render_delay_ms) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
   worker_thread_->PostTask(ToQueuedTask(
-      task_safety_flag_,
+      task_safety_,
       [max_decode_ms, current_delay_ms, target_delay_ms, jitter_buffer_ms,
        min_playout_delay_ms, render_delay_ms, this]() {
         RTC_DCHECK_RUN_ON(&main_thread_);
@@ -742,7 +740,7 @@
 void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
     const TimingFrameInfo& info) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [info, this]() {
+  worker_thread_->PostTask(ToQueuedTask(task_safety_, [info, this]() {
     RTC_DCHECK_RUN_ON(&main_thread_);
     if (info.flags != VideoSendTiming::kInvalid) {
       int64_t now_ms = clock_->TimeInMilliseconds();
@@ -777,11 +775,11 @@
     // [main] worker thread.
     // So until the sender implementation has been updated, we work around this
     // here by posting the update to the expected thread. We make a by value
-    // copy of the |task_safety_flag_| to handle the case if the queued task
+    // copy of the |task_safety_| to handle the case if the queued task
     // runs after the |ReceiveStatisticsProxy| has been deleted. In such a
     // case the packet_counter update won't be recorded.
     worker_thread_->PostTask(
-        ToQueuedTask(task_safety_flag_, [ssrc, packet_counter, this]() {
+        ToQueuedTask(task_safety_, [ssrc, packet_counter, this]() {
           RtcpPacketTypesCounterUpdated(ssrc, packet_counter);
         }));
     return;
@@ -810,7 +808,7 @@
   // "com.apple.coremedia.decompressionsession.clientcallback"
   VideoFrameMetaData meta(frame, clock_->CurrentTime());
   worker_thread_->PostTask(ToQueuedTask(
-      task_safety_flag_, [meta, qp, decode_time_ms, content_type, this]() {
+      task_safety_, [meta, qp, decode_time_ms, content_type, this]() {
         OnDecodedFrame(meta, qp, decode_time_ms, content_type);
       }));
 }
@@ -936,8 +934,8 @@
   RTC_DCHECK_RUN_ON(&incoming_render_queue_);
   int64_t now_ms = clock_->TimeInMilliseconds();
   worker_thread_->PostTask(
-      ToQueuedTask(task_safety_flag_, [video_playout_ntp_ms, sync_offset_ms,
-                                       estimated_freq_khz, now_ms, this]() {
+      ToQueuedTask(task_safety_, [video_playout_ntp_ms, sync_offset_ms,
+                                  estimated_freq_khz, now_ms, this]() {
         RTC_DCHECK_RUN_ON(&main_thread_);
         sync_offset_counter_.Add(std::abs(sync_offset_ms));
         stats_.sync_offset_ms = sync_offset_ms;
@@ -990,24 +988,22 @@
 
 void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  worker_thread_->PostTask(
-      ToQueuedTask(task_safety_flag_, [frames_dropped, this]() {
-        RTC_DCHECK_RUN_ON(&main_thread_);
-        stats_.frames_dropped += frames_dropped;
-      }));
+  worker_thread_->PostTask(ToQueuedTask(task_safety_, [frames_dropped, this]() {
+    RTC_DCHECK_RUN_ON(&main_thread_);
+    stats_.frames_dropped += frames_dropped;
+  }));
 }
 
 void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  worker_thread_->PostTask(
-      ToQueuedTask(task_safety_flag_, [codec_type, qp, this]() {
-        RTC_DCHECK_RUN_ON(&main_thread_);
-        last_codec_type_ = codec_type;
-        if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
-          qp_counters_.vp8.Add(qp);
-          qp_sample_.Add(qp);
-        }
-      }));
+  worker_thread_->PostTask(ToQueuedTask(task_safety_, [codec_type, qp, this]() {
+    RTC_DCHECK_RUN_ON(&main_thread_);
+    last_codec_type_ = codec_type;
+    if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
+      qp_counters_.vp8.Add(qp);
+      qp_sample_.Add(qp);
+    }
+  }));
 }
 
 void ReceiveStatisticsProxy::OnStreamInactive() {
diff --git a/video/receive_statistics_proxy2.h b/video/receive_statistics_proxy2.h
index d6f6f1c..1357c40 100644
--- a/video/receive_statistics_proxy2.h
+++ b/video/receive_statistics_proxy2.h
@@ -211,8 +211,7 @@
   // methods are invoked on such as GetStats().
   TaskQueueBase* const worker_thread_;
 
-  PendingTaskSafetyFlag::Pointer task_safety_flag_ =
-      PendingTaskSafetyFlag::Create();
+  ScopedTaskSafety task_safety_;
 
   SequenceChecker decode_queue_;
   rtc::ThreadChecker main_thread_;
diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc
index 116cf28..7e3bed1 100644
--- a/video/rtp_streams_synchronizer2.cc
+++ b/video/rtp_streams_synchronizer2.cc
@@ -47,7 +47,6 @@
 
 RtpStreamsSynchronizer::~RtpStreamsSynchronizer() {
   RTC_DCHECK_RUN_ON(&main_checker_);
-  task_safety_flag_->SetNotAlive();
 }
 
 void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) {
@@ -85,13 +84,12 @@
   }
 
   RTC_DCHECK_LE(delay, kSyncIntervalMs);
-  task_queue_->PostDelayedTask(ToQueuedTask([this, safety = task_safety_flag_] {
-                                 if (!safety->alive())
-                                   return;
-                                 RTC_DCHECK_RUN_ON(&main_checker_);
-                                 timer_running_ = false;
-                                 UpdateDelay();
-                               }),
+  task_queue_->PostDelayedTask(ToQueuedTask(task_safety_,
+                                            [this] {
+                                              RTC_DCHECK_RUN_ON(&main_checker_);
+                                              timer_running_ = false;
+                                              UpdateDelay();
+                                            }),
                                delay);
 }
 
diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h
index 353434e..83dd0fb 100644
--- a/video/rtp_streams_synchronizer2.h
+++ b/video/rtp_streams_synchronizer2.h
@@ -70,8 +70,7 @@
   bool timer_running_ RTC_GUARDED_BY(main_checker_) = false;
 
   // Used to signal destruction to potentially pending tasks.
-  PendingTaskSafetyFlag::Pointer task_safety_flag_ =
-      PendingTaskSafetyFlag::Create();
+  ScopedTaskSafety task_safety_;
 };
 
 }  // namespace internal
diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc
index 510c260..b1b482d 100644
--- a/video/video_receive_stream2.cc
+++ b/video/video_receive_stream2.cc
@@ -269,7 +269,6 @@
   RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
   RTC_LOG(LS_INFO) << "~VideoReceiveStream2: " << config_.ToString();
   Stop();
-  task_safety_flag_->SetNotAlive();
 }
 
 void VideoReceiveStream2::SignalNetworkState(NetworkState state) {
@@ -491,7 +490,7 @@
   VideoFrameMetaData frame_meta(video_frame, clock_->CurrentTime());
 
   worker_thread_->PostTask(
-      ToQueuedTask(task_safety_flag_, [frame_meta, this]() {
+      ToQueuedTask(task_safety_, [frame_meta, this]() {
         RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
         int64_t video_playout_ntp_ms;
         int64_t sync_offset_ms;
@@ -703,7 +702,7 @@
   // check if we have received a packet within the last 5 seconds.
   bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000;
   if (!stream_is_active) {
-    worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
+    worker_thread_->PostTask(ToQueuedTask(task_safety_, [this]() {
       RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
       stats_proxy_.OnStreamInactive();
     }));
diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h
index bbed08a..f8cd65d 100644
--- a/video/video_receive_stream2.h
+++ b/video/video_receive_stream2.h
@@ -255,8 +255,7 @@
   rtc::TaskQueue decode_queue_;
 
   // Used to signal destruction to potentially pending tasks.
-  PendingTaskSafetyFlag::Pointer task_safety_flag_ =
-      PendingTaskSafetyFlag::Create();
+  ScopedTaskSafety task_safety_;
 };
 }  // namespace internal
 }  // namespace webrtc