diff --git a/src/platform/linux/graphics.h b/src/platform/linux/graphics.h index 00e38943cbf..d9bc1cdeaed 100644 --- a/src/platform/linux/graphics.h +++ b/src/platform/linux/graphics.h @@ -295,6 +295,10 @@ namespace egl { // Increment sequence when new rgb_t needs to be created std::uint64_t sequence; + + // PipeWire metadata + std::optional pts; + std::optional seq; }; class sws_t { diff --git a/src/platform/linux/portalgrab.cpp b/src/platform/linux/portalgrab.cpp index 2d9eeb281f3..f5fa5065cc5 100644 --- a/src/platform/linux/portalgrab.cpp +++ b/src/platform/linux/portalgrab.cpp @@ -137,6 +137,21 @@ namespace portal { struct pw_buffer *current_buffer; uint64_t drm_format; std::shared_ptr shared; + std::mutex frame_mutex; + std::condition_variable frame_cv; + size_t local_stride = 0; + bool frame_ready = false; + // Two distinct memory pools + std::vector buffer_a; + std::vector buffer_b; + // Points to the buffer currently owned by fill_img + std::vector *front_buffer; + // Points to the buffer currently being written by on_process + std::vector *back_buffer; + + stream_data_t(): + front_buffer(&buffer_a), + back_buffer(&buffer_b) {} }; struct dmabuf_format_info_t { @@ -147,7 +162,37 @@ namespace portal { class dbus_t { public: - ~dbus_t() { + ~dbus_t() noexcept { + try { + if (conn && !session_handle.empty()) { + g_autoptr(GError) err = nullptr; + // This is a blocking C call; it won't throw, but we wrap for safety + g_dbus_connection_call_sync( + conn, + "org.freedesktop.portal.Desktop", + session_handle.c_str(), + "org.freedesktop.portal.Session", + "Close", + nullptr, + nullptr, + G_DBUS_CALL_FLAGS_NONE, + -1, + nullptr, + &err + ); + + if (err) { + BOOST_LOG(warning) << "Failed to explicitly close portal session: "sv << err->message; + } else { + BOOST_LOG(debug) << "Explicitly closed portal session: "sv << session_handle; + } + } + } catch (const std::exception &e) { + BOOST_LOG(error) << "Standard exception caught in ~dbus_t: "sv << e.what(); + } catch (...) { + BOOST_LOG(error) << "Unknown exception caught in ~dbus_t"sv; + } + if (screencast_proxy) { g_object_unref(screencast_proxy); } @@ -249,6 +294,7 @@ namespace portal { GDBusConnection *conn; GDBusProxy *screencast_proxy; GDBusProxy *remote_desktop_proxy; + std::string session_handle; int create_portal_session(GMainLoop *loop, gchar **session_path_out, const gchar *session_token, bool use_screencast) { GDBusProxy *proxy = use_screencast ? screencast_proxy : remote_desktop_proxy; @@ -311,6 +357,8 @@ namespace portal { } BOOST_LOG(debug) << session_type << " CreateSession: got session handle: "sv << *session_path_out; + // Save it for the destructor to use during cleanup + this->session_handle = *session_path_out; return 0; } @@ -617,16 +665,24 @@ namespace portal { * * Call this when the session becomes invalid (e.g., on error). */ - void invalidate() { - std::scoped_lock lock(mutex_); - if (valid_) { - BOOST_LOG(debug) << "Invalidating cached portal session"sv; - if (pipewire_fd_ >= 0) { - close(pipewire_fd_); - pipewire_fd_ = -1; + void invalidate() noexcept { + try { + std::scoped_lock lock(mutex_); + if (valid_) { + BOOST_LOG(debug) << "Invalidating cached portal session"sv; + if (pipewire_fd_ >= 0) { + close(pipewire_fd_); + pipewire_fd_ = -1; + } + + dbus_.reset(); + + valid_ = false; } - dbus_.reset(); - valid_ = false; + } catch (const std::exception &e) { + BOOST_LOG(error) << "Exception during session invalidation: "sv << e.what(); + } catch (...) { + BOOST_LOG(error) << "Unknown error during session invalidation"sv; } } @@ -666,28 +722,16 @@ namespace portal { } ~pipewire_t() { - pw_thread_loop_lock(loop); - - if (stream_data.stream) { - pw_stream_destroy(stream_data.stream); - stream_data.stream = nullptr; - } - if (core) { - pw_core_disconnect(core); - core = nullptr; - } - if (context) { - pw_context_destroy(context); - context = nullptr; - } + cleanup_stream(); + pw_thread_loop_destroy(loop); + } - pw_thread_loop_unlock(loop); + std::mutex &frame_mutex() { + return stream_data.frame_mutex; + } - pw_thread_loop_stop(loop); - if (fd >= 0) { - close(fd); - } - pw_thread_loop_destroy(loop); + std::condition_variable &frame_cv() { + return stream_data.frame_cv; } void init(int stream_fd, int stream_node, std::shared_ptr shared_state) { @@ -711,10 +755,33 @@ namespace portal { void cleanup_stream() { if (loop && stream_data.stream) { pw_thread_loop_lock(loop); - pw_stream_disconnect(stream_data.stream); - pw_stream_destroy(stream_data.stream); - stream_data.stream = nullptr; + + // 1. Lock the frame mutex to stop fill_img + { + std::scoped_lock lock(stream_data.frame_mutex); + stream_data.frame_ready = false; + stream_data.current_buffer = nullptr; + } + + if (stream_data.stream) { + pw_stream_destroy(stream_data.stream); + stream_data.stream = nullptr; + } + if (core) { + pw_core_disconnect(core); + core = nullptr; + } + if (context) { + pw_context_destroy(context); + context = nullptr; + } + pw_thread_loop_unlock(loop); + + pw_thread_loop_stop(loop); + if (fd >= 0) { + close(fd); + } } session_cache_t::instance().invalidate(); } @@ -765,12 +832,33 @@ namespace portal { void fill_img(platf::img_t *img) { pw_thread_loop_lock(loop); - if (stream_data.current_buffer) { - struct spa_buffer *buf; - buf = stream_data.current_buffer->buffer; + // 1. Lock the frame mutex immediately to protect against on_process reallocations + std::scoped_lock lock(stream_data.frame_mutex); + + // Check if the stream is marked dead by modesetting logic + if (stream_data.shared && stream_data.shared->stream_dead.load()) { + img->data = nullptr; + pw_thread_loop_unlock(loop); + return; + } + + // 2. Validate we have a buffer and a signal that it's "new" + if (stream_data.current_buffer && stream_data.frame_ready) { + struct spa_buffer *buf = stream_data.current_buffer->buffer; + if (buf->datas[0].chunk->size != 0) { const auto img_descriptor = static_cast(img); img_descriptor->frame_timestamp = std::chrono::steady_clock::now(); + + // Passthrough PipeWire metadata + struct spa_meta_header *h = static_cast( + spa_buffer_find_meta_data(buf, SPA_META_Header, sizeof(*h)) + ); + if (h) { + img_descriptor->seq = h->seq; + img_descriptor->pts = h->pts; + } + if (buf->datas[0].type == SPA_DATA_DmaBuf) { img_descriptor->sd.width = stream_data.format.info.raw.size.width; img_descriptor->sd.height = stream_data.format.info.raw.size.height; @@ -783,10 +871,18 @@ namespace portal { img_descriptor->sd.offsets[i] = buf->datas[i].chunk->offset; } } else { - img->data = static_cast(buf->datas[0].data); - img->row_pitch = buf->datas[0].chunk->stride; + // Point the encoder to the front buffer + img->data = stream_data.front_buffer->data(); + img->row_pitch = stream_data.local_stride; + + // Reset flags + stream_data.frame_ready = false; + stream_data.current_buffer = nullptr; } } + } else { + // No new frame ready, or buffer was cleared during reinit + img->data = nullptr; } pw_thread_loop_unlock(loop); @@ -867,41 +963,67 @@ namespace portal { case PW_STREAM_STATE_PAUSED: // Trigger a reinit to identify if changes occurred if (d->shared && old == PW_STREAM_STATE_STREAMING) { + std::scoped_lock lock(d->frame_mutex); + d->frame_ready = false; + d->current_buffer = nullptr; d->shared->stream_dead.store(true, std::memory_order_relaxed); } break; - case PW_STREAM_STATE_CONNECTING: - case PW_STREAM_STATE_STREAMING: default: break; } - return; } static void on_process(void *user_data) { const auto d = static_cast(user_data); struct pw_buffer *b = nullptr; - while (true) { - struct pw_buffer *aux = pw_stream_dequeue_buffer(d->stream); - if (!aux) { - break; - } + // 1. Drain the queue: Always grab the most recent buffer + while (struct pw_buffer *aux = pw_stream_dequeue_buffer(d->stream)) { if (b) { - pw_stream_queue_buffer(d->stream, b); + pw_stream_queue_buffer(d->stream, b); // Return the older, unused buffer } b = aux; } - if (b == nullptr) { - BOOST_LOG(warning) << "out of pipewire buffers"sv; + if (!b) { return; } - if (d->current_buffer) { - pw_stream_queue_buffer(d->stream, d->current_buffer); + // 2. Fast Path: DMA-BUF + if (b->buffer->datas[0].type == SPA_DATA_DmaBuf) { + std::scoped_lock lock(d->frame_mutex); + if (d->current_buffer) { + pw_stream_queue_buffer(d->stream, d->current_buffer); + } + d->current_buffer = b; + d->frame_ready = true; + } + // 3. Optimized Path: Software/MemPtr + else if (b->buffer->datas[0].data != nullptr) { + size_t size = b->buffer->datas[0].chunk->size; + + // Perform the copy to the BACK buffer while NOT holding the lock + if (d->back_buffer->size() < size) { + d->back_buffer->resize(size); + } + std::memcpy(d->back_buffer->data(), b->buffer->datas[0].data, size); + + { + // Lock only for the pointer swap and state update + std::scoped_lock lock(d->frame_mutex); + std::swap(d->front_buffer, d->back_buffer); + + d->local_stride = b->buffer->datas[0].chunk->stride; + d->frame_ready = true; + d->current_buffer = b; + } + + // Release the PW buffer immediately after copy + pw_stream_queue_buffer(d->stream, b); } - d->current_buffer = b; + + d->frame_cv.notify_one(); } static void on_param_changed(void *user_data, uint32_t id, const struct spa_pod *param) { @@ -964,14 +1086,18 @@ namespace portal { buffer_types |= 1 << SPA_DATA_MemPtr; } - // Ack the buffer type + // Ack the buffer type and metadata std::array buffer; - std::array params; + std::array params; int n_params = 0; struct spa_pod_builder pod_builder = SPA_POD_BUILDER_INIT(buffer.data(), buffer.size()); auto buffer_param = static_cast(spa_pod_builder_add_object(&pod_builder, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, SPA_PARAM_BUFFERS_dataType, SPA_POD_Int(buffer_types))); params[n_params] = buffer_param; n_params++; + auto meta_param = static_cast(spa_pod_builder_add_object(&pod_builder, SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header), SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header)))); + params[n_params] = meta_param; + n_params++; + pw_stream_update_params(d->stream, params.data(), n_params); } @@ -986,8 +1112,18 @@ namespace portal { class portal_t: public platf::display_t { public: int init(platf::mem_type_e hwdevice_type, const std::string &display_name, const ::video::config_t &config) { + // calculate frame interval we should capture at framerate = config.framerate; - delay = std::chrono::nanoseconds {1s} / framerate; + if (config.framerateX100 > 0) { + AVRational fps_strict = ::video::framerateX100_to_rational(config.framerateX100); + delay = std::chrono::nanoseconds( + (static_cast(fps_strict.den) * 1'000'000'000LL) / fps_strict.num + ); + BOOST_LOG(info) << "Requested frame rate [" << fps_strict.num << "/" << fps_strict.den << ", approx. " << av_q2d(fps_strict) << " fps]"; + } else { + delay = std::chrono::nanoseconds {1s} / framerate; + BOOST_LOG(info) << "Requested frame rate [" << framerate << "fps]"; + } mem_type = hwdevice_type; if (get_dmabuf_modifiers() < 0) { @@ -1049,23 +1185,48 @@ namespace portal { platf::capture_e snapshot(const pull_free_image_cb_t &pull_free_image_cb, std::shared_ptr &img_out, std::chrono::milliseconds timeout, bool show_cursor) { // FIXME: show_cursor is ignored - if (!pull_free_image_cb(img_out)) { - return platf::capture_e::interrupted; - } + auto start_time = std::chrono::steady_clock::now(); - const auto img_egl = static_cast(img_out.get()); - img_egl->reset(); - pipewire.fill_img(img_egl); + while (true) { + if (!pull_free_image_cb(img_out)) { + return platf::capture_e::interrupted; + } - // Check if we got valid data (either DMA-BUF fd or memory pointer) - if (img_egl->sd.fds[0] < 0 && img_egl->data == nullptr) { - // No buffer available yet from pipewire - return platf::capture_e::timeout; - } + const auto img_egl = static_cast(img_out.get()); + img_egl->reset(); + pipewire.fill_img(img_egl); - img_egl->sequence = ++sequence; + // Check if we got valid data (either DMA-BUF fd or memory pointer) + bool is_valid_data = (img_egl->sd.fds[0] >= 0 || img_egl->data != nullptr); - return platf::capture_e::ok; + // Duplicate detection: PipeWire seq increments on each new frame, + // pts advances with each buffer update. Both must advance to accept frame. + bool is_duplicate = (img_egl->seq.has_value() && img_egl->pts.has_value() && last_pts.has_value() && last_seq.has_value() && img_egl->pts.value() == last_pts.value() && img_egl->seq.value() == last_seq.value()); + + if (is_valid_data && !is_duplicate) { + // Frame found; check deadline + auto end_time = std::chrono::steady_clock::now(); + if (end_time - start_time > timeout) { + return platf::capture_e::timeout; + } + + if (img_egl->seq.has_value() && img_egl->pts.has_value()) { + last_seq = img_egl->seq.value(); + last_pts = img_egl->pts.value(); + } + img_egl->sequence = ++sequence; + return platf::capture_e::ok; + } + + // No valid frame yet, or it was a duplicate + auto now = std::chrono::steady_clock::now(); + if (now - start_time >= timeout) { + return platf::capture_e::timeout; + } + + std::unique_lock lock(pipewire.frame_mutex()); + pipewire.frame_cv().wait_until(lock, start_time + timeout); + } } std::shared_ptr alloc_img() override { @@ -1113,19 +1274,18 @@ namespace portal { } } + // Advance to (or catch up with) next delay interval auto now = std::chrono::steady_clock::now(); + while (next_frame < now) { + next_frame += delay; + } if (next_frame > now) { - std::this_thread::sleep_for(next_frame - now); + std::this_thread::sleep_until(next_frame); sleep_overshoot_logger.first_point(next_frame); sleep_overshoot_logger.second_point_now_and_log(); } - next_frame += delay; - if (next_frame < now) { // some major slowdown happened; we couldn't keep up - next_frame = now + delay; - } - std::shared_ptr img_out; switch (const auto status = snapshot(pull_free_image_cb, img_out, 1000ms, *cursor)) { case platf::capture_e::reinit: @@ -1281,6 +1441,8 @@ namespace portal { int n_dmabuf_infos; bool display_is_nvidia = false; // Track if display GPU is NVIDIA std::chrono::nanoseconds delay; + std::optional last_pts {}; + std::optional last_seq {}; std::uint64_t sequence {}; uint32_t framerate; static inline std::atomic previous_height {0};