Improve screen sharing with PipeWire on Wayland

Changes:
1) Scoped class
This is a special class for GLib based objects which we need to manually
delete with different functions. Wrapping these objects into Scoped class
will destroy them automatically when they go out of scope.

2) Window sharing support
Unlike screen sharing, with window sharing we are required to obtain more
information from the PipeWire stream, like video crop metadata, which we
use to properly set size of our buffer.

3) Support for DmaBuf and MemFd buffer types
As of now, we expected the PipeWire stream will provide only plain data
which we just need to copy to our buffer. We now add support for new
buffer types, which are often preferred for better effeciency.

4) Minor bugfixes:
a) Additionally accept PipeWire streams using alpha channels (BGRA, RGBA)
b) Add lock over PipeWire loop to prevent potential issues until we fully
   intialize everything we need
c) When obtaining buffers, make sure we work with the latest one

Bug: chromium:682122
Change-Id: I64638d5dcbe18e7280550dca0b01b17c511ac98a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/194100
Commit-Queue: Jamie Walch <jamiewalch@chromium.org>
Reviewed-by: Jamie Walch <jamiewalch@chromium.org>
Cr-Commit-Position: refs/heads/master@{#32763}
diff --git a/modules/desktop_capture/BUILD.gn b/modules/desktop_capture/BUILD.gn
index 427526c..72273f9 100644
--- a/modules/desktop_capture/BUILD.gn
+++ b/modules/desktop_capture/BUILD.gn
@@ -500,6 +500,7 @@
   absl_deps = [
     "//third_party/abseil-cpp/absl/memory",
     "//third_party/abseil-cpp/absl/strings",
+    "//third_party/abseil-cpp/absl/types:optional",
   ]
 
   if (rtc_use_x11_extensions) {
@@ -520,10 +521,6 @@
     sources += [
       "linux/base_capturer_pipewire.cc",
       "linux/base_capturer_pipewire.h",
-      "linux/screen_capturer_pipewire.cc",
-      "linux/screen_capturer_pipewire.h",
-      "linux/window_capturer_pipewire.cc",
-      "linux/window_capturer_pipewire.h",
     ]
 
     configs += [
diff --git a/modules/desktop_capture/linux/base_capturer_pipewire.cc b/modules/desktop_capture/linux/base_capturer_pipewire.cc
index 2640e93..bc96535 100644
--- a/modules/desktop_capture/linux/base_capturer_pipewire.cc
+++ b/modules/desktop_capture/linux/base_capturer_pipewire.cc
@@ -17,6 +17,10 @@
 #include <spa/param/video/raw-utils.h>
 #include <spa/support/type-map.h>
 
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+
 #include <memory>
 #include <utility>
 
@@ -51,6 +55,137 @@
 #endif
 
 // static
+struct dma_buf_sync {
+  uint64_t flags;
+};
+#define DMA_BUF_SYNC_READ (1 << 0)
+#define DMA_BUF_SYNC_START (0 << 2)
+#define DMA_BUF_SYNC_END (1 << 2)
+#define DMA_BUF_BASE 'b'
+#define DMA_BUF_IOCTL_SYNC _IOW(DMA_BUF_BASE, 0, struct dma_buf_sync)
+
+static void SyncDmaBuf(int fd, uint64_t start_or_end) {
+  struct dma_buf_sync sync = {0};
+
+  sync.flags = start_or_end | DMA_BUF_SYNC_READ;
+
+  while (true) {
+    int ret;
+    ret = ioctl(fd, DMA_BUF_IOCTL_SYNC, &sync);
+    if (ret == -1 && errno == EINTR) {
+      continue;
+    } else if (ret == -1) {
+      RTC_LOG(LS_ERROR) << "Failed to synchronize DMA buffer: "
+                        << g_strerror(errno);
+      break;
+    } else {
+      break;
+    }
+  }
+}
+
+class ScopedBuf {
+ public:
+  ScopedBuf() {}
+  ScopedBuf(unsigned char* map, int map_size, bool is_dma_buf, int fd)
+      : map_(map), map_size_(map_size), is_dma_buf_(is_dma_buf), fd_(fd) {}
+  ~ScopedBuf() {
+    if (map_ != MAP_FAILED) {
+      if (is_dma_buf_) {
+        SyncDmaBuf(fd_, DMA_BUF_SYNC_END);
+      }
+      munmap(map_, map_size_);
+    }
+  }
+
+  operator bool() { return map_ != MAP_FAILED; }
+
+  void initialize(unsigned char* map, int map_size, bool is_dma_buf, int fd) {
+    map_ = map;
+    map_size_ = map_size;
+    is_dma_buf_ = is_dma_buf;
+    fd_ = fd;
+  }
+
+  unsigned char* get() { return map_; }
+
+ protected:
+  unsigned char* map_ = nullptr;
+  int map_size_;
+  bool is_dma_buf_;
+  int fd_;
+};
+
+template <class T>
+class Scoped {
+ public:
+  Scoped() {}
+  explicit Scoped(T* val) { ptr_ = val; }
+  ~Scoped() { RTC_NOTREACHED(); }
+
+  T* operator->() { return ptr_; }
+
+  bool operator!() { return ptr_ == nullptr; }
+
+  T* get() { return ptr_; }
+
+  T** receive() {
+    RTC_CHECK(!ptr_);
+    return &ptr_;
+  }
+
+  Scoped& operator=(T* val) {
+    ptr_ = val;
+    return *this;
+  }
+
+ protected:
+  T* ptr_ = nullptr;
+};
+
+template <>
+Scoped<GError>::~Scoped() {
+  if (ptr_) {
+    g_error_free(ptr_);
+  }
+}
+
+template <>
+Scoped<gchar>::~Scoped() {
+  if (ptr_) {
+    g_free(ptr_);
+  }
+}
+
+template <>
+Scoped<GVariant>::~Scoped() {
+  if (ptr_) {
+    g_variant_unref(ptr_);
+  }
+}
+
+template <>
+Scoped<GVariantIter>::~Scoped() {
+  if (ptr_) {
+    g_variant_iter_free(ptr_);
+  }
+}
+
+template <>
+Scoped<GDBusMessage>::~Scoped() {
+  if (ptr_) {
+    g_object_unref(ptr_);
+  }
+}
+
+template <>
+Scoped<GUnixFDList>::~Scoped() {
+  if (ptr_) {
+    g_object_unref(ptr_);
+  }
+}
+
+// static
 void BaseCapturerPipeWire::OnStateChanged(void* data,
                                           pw_remote_state old_state,
                                           pw_remote_state state,
@@ -122,11 +257,13 @@
   auto stride = SPA_ROUND_UP_N(width * kBytesPerPixel, 4);
   auto size = height * stride;
 
+  that->desktop_size_ = DesktopSize(width, height);
+
   uint8_t buffer[1024] = {};
   auto builder = spa_pod_builder{buffer, sizeof(buffer)};
 
   // Setup buffers and meta header for new format.
-  const struct spa_pod* params[2];
+  const struct spa_pod* params[3];
   params[0] = reinterpret_cast<spa_pod*>(spa_pod_builder_object(
       &builder,
       // id to enumerate buffer requirements
@@ -155,8 +292,17 @@
       // Size: size of the metadata, specified as integer (i)
       ":", that->pw_core_type_->param_meta.size, "i",
       sizeof(struct spa_meta_header)));
-
-  pw_stream_finish_format(that->pw_stream_, /*res=*/0, params, /*n_params=*/2);
+  params[2] = reinterpret_cast<spa_pod*>(spa_pod_builder_object(
+      &builder,
+      // id to enumerate supported metadata
+      that->pw_core_type_->param.idMeta, that->pw_core_type_->param_meta.Meta,
+      // Type: specified as id or enum (I)
+      ":", that->pw_core_type_->param_meta.type, "I",
+      that->pw_core_type_->meta.VideoCrop,
+      // Size: size of the metadata, specified as integer (i)
+      ":", that->pw_core_type_->param_meta.size, "i",
+      sizeof(struct spa_meta_video_crop)));
+  pw_stream_finish_format(that->pw_stream_, /*res=*/0, params, /*n_params=*/3);
 }
 
 // static
@@ -164,15 +310,26 @@
   BaseCapturerPipeWire* that = static_cast<BaseCapturerPipeWire*>(data);
   RTC_DCHECK(that);
 
-  pw_buffer* buf = nullptr;
+  struct pw_buffer* next_buffer;
+  struct pw_buffer* buffer = nullptr;
 
-  if (!(buf = pw_stream_dequeue_buffer(that->pw_stream_))) {
+  next_buffer = pw_stream_dequeue_buffer(that->pw_stream_);
+  while (next_buffer) {
+    buffer = next_buffer;
+    next_buffer = pw_stream_dequeue_buffer(that->pw_stream_);
+
+    if (next_buffer) {
+      pw_stream_queue_buffer(that->pw_stream_, buffer);
+    }
+  }
+
+  if (!buffer) {
     return;
   }
 
-  that->HandleBuffer(buf);
+  that->HandleBuffer(buffer);
 
-  pw_stream_queue_buffer(that->pw_stream_, buf);
+  pw_stream_queue_buffer(that->pw_stream_, buffer);
 }
 
 BaseCapturerPipeWire::BaseCapturerPipeWire(CaptureSourceType source_type)
@@ -211,10 +368,6 @@
     pw_loop_destroy(pw_loop_);
   }
 
-  if (current_frame_) {
-    free(current_frame_);
-  }
-
   if (start_request_signal_id_) {
     g_dbus_connection_signal_unsubscribe(connection_, start_request_signal_id_);
   }
@@ -228,18 +381,16 @@
   }
 
   if (session_handle_) {
-    GDBusMessage* message = g_dbus_message_new_method_call(
-        kDesktopBusName, session_handle_, kSessionInterfaceName, "Close");
-    if (message) {
-      GError* error = nullptr;
-      g_dbus_connection_send_message(connection_, message,
+    Scoped<GDBusMessage> message(g_dbus_message_new_method_call(
+        kDesktopBusName, session_handle_, kSessionInterfaceName, "Close"));
+    if (message.get()) {
+      Scoped<GError> error;
+      g_dbus_connection_send_message(connection_, message.get(),
                                      G_DBUS_SEND_MESSAGE_FLAGS_NONE,
-                                     /*out_serial=*/nullptr, &error);
-      if (error) {
+                                     /*out_serial=*/nullptr, error.receive());
+      if (error.get()) {
         RTC_LOG(LS_ERROR) << "Failed to close the session: " << error->message;
-        g_error_free(error);
       }
-      g_object_unref(message);
     }
   }
 
@@ -287,6 +438,8 @@
   pw_loop_ = pw_loop_new(/*properties=*/nullptr);
   pw_main_loop_ = pw_thread_loop_new(pw_loop_, "pipewire-main-loop");
 
+  pw_thread_loop_lock(pw_main_loop_);
+
   pw_core_ = pw_core_new(pw_loop_, /*properties=*/nullptr);
   pw_core_type_ = pw_core_get_type(pw_core_);
   pw_remote_ = pw_remote_new(pw_core_, nullptr, /*user_data_size=*/0);
@@ -311,6 +464,8 @@
     portal_init_failed_ = true;
   }
 
+  pw_thread_loop_unlock(pw_main_loop_);
+
   RTC_LOG(LS_INFO) << "PipeWire remote opened.";
 }
 
@@ -326,12 +481,7 @@
 
 void BaseCapturerPipeWire::CreateReceivingStream() {
   spa_rectangle pwMinScreenBounds = spa_rectangle{1, 1};
-  spa_rectangle pwScreenBounds =
-      spa_rectangle{static_cast<uint32_t>(desktop_size_.width()),
-                    static_cast<uint32_t>(desktop_size_.height())};
-
-  spa_fraction pwFrameRateMin = spa_fraction{0, 1};
-  spa_fraction pwFrameRateMax = spa_fraction{60, 1};
+  spa_rectangle pwMaxScreenBounds = spa_rectangle{UINT32_MAX, UINT32_MAX};
 
   pw_properties* reuseProps =
       pw_properties_new_string("pipewire.client.reuse=1");
@@ -349,27 +499,19 @@
       // then allowed formats are enumerated (e) and the format is undecided (u)
       // to allow negotiation
       ":", pw_type_->format_video.format, "Ieu", pw_type_->video_format.BGRx,
-      SPA_POD_PROP_ENUM(2, pw_type_->video_format.RGBx,
-                        pw_type_->video_format.BGRx),
+      SPA_POD_PROP_ENUM(
+          4, pw_type_->video_format.RGBx, pw_type_->video_format.BGRx,
+          pw_type_->video_format.RGBA, pw_type_->video_format.BGRA),
       // Video size: specified as rectangle (R), preferred size is specified as
       // first parameter, then allowed size is defined as range (r) from min and
       // max values and the format is undecided (u) to allow negotiation
-      ":", pw_type_->format_video.size, "Rru", &pwScreenBounds, 2,
-      &pwMinScreenBounds, &pwScreenBounds,
-      // Frame rate: specified as fraction (F) and set to minimum frame rate
-      // value
-      ":", pw_type_->format_video.framerate, "F", &pwFrameRateMin,
-      // Max frame rate: specified as fraction (F), preferred frame rate is set
-      // to maximum value, then allowed frame rate is defined as range (r) from
-      // min and max values and it is undecided (u) to allow negotiation
-      ":", pw_type_->format_video.max_framerate, "Fru", &pwFrameRateMax, 2,
-      &pwFrameRateMin, &pwFrameRateMax));
+      ":", pw_type_->format_video.size, "Rru", &pwMinScreenBounds,
+      SPA_POD_PROP_MIN_MAX(&pwMinScreenBounds, &pwMaxScreenBounds)));
 
   pw_stream_add_listener(pw_stream_, &spa_stream_listener_, &pw_stream_events_,
                          this);
   pw_stream_flags flags = static_cast<pw_stream_flags>(
-      PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_INACTIVE |
-      PW_STREAM_FLAG_MAP_BUFFERS);
+      PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_INACTIVE);
   if (pw_stream_connect(pw_stream_, PW_DIRECTION_INPUT, /*port_path=*/nullptr,
                         flags, params,
                         /*n_params=*/1) != 0) {
@@ -381,37 +523,120 @@
 
 void BaseCapturerPipeWire::HandleBuffer(pw_buffer* buffer) {
   spa_buffer* spaBuffer = buffer->buffer;
-  void* src = nullptr;
+  ScopedBuf map;
+  uint8_t* src = nullptr;
 
-  if (!(src = spaBuffer->datas[0].data)) {
+  if (spaBuffer->datas[0].chunk->size == 0) {
+    RTC_LOG(LS_ERROR) << "Failed to get video stream: Zero size.";
     return;
   }
 
-  uint32_t maxSize = spaBuffer->datas[0].maxsize;
-  int32_t srcStride = spaBuffer->datas[0].chunk->stride;
-  if (srcStride != (desktop_size_.width() * kBytesPerPixel)) {
+  if (spaBuffer->datas[0].type == pw_core_type_->data.MemFd ||
+      spaBuffer->datas[0].type == pw_core_type_->data.DmaBuf) {
+    map.initialize(
+        static_cast<uint8_t*>(
+            mmap(nullptr,
+                 spaBuffer->datas[0].maxsize + spaBuffer->datas[0].mapoffset,
+                 PROT_READ, MAP_PRIVATE, spaBuffer->datas[0].fd, 0)),
+        spaBuffer->datas[0].maxsize + spaBuffer->datas[0].mapoffset,
+        spaBuffer->datas[0].type == pw_core_type_->data.DmaBuf,
+        spaBuffer->datas[0].fd);
+
+    if (!map) {
+      RTC_LOG(LS_ERROR) << "Failed to mmap the memory: "
+                        << std::strerror(errno);
+      return;
+    }
+
+    if (spaBuffer->datas[0].type == pw_core_type_->data.DmaBuf) {
+      SyncDmaBuf(spaBuffer->datas[0].fd, DMA_BUF_SYNC_START);
+    }
+
+    src = SPA_MEMBER(map.get(), spaBuffer->datas[0].mapoffset, uint8_t);
+  } else if (spaBuffer->datas[0].type == pw_core_type_->data.MemPtr) {
+    src = static_cast<uint8_t*>(spaBuffer->datas[0].data);
+  }
+
+  if (!src) {
+    return;
+  }
+
+  struct spa_meta_video_crop* video_metadata =
+      static_cast<struct spa_meta_video_crop*>(
+          spa_buffer_find_meta(spaBuffer, pw_core_type_->meta.VideoCrop));
+
+  // Video size from metadata is bigger than an actual video stream size.
+  // The metadata are wrong or we should up-scale the video...in both cases
+  // just quit now.
+  if (video_metadata && (video_metadata->width > desktop_size_.width() ||
+                         video_metadata->height > desktop_size_.height())) {
+    RTC_LOG(LS_ERROR) << "Stream metadata sizes are wrong!";
+    return;
+  }
+
+  // Use video metadata when video size from metadata is set and smaller than
+  // video stream size, so we need to adjust it.
+  bool video_is_full_width = true;
+  bool video_is_full_height = true;
+  if (video_metadata && video_metadata->width != 0 &&
+      video_metadata->height != 0) {
+    if (video_metadata->width < desktop_size_.width()) {
+      video_is_full_width = false;
+    } else if (video_metadata->height < desktop_size_.height()) {
+      video_is_full_height = false;
+    }
+  }
+
+  DesktopSize video_size_prev = video_size_;
+  if (!video_is_full_height || !video_is_full_width) {
+    video_size_ = DesktopSize(video_metadata->width, video_metadata->height);
+  } else {
+    video_size_ = desktop_size_;
+  }
+
+  webrtc::MutexLock lock(&current_frame_lock_);
+  if (!current_frame_ || !video_size_.equals(video_size_prev)) {
+    current_frame_ = std::make_unique<uint8_t[]>(
+        video_size_.width() * video_size_.height() * kBytesPerPixel);
+  }
+
+  const int32_t dst_stride = video_size_.width() * kBytesPerPixel;
+  const int32_t src_stride = spaBuffer->datas[0].chunk->stride;
+
+  if (src_stride != (desktop_size_.width() * kBytesPerPixel)) {
     RTC_LOG(LS_ERROR) << "Got buffer with stride different from screen stride: "
-                      << srcStride
+                      << src_stride
                       << " != " << (desktop_size_.width() * kBytesPerPixel);
     portal_init_failed_ = true;
+
     return;
   }
 
-  if (!current_frame_) {
-    current_frame_ = static_cast<uint8_t*>(malloc(maxSize));
+  // Adjust source content based on metadata video position
+  if (!video_is_full_height &&
+      (video_metadata->y + video_size_.height() <= desktop_size_.height())) {
+    src += src_stride * video_metadata->y;
   }
-  RTC_DCHECK(current_frame_ != nullptr);
 
-  // If both sides decided to go with the RGBx format we need to convert it to
-  // BGRx to match color format expected by WebRTC.
-  if (spa_video_format_->format == pw_type_->video_format.RGBx) {
-    uint8_t* tempFrame = static_cast<uint8_t*>(malloc(maxSize));
-    std::memcpy(tempFrame, src, maxSize);
-    ConvertRGBxToBGRx(tempFrame, maxSize);
-    std::memcpy(current_frame_, tempFrame, maxSize);
-    free(tempFrame);
-  } else {
-    std::memcpy(current_frame_, src, maxSize);
+  const int x_offset =
+      !video_is_full_width &&
+              (video_metadata->x + video_size_.width() <= desktop_size_.width())
+          ? video_metadata->x * kBytesPerPixel
+          : 0;
+
+  uint8_t* dst = current_frame_.get();
+  for (int i = 0; i < video_size_.height(); ++i) {
+    // Adjust source content based on crop video position if needed
+    src += x_offset;
+    std::memcpy(dst, src, dst_stride);
+    // If both sides decided to go with the RGBx format we need to convert it to
+    // BGRx to match color format expected by WebRTC.
+    if (spa_video_format_->format == pw_type_->video_format.RGBx ||
+        spa_video_format_->format == pw_type_->video_format.RGBA) {
+      ConvertRGBxToBGRx(dst, dst_stride);
+    }
+    src += src_stride - x_offset;
+    dst += dst_stride;
   }
 }
 
@@ -441,14 +666,13 @@
   BaseCapturerPipeWire* that = static_cast<BaseCapturerPipeWire*>(user_data);
   RTC_DCHECK(that);
 
-  GError* error = nullptr;
-  GDBusProxy *proxy = g_dbus_proxy_new_finish(result, &error);
+  Scoped<GError> error;
+  GDBusProxy* proxy = g_dbus_proxy_new_finish(result, error.receive());
   if (!proxy) {
-    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_CANCELLED))
       return;
     RTC_LOG(LS_ERROR) << "Failed to create a proxy for the screen cast portal: "
                       << error->message;
-    g_error_free(error);
     that->portal_init_failed_ = true;
     return;
   }
@@ -462,38 +686,36 @@
 // static
 gchar* BaseCapturerPipeWire::PrepareSignalHandle(GDBusConnection* connection,
                                                  const gchar* token) {
-  gchar* sender = g_strdup(g_dbus_connection_get_unique_name(connection) + 1);
-  for (int i = 0; sender[i]; i++) {
-    if (sender[i] == '.') {
-      sender[i] = '_';
+  Scoped<gchar> sender(
+      g_strdup(g_dbus_connection_get_unique_name(connection) + 1));
+  for (int i = 0; sender.get()[i]; i++) {
+    if (sender.get()[i] == '.') {
+      sender.get()[i] = '_';
     }
   }
 
-  gchar* handle = g_strconcat(kDesktopRequestObjectPath, "/", sender, "/",
+  gchar* handle = g_strconcat(kDesktopRequestObjectPath, "/", sender.get(), "/",
                               token, /*end of varargs*/ nullptr);
-  g_free(sender);
 
   return handle;
 }
 
 void BaseCapturerPipeWire::SessionRequest() {
   GVariantBuilder builder;
-  gchar* variant_string;
+  Scoped<gchar> variant_string;
 
   g_variant_builder_init(&builder, G_VARIANT_TYPE_VARDICT);
   variant_string =
       g_strdup_printf("webrtc_session%d", g_random_int_range(0, G_MAXINT));
   g_variant_builder_add(&builder, "{sv}", "session_handle_token",
-                        g_variant_new_string(variant_string));
-  g_free(variant_string);
+                        g_variant_new_string(variant_string.get()));
   variant_string = g_strdup_printf("webrtc%d", g_random_int_range(0, G_MAXINT));
   g_variant_builder_add(&builder, "{sv}", "handle_token",
-                        g_variant_new_string(variant_string));
+                        g_variant_new_string(variant_string.get()));
 
-  portal_handle_ = PrepareSignalHandle(connection_, variant_string);
+  portal_handle_ = PrepareSignalHandle(connection_, variant_string.get());
   session_request_signal_id_ = SetupRequestResponseSignal(
       portal_handle_, OnSessionRequestResponseSignal);
-  g_free(variant_string);
 
   RTC_LOG(LS_INFO) << "Screen cast session requested.";
   g_dbus_proxy_call(
@@ -509,22 +731,21 @@
   BaseCapturerPipeWire* that = static_cast<BaseCapturerPipeWire*>(user_data);
   RTC_DCHECK(that);
 
-  GError* error = nullptr;
-  GVariant* variant = g_dbus_proxy_call_finish(proxy, result, &error);
+  Scoped<GError> error;
+  Scoped<GVariant> variant(
+      g_dbus_proxy_call_finish(proxy, result, error.receive()));
   if (!variant) {
-    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_CANCELLED))
       return;
     RTC_LOG(LS_ERROR) << "Failed to create a screen cast session: "
                       << error->message;
-    g_error_free(error);
     that->portal_init_failed_ = true;
     return;
   }
   RTC_LOG(LS_INFO) << "Initializing the screen cast session.";
 
-  gchar* handle = nullptr;
-  g_variant_get_child(variant, 0, "o", &handle);
-  g_variant_unref(variant);
+  Scoped<gchar> handle;
+  g_variant_get_child(variant.get(), 0, "o", &handle);
   if (!handle) {
     RTC_LOG(LS_ERROR) << "Failed to initialize the screen cast session.";
     if (that->session_request_signal_id_) {
@@ -536,8 +757,6 @@
     return;
   }
 
-  g_free(handle);
-
   RTC_LOG(LS_INFO) << "Subscribing to the screen cast session.";
 }
 
@@ -557,11 +776,11 @@
       << "Received response for the screen cast session subscription.";
 
   guint32 portal_response;
-  GVariant* response_data;
-  g_variant_get(parameters, "(u@a{sv})", &portal_response, &response_data);
-  g_variant_lookup(response_data, "session_handle", "s",
+  Scoped<GVariant> response_data;
+  g_variant_get(parameters, "(u@a{sv})", &portal_response,
+                response_data.receive());
+  g_variant_lookup(response_data.get(), "session_handle", "s",
                    &that->session_handle_);
-  g_variant_unref(response_data);
 
   if (!that->session_handle_ || portal_response) {
     RTC_LOG(LS_ERROR)
@@ -575,23 +794,23 @@
 
 void BaseCapturerPipeWire::SourcesRequest() {
   GVariantBuilder builder;
-  gchar* variant_string;
+  Scoped<gchar> variant_string;
 
   g_variant_builder_init(&builder, G_VARIANT_TYPE_VARDICT);
   // We want to record monitor content.
-  g_variant_builder_add(&builder, "{sv}", "types",
-                        g_variant_new_uint32(capture_source_type_));
+  g_variant_builder_add(
+      &builder, "{sv}", "types",
+      g_variant_new_uint32(static_cast<uint32_t>(capture_source_type_)));
   // We don't want to allow selection of multiple sources.
   g_variant_builder_add(&builder, "{sv}", "multiple",
                         g_variant_new_boolean(false));
   variant_string = g_strdup_printf("webrtc%d", g_random_int_range(0, G_MAXINT));
   g_variant_builder_add(&builder, "{sv}", "handle_token",
-                        g_variant_new_string(variant_string));
+                        g_variant_new_string(variant_string.get()));
 
-  sources_handle_ = PrepareSignalHandle(connection_, variant_string);
+  sources_handle_ = PrepareSignalHandle(connection_, variant_string.get());
   sources_request_signal_id_ = SetupRequestResponseSignal(
       sources_handle_, OnSourcesRequestResponseSignal);
-  g_free(variant_string);
 
   RTC_LOG(LS_INFO) << "Requesting sources from the screen cast session.";
   g_dbus_proxy_call(
@@ -608,22 +827,21 @@
   BaseCapturerPipeWire* that = static_cast<BaseCapturerPipeWire*>(user_data);
   RTC_DCHECK(that);
 
-  GError* error = nullptr;
-  GVariant* variant = g_dbus_proxy_call_finish(proxy, result, &error);
+  Scoped<GError> error;
+  Scoped<GVariant> variant(
+      g_dbus_proxy_call_finish(proxy, result, error.receive()));
   if (!variant) {
-    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_CANCELLED))
       return;
     RTC_LOG(LS_ERROR) << "Failed to request the sources: " << error->message;
-    g_error_free(error);
     that->portal_init_failed_ = true;
     return;
   }
 
   RTC_LOG(LS_INFO) << "Sources requested from the screen cast session.";
 
-  gchar* handle = nullptr;
-  g_variant_get_child(variant, 0, "o", &handle);
-  g_variant_unref(variant);
+  Scoped<gchar> handle;
+  g_variant_get_child(variant.get(), 0, "o", handle.receive());
   if (!handle) {
     RTC_LOG(LS_ERROR) << "Failed to initialize the screen cast session.";
     if (that->sources_request_signal_id_) {
@@ -635,8 +853,6 @@
     return;
   }
 
-  g_free(handle);
-
   RTC_LOG(LS_INFO) << "Subscribed to sources signal.";
 }
 
@@ -668,17 +884,16 @@
 
 void BaseCapturerPipeWire::StartRequest() {
   GVariantBuilder builder;
-  gchar* variant_string;
+  Scoped<gchar> variant_string;
 
   g_variant_builder_init(&builder, G_VARIANT_TYPE_VARDICT);
   variant_string = g_strdup_printf("webrtc%d", g_random_int_range(0, G_MAXINT));
   g_variant_builder_add(&builder, "{sv}", "handle_token",
-                        g_variant_new_string(variant_string));
+                        g_variant_new_string(variant_string.get()));
 
-  start_handle_ = PrepareSignalHandle(connection_, variant_string);
+  start_handle_ = PrepareSignalHandle(connection_, variant_string.get());
   start_request_signal_id_ =
       SetupRequestResponseSignal(start_handle_, OnStartRequestResponseSignal);
-  g_free(variant_string);
 
   // "Identifier for the application window", this is Wayland, so not "x11:...".
   const gchar parent_window[] = "";
@@ -698,23 +913,22 @@
   BaseCapturerPipeWire* that = static_cast<BaseCapturerPipeWire*>(user_data);
   RTC_DCHECK(that);
 
-  GError* error = nullptr;
-  GVariant* variant = g_dbus_proxy_call_finish(proxy, result, &error);
+  Scoped<GError> error;
+  Scoped<GVariant> variant(
+      g_dbus_proxy_call_finish(proxy, result, error.receive()));
   if (!variant) {
-    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_CANCELLED))
       return;
     RTC_LOG(LS_ERROR) << "Failed to start the screen cast session: "
                       << error->message;
-    g_error_free(error);
     that->portal_init_failed_ = true;
     return;
   }
 
   RTC_LOG(LS_INFO) << "Initializing the start of the screen cast session.";
 
-  gchar* handle = nullptr;
-  g_variant_get_child(variant, 0, "o", &handle);
-  g_variant_unref(variant);
+  Scoped<gchar> handle;
+  g_variant_get_child(variant.get(), 0, "o", handle.receive());
   if (!handle) {
     RTC_LOG(LS_ERROR)
         << "Failed to initialize the start of the screen cast session.";
@@ -727,8 +941,6 @@
     return;
   }
 
-  g_free(handle);
-
   RTC_LOG(LS_INFO) << "Subscribed to the start signal.";
 }
 
@@ -746,9 +958,10 @@
 
   RTC_LOG(LS_INFO) << "Start signal received.";
   guint32 portal_response;
-  GVariant* response_data;
-  GVariantIter* iter = nullptr;
-  g_variant_get(parameters, "(u@a{sv})", &portal_response, &response_data);
+  Scoped<GVariant> response_data;
+  Scoped<GVariantIter> iter;
+  g_variant_get(parameters, "(u@a{sv})", &portal_response,
+                response_data.receive());
   if (portal_response || !response_data) {
     RTC_LOG(LS_ERROR) << "Failed to start the screen cast session.";
     that->portal_init_failed_ = true;
@@ -758,28 +971,28 @@
   // Array of PipeWire streams. See
   // https://github.com/flatpak/xdg-desktop-portal/blob/master/data/org.freedesktop.portal.ScreenCast.xml
   // documentation for <method name="Start">.
-  if (g_variant_lookup(response_data, "streams", "a(ua{sv})", &iter)) {
-    GVariant* variant;
+  if (g_variant_lookup(response_data.get(), "streams", "a(ua{sv})",
+                       iter.receive())) {
+    Scoped<GVariant> variant;
 
-    while (g_variant_iter_next(iter, "@(ua{sv})", &variant)) {
+    while (g_variant_iter_next(iter.get(), "@(ua{sv})", variant.receive())) {
       guint32 stream_id;
-      gint32 width;
-      gint32 height;
-      GVariant* options;
+      guint32 type;
+      Scoped<GVariant> options;
 
-      g_variant_get(variant, "(u@a{sv})", &stream_id, &options);
-      RTC_DCHECK(options != nullptr);
+      g_variant_get(variant.get(), "(u@a{sv})", &stream_id, options.receive());
+      RTC_DCHECK(options.get());
 
-      g_variant_lookup(options, "size", "(ii)", &width, &height);
+      if (g_variant_lookup(options.get(), "source_type", "u", &type)) {
+        that->capture_source_type_ =
+            static_cast<BaseCapturerPipeWire::CaptureSourceType>(type);
+      }
 
-      that->desktop_size_.set(width, height);
+      that->pw_stream_node_id_ = stream_id;
 
-      g_variant_unref(options);
-      g_variant_unref(variant);
+      break;
     }
   }
-  g_variant_iter_free(iter);
-  g_variant_unref(response_data);
 
   that->OpenPipeWireRemote();
 }
@@ -807,35 +1020,30 @@
   BaseCapturerPipeWire* that = static_cast<BaseCapturerPipeWire*>(user_data);
   RTC_DCHECK(that);
 
-  GError* error = nullptr;
-  GUnixFDList* outlist = nullptr;
-  GVariant* variant = g_dbus_proxy_call_with_unix_fd_list_finish(
-      proxy, &outlist, result, &error);
+  Scoped<GError> error;
+  Scoped<GUnixFDList> outlist;
+  Scoped<GVariant> variant(g_dbus_proxy_call_with_unix_fd_list_finish(
+      proxy, outlist.receive(), result, error.receive()));
   if (!variant) {
-    if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    if (g_error_matches(error.get(), G_IO_ERROR, G_IO_ERROR_CANCELLED))
       return;
     RTC_LOG(LS_ERROR) << "Failed to open the PipeWire remote: "
                       << error->message;
-    g_error_free(error);
     that->portal_init_failed_ = true;
     return;
   }
 
   gint32 index;
-  g_variant_get(variant, "(h)", &index);
+  g_variant_get(variant.get(), "(h)", &index);
 
-  if ((that->pw_fd_ = g_unix_fd_list_get(outlist, index, &error)) == -1) {
+  if ((that->pw_fd_ =
+           g_unix_fd_list_get(outlist.get(), index, error.receive())) == -1) {
     RTC_LOG(LS_ERROR) << "Failed to get file descriptor from the list: "
                       << error->message;
-    g_error_free(error);
-    g_variant_unref(variant);
     that->portal_init_failed_ = true;
     return;
   }
 
-  g_variant_unref(variant);
-  g_object_unref(outlist);
-
   that->InitPipeWire();
 }
 
@@ -854,15 +1062,18 @@
     return;
   }
 
+  webrtc::MutexLock lock(&current_frame_lock_);
   if (!current_frame_) {
     callback_->OnCaptureResult(Result::ERROR_TEMPORARY, nullptr);
     return;
   }
 
-  std::unique_ptr<DesktopFrame> result(new BasicDesktopFrame(desktop_size_));
+  DesktopSize frame_size = video_size_;
+
+  std::unique_ptr<DesktopFrame> result(new BasicDesktopFrame(frame_size));
   result->CopyPixelsFrom(
-      current_frame_, (desktop_size_.width() * kBytesPerPixel),
-      DesktopRect::MakeWH(desktop_size_.width(), desktop_size_.height()));
+      current_frame_.get(), (frame_size.width() * kBytesPerPixel),
+      DesktopRect::MakeWH(frame_size.width(), frame_size.height()));
   if (!result) {
     callback_->OnCaptureResult(Result::ERROR_TEMPORARY, nullptr);
     return;
@@ -887,4 +1098,11 @@
   return true;
 }
 
+// static
+std::unique_ptr<DesktopCapturer> BaseCapturerPipeWire::CreateRawCapturer(
+    const DesktopCaptureOptions& options) {
+  return std::make_unique<BaseCapturerPipeWire>(
+      BaseCapturerPipeWire::CaptureSourceType::kAny);
+}
+
 }  // namespace webrtc
diff --git a/modules/desktop_capture/linux/base_capturer_pipewire.h b/modules/desktop_capture/linux/base_capturer_pipewire.h
index f28d7a5..88b567a 100644
--- a/modules/desktop_capture/linux/base_capturer_pipewire.h
+++ b/modules/desktop_capture/linux/base_capturer_pipewire.h
@@ -10,15 +10,16 @@
 
 #ifndef MODULES_DESKTOP_CAPTURE_LINUX_BASE_CAPTURER_PIPEWIRE_H_
 #define MODULES_DESKTOP_CAPTURE_LINUX_BASE_CAPTURER_PIPEWIRE_H_
-
 #include <gio/gio.h>
 #define typeof __typeof__
 #include <pipewire/pipewire.h>
 #include <spa/param/video/format-utils.h>
 
+#include "absl/types/optional.h"
 #include "modules/desktop_capture/desktop_capture_options.h"
 #include "modules/desktop_capture/desktop_capturer.h"
 #include "rtc_base/constructor_magic.h"
+#include "rtc_base/synchronization/mutex.h"
 
 namespace webrtc {
 
@@ -32,11 +33,21 @@
 
 class BaseCapturerPipeWire : public DesktopCapturer {
  public:
-  enum CaptureSourceType { Screen = 1, Window };
+  // Values are set based on source type property in
+  // xdg-desktop-portal/screencast
+  // https://github.com/flatpak/xdg-desktop-portal/blob/master/data/org.freedesktop.portal.ScreenCast.xml
+  enum class CaptureSourceType : uint32_t {
+    kScreen = 0b01,
+    kWindow = 0b10,
+    kAny = 0b11
+  };
 
   explicit BaseCapturerPipeWire(CaptureSourceType source_type);
   ~BaseCapturerPipeWire() override;
 
+  static std::unique_ptr<DesktopCapturer> CreateRawCapturer(
+      const DesktopCaptureOptions& options);
+
   // DesktopCapturer interface.
   void Start(Callback* delegate) override;
   void CaptureFrame() override;
@@ -61,10 +72,11 @@
 
   spa_video_info_raw* spa_video_format_ = nullptr;
 
+  guint32 pw_stream_node_id_ = 0;
   gint32 pw_fd_ = -1;
 
   CaptureSourceType capture_source_type_ =
-      BaseCapturerPipeWire::CaptureSourceType::Screen;
+      BaseCapturerPipeWire::CaptureSourceType::kScreen;
 
   // <-- end of PipeWire types
 
@@ -79,10 +91,12 @@
   guint sources_request_signal_id_ = 0;
   guint start_request_signal_id_ = 0;
 
+  DesktopSize video_size_;
   DesktopSize desktop_size_ = {};
   DesktopCaptureOptions options_ = {};
 
-  uint8_t* current_frame_ = nullptr;
+  webrtc::Mutex current_frame_lock_;
+  std::unique_ptr<uint8_t[]> current_frame_;
   Callback* callback_ = nullptr;
 
   bool portal_init_failed_ = false;
diff --git a/modules/desktop_capture/linux/pipewire.sigs b/modules/desktop_capture/linux/pipewire.sigs
index 3e21e9d..e915bc9 100644
--- a/modules/desktop_capture/linux/pipewire.sigs
+++ b/modules/desktop_capture/linux/pipewire.sigs
@@ -42,3 +42,5 @@
 pw_thread_loop * pw_thread_loop_new(pw_loop *loop, const char *name);
 int pw_thread_loop_start(pw_thread_loop *loop);
 void pw_thread_loop_stop(pw_thread_loop *loop);
+void pw_thread_loop_lock(struct pw_thread_loop *loop);
+void pw_thread_loop_unlock(struct pw_thread_loop *loop);
diff --git a/modules/desktop_capture/linux/screen_capturer_pipewire.cc b/modules/desktop_capture/linux/screen_capturer_pipewire.cc
deleted file mode 100644
index fe67214..0000000
--- a/modules/desktop_capture/linux/screen_capturer_pipewire.cc
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *  Copyright 2018 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "modules/desktop_capture/linux/screen_capturer_pipewire.h"
-
-#include <memory>
-
-
-namespace webrtc {
-
-ScreenCapturerPipeWire::ScreenCapturerPipeWire()
-    : BaseCapturerPipeWire(BaseCapturerPipeWire::CaptureSourceType::Screen) {}
-ScreenCapturerPipeWire::~ScreenCapturerPipeWire() {}
-
-// static
-std::unique_ptr<DesktopCapturer>
-ScreenCapturerPipeWire::CreateRawScreenCapturer(
-    const DesktopCaptureOptions& options) {
-  return std::make_unique<ScreenCapturerPipeWire>();
-}
-
-}  // namespace webrtc
diff --git a/modules/desktop_capture/linux/screen_capturer_pipewire.h b/modules/desktop_capture/linux/screen_capturer_pipewire.h
deleted file mode 100644
index 66dcd68..0000000
--- a/modules/desktop_capture/linux/screen_capturer_pipewire.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *  Copyright 2018 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_DESKTOP_CAPTURE_LINUX_SCREEN_CAPTURER_PIPEWIRE_H_
-#define MODULES_DESKTOP_CAPTURE_LINUX_SCREEN_CAPTURER_PIPEWIRE_H_
-
-#include <memory>
-
-#include "modules/desktop_capture/linux/base_capturer_pipewire.h"
-
-namespace webrtc {
-
-class ScreenCapturerPipeWire : public BaseCapturerPipeWire {
- public:
-  ScreenCapturerPipeWire();
-  ~ScreenCapturerPipeWire() override;
-
-  static std::unique_ptr<DesktopCapturer> CreateRawScreenCapturer(
-      const DesktopCaptureOptions& options);
-
-  RTC_DISALLOW_COPY_AND_ASSIGN(ScreenCapturerPipeWire);
-};
-
-}  // namespace webrtc
-
-#endif  // MODULES_DESKTOP_CAPTURE_LINUX_SCREEN_CAPTURER_PIPEWIRE_H_
diff --git a/modules/desktop_capture/linux/window_capturer_pipewire.cc b/modules/desktop_capture/linux/window_capturer_pipewire.cc
deleted file mode 100644
index b455915..0000000
--- a/modules/desktop_capture/linux/window_capturer_pipewire.cc
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *  Copyright 2018 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "modules/desktop_capture/linux/window_capturer_pipewire.h"
-
-#include <memory>
-
-
-namespace webrtc {
-
-WindowCapturerPipeWire::WindowCapturerPipeWire()
-    : BaseCapturerPipeWire(BaseCapturerPipeWire::CaptureSourceType::Window) {}
-WindowCapturerPipeWire::~WindowCapturerPipeWire() {}
-
-// static
-std::unique_ptr<DesktopCapturer>
-WindowCapturerPipeWire::CreateRawWindowCapturer(
-    const DesktopCaptureOptions& options) {
-  return std::make_unique<WindowCapturerPipeWire>();
-}
-
-}  // namespace webrtc
diff --git a/modules/desktop_capture/linux/window_capturer_pipewire.h b/modules/desktop_capture/linux/window_capturer_pipewire.h
deleted file mode 100644
index 7f184ef..0000000
--- a/modules/desktop_capture/linux/window_capturer_pipewire.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *  Copyright 2018 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_DESKTOP_CAPTURE_LINUX_WINDOW_CAPTURER_PIPEWIRE_H_
-#define MODULES_DESKTOP_CAPTURE_LINUX_WINDOW_CAPTURER_PIPEWIRE_H_
-
-#include <memory>
-
-#include "modules/desktop_capture/linux/base_capturer_pipewire.h"
-
-namespace webrtc {
-
-class WindowCapturerPipeWire : public BaseCapturerPipeWire {
- public:
-  WindowCapturerPipeWire();
-  ~WindowCapturerPipeWire() override;
-
-  static std::unique_ptr<DesktopCapturer> CreateRawWindowCapturer(
-      const DesktopCaptureOptions& options);
-
-  RTC_DISALLOW_COPY_AND_ASSIGN(WindowCapturerPipeWire);
-};
-
-}  // namespace webrtc
-
-#endif  // MODULES_DESKTOP_CAPTURE_LINUX_WINDOW_CAPTURER_PIPEWIRE_H_
diff --git a/modules/desktop_capture/screen_capturer_linux.cc b/modules/desktop_capture/screen_capturer_linux.cc
index 82dbae4..ed48b7d 100644
--- a/modules/desktop_capture/screen_capturer_linux.cc
+++ b/modules/desktop_capture/screen_capturer_linux.cc
@@ -14,7 +14,7 @@
 #include "modules/desktop_capture/desktop_capturer.h"
 
 #if defined(WEBRTC_USE_PIPEWIRE)
-#include "modules/desktop_capture/linux/screen_capturer_pipewire.h"
+#include "modules/desktop_capture/linux/base_capturer_pipewire.h"
 #endif  // defined(WEBRTC_USE_PIPEWIRE)
 
 #if defined(WEBRTC_USE_X11)
@@ -28,7 +28,7 @@
     const DesktopCaptureOptions& options) {
 #if defined(WEBRTC_USE_PIPEWIRE)
   if (options.allow_pipewire() && DesktopCapturer::IsRunningUnderWayland()) {
-    return ScreenCapturerPipeWire::CreateRawScreenCapturer(options);
+    return BaseCapturerPipeWire::CreateRawCapturer(options);
   }
 #endif  // defined(WEBRTC_USE_PIPEWIRE)
 
diff --git a/modules/desktop_capture/window_capturer_linux.cc b/modules/desktop_capture/window_capturer_linux.cc
index 41dbf83..2b142ae 100644
--- a/modules/desktop_capture/window_capturer_linux.cc
+++ b/modules/desktop_capture/window_capturer_linux.cc
@@ -14,7 +14,7 @@
 #include "modules/desktop_capture/desktop_capturer.h"
 
 #if defined(WEBRTC_USE_PIPEWIRE)
-#include "modules/desktop_capture/linux/window_capturer_pipewire.h"
+#include "modules/desktop_capture/linux/base_capturer_pipewire.h"
 #endif  // defined(WEBRTC_USE_PIPEWIRE)
 
 #if defined(WEBRTC_USE_X11)
@@ -28,7 +28,7 @@
     const DesktopCaptureOptions& options) {
 #if defined(WEBRTC_USE_PIPEWIRE)
   if (options.allow_pipewire() && DesktopCapturer::IsRunningUnderWayland()) {
-    return WindowCapturerPipeWire::CreateRawWindowCapturer(options);
+    return BaseCapturerPipeWire::CreateRawCapturer(options);
   }
 #endif  // defined(WEBRTC_USE_PIPEWIRE)