From fc717086216c85a16e296881c53b4185e9050a41 Mon Sep 17 00:00:00 2001 From: Piotr Szeremeta Date: Thu, 21 May 2026 12:48:26 +0200 Subject: [PATCH 1/5] Add multivariant output support to Membrane.Transcoder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The output pad is now availability: :on_request, allowing a single transcoder bin to produce multiple independent output streams each with their own output_stream_format, transcoding_policy, and native_acceleration options. Single-output backward-compatible usage is unchanged — implicit via_out links inherit bin-level options and use the same internal child names, so existing tests and code require no modification. Multi-output pipelines use a Membrane.Tee.Parallel (new dep) placed between the connector and per-output transcoding chains within a single atomic spec, avoiding the race where the Tee could receive data before any output is connected. Co-Authored-By: Claude Sonnet 4.6 --- examples/multivariant_output.exs | 91 +++++++++++ lib/transcoder.ex | 196 ++++++++++++++++++----- lib/transcoder/audio.ex | 98 +++++++----- lib/transcoder/video.ex | 223 ++++++++++++++++---------- mix.exs | 1 + mix.lock | 1 + test/integration_test.exs | 265 +++++++++++++++++++++++++++++-- 7 files changed, 696 insertions(+), 179 deletions(-) create mode 100644 examples/multivariant_output.exs diff --git a/examples/multivariant_output.exs b/examples/multivariant_output.exs new file mode 100644 index 0000000..7a3f1ea --- /dev/null +++ b/examples/multivariant_output.exs @@ -0,0 +1,91 @@ +Mix.install([ + :membrane_file_plugin, + {:membrane_transcoder_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()} +]) + +defmodule Example do + alias Membrane.{H264, H265, VP8, RCPipeline} + require RCPipeline + require Membrane.Pad + + import Membrane.ChildrenSpec + + @doc """ + Transcodes a single H264 input into three output files simultaneously: + - output 0: H264 (annexb, repackaged — no re-encode) + - output 1: H265 (transcoded) + - output 2: VP8 (transcoded) + + Each output pad carries its own `output_stream_format`, `transcoding_policy`, and + `native_acceleration` options, all resolved independently inside the transcoder bin. + """ + def run(input_file, h264_output_file, h265_output_file, vp8_output_file) do + pipeline = RCPipeline.start_link!() + + spec = [ + child(%Membrane.File.Source{location: input_file}) + |> child(:parser, %H264.Parser{ + output_stream_structure: :annexb, + output_alignment: :au, + generate_best_effort_timestamps: %{framerate: {30, 1}} + }) + |> child(:transcoder, Membrane.Transcoder), + + # Output 0 — keep H264, just repackage (no re-encode) + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), + options: [ + output_stream_format: %H264{alignment: :au, stream_structure: :annexb}, + transcoding_policy: :if_needed + ] + ) + |> child(:h264_sink, %Membrane.File.Sink{location: h264_output_file}), + + # Output 1 — transcode to H265 + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), + options: [ + output_stream_format: H265, + transcoding_policy: :always + ] + ) + |> child(:h265_sink, %Membrane.File.Sink{location: h265_output_file}), + + # Output 2 — transcode to VP8 + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 2), + options: [ + output_stream_format: VP8, + transcoding_policy: :always + ] + ) + |> child(:vp8_sink, %Membrane.File.Sink{location: vp8_output_file}) + ] + + RCPipeline.subscribe(pipeline, _any) + RCPipeline.exec_actions(pipeline, spec: spec) + RCPipeline.await_end_of_stream(pipeline, :h264_sink) + RCPipeline.await_end_of_stream(pipeline, :h265_sink) + RCPipeline.await_end_of_stream(pipeline, :vp8_sink) + RCPipeline.terminate(pipeline) + end +end + +File.mkdir_p!(Path.join(__DIR__, "tmp")) + +input = Path.join(__DIR__, "../test/fixtures/video.h264") +h264_out = Path.join(__DIR__, "tmp/multivariant_output.h264") +h265_out = Path.join(__DIR__, "tmp/multivariant_output.h265") +vp8_out = Path.join(__DIR__, "tmp/multivariant_output.ivf") + +IO.puts("Input: #{input}") +IO.puts("H264 output: #{h264_out}") +IO.puts("H265 output: #{h265_out}") +IO.puts("VP8 output: #{vp8_out}") +IO.puts("") +Example.run(input, h264_out, h265_out, vp8_out) + +IO.puts("Done.") +IO.puts(" #{h264_out} (#{File.stat!(h264_out).size} bytes)") +IO.puts(" #{h265_out} (#{File.stat!(h265_out).size} bytes)") +IO.puts(" #{vp8_out} (#{File.stat!(vp8_out).size} bytes)") diff --git a/lib/transcoder.ex b/lib/transcoder.ex index 411d8d5..9a1eb0a 100644 --- a/lib/transcoder.ex +++ b/lib/transcoder.ex @@ -29,15 +29,26 @@ defmodule Membrane.Transcoder do When the `membrane_vk_video_plugin` dependency is present and Vulkan hardware is available, H.264 encode/decode can be offloaded to the GPU by setting `native_acceleration: :if_available`. + + ## Usage + + child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Pad.ref(:output, 0), options: [output_stream_format: H264, width: 1280, height: 720]) + |> child(:hd_sink, Membrane.File.Sink), + get_child(:transcoder) + |> via_out(Pad.ref(:output, 1), options: [output_stream_format: H264, width: 640, height: 360]) + |> child(:sd_sink, Membrane.File.Sink) """ use Membrane.Bin require __MODULE__.Audio require __MODULE__.Video require Membrane.Logger + require Membrane.Pad alias __MODULE__.{Audio, Video} - alias Membrane.{AAC, Funnel, H264, H265, Opus, RawAudio, RawVideo, RemoteStream, VP8, VP9} + alias Membrane.{AAC, Funnel, H264, H265, Opus, Pad, RawAudio, RawVideo, RemoteStream, VP8, VP9} @typedoc """ Describes stream formats acceptable on the bin's input and output. @@ -82,22 +93,60 @@ defmodule Membrane.Transcoder do format.__struct__ == RemoteStream def_output_pad :output, - accepted_format: format when Audio.is_audio_format(format) or Video.is_video_format(format) + availability: :on_request, + accepted_format: format when Audio.is_audio_format(format) or Video.is_video_format(format), + options: [ + output_stream_format: [ + spec: + stream_format() + | stream_format_module() + | stream_format_tuple() + | stream_format_resolver() + | nil, + default: nil, + description: """ + Per-output stream format. Inherits from bin's `output_stream_format` option if nil. + """ + ], + transcoding_policy: [ + spec: + :always + | :if_needed + | :never + | (stream_format() -> :always | :if_needed | :never) + | nil, + default: nil, + description: """ + Per-output transcoding policy. Inherits from bin's `transcoding_policy` option if nil. + """ + ], + native_acceleration: [ + spec: :never | :if_available | nil, + default: nil, + description: """ + Per-output native acceleration setting. Inherits from bin's `native_acceleration` option if nil. + """ + ] + ] def_options output_stream_format: [ spec: stream_format() | stream_format_module() | stream_format_tuple() - | stream_format_resolver(), + | stream_format_resolver() + | nil, + default: nil, description: """ - An option specifying desired output format. + An option specifying the desired output format for all outputs. Can be either: * a struct being a Membrane stream format, * a module in which Membrane stream format struct is defined, * a function which receives input stream format as an input argument and is supposed to return the desired output stream format or its module. + + When using per-output `via_out` options, individual outputs can override this value. """ ], transcoding_policy: [ @@ -157,20 +206,17 @@ defmodule Membrane.Transcoder do @impl true def handle_init(_ctx, opts) do - spec = [ + spec = bin_input() |> maybe_plug_stream_format_changer(opts.assumed_input_stream_format) - |> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}), - child(:output_funnel, Funnel) - |> bin_output() - ] + |> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}) state = opts |> Map.from_struct() |> Map.merge(%{ input_stream_format: nil, - use_hardware_acceleration?: should_use_hardware_acceleration?(opts.native_acceleration) + output_specs: %{} }) {[spec: spec], state} @@ -203,29 +249,90 @@ defmodule Membrane.Transcoder do }) end + @impl true + def handle_pad_added(Pad.ref(:output, pad_id) = pad_ref, ctx, state) do + pad_opts = ctx.pads[pad_ref].options + + suffix = pad_id_to_suffix(pad_id) + funnel_name = :"funnel_#{suffix}" + + output_spec = %{ + output_stream_format: pad_opts.output_stream_format || state.output_stream_format, + transcoding_policy: pad_opts.transcoding_policy || state.transcoding_policy, + native_acceleration: pad_opts.native_acceleration || state.native_acceleration, + funnel_name: funnel_name, + suffix: suffix, + pad_id: pad_id + } + + spec = child(funnel_name, Funnel) |> bin_output(pad_ref) + + {[spec: spec], %{state | output_specs: Map.put(state.output_specs, pad_ref, output_spec)}} + end + + @impl true + def handle_pad_removed(Pad.ref(:output, _id) = pad_ref, _ctx, state) do + {[], %{state | output_specs: Map.delete(state.output_specs, pad_ref)}} + end + @impl true def handle_child_notification({:stream_format, _pad, format}, :connector, _ctx, state) when state.input_stream_format == nil do - state = - %{state | input_stream_format: format} - |> resolve_output_stream_format() - - state = - with %{transcoding_policy: f} when is_function(f) <- state do - %{state | transcoding_policy: f.(format)} + state = %{state | input_stream_format: format} + + output_specs_list = Map.to_list(state.output_specs) + single_output? = length(output_specs_list) == 1 + + specs = + if single_output? do + [{_pad_ref, output_spec}] = output_specs_list + use_hw? = should_use_hardware_acceleration?(output_spec.native_acceleration) + resolved_format = resolve_output_stream_format(output_spec.output_stream_format, format) + + transcoding_policy = resolve_transcoding_policy(output_spec.transcoding_policy, format) + + [ + get_child(:connector) + |> plug_transcoding( + format, + resolved_format, + transcoding_policy, + use_hw?, + output_spec.suffix + ) + |> get_child(output_spec.funnel_name) + ] + else + # Build tee and all output pipelines in a single spec so the tee + # is never in a state where data flows through it without outputs connected. + tee_spec = get_child(:connector) |> child(:tee, Membrane.Tee.Parallel) + + output_pipeline_specs = + Enum.map(output_specs_list, fn {_pad_ref, output_spec} -> + use_hw? = should_use_hardware_acceleration?(output_spec.native_acceleration) + + resolved_format = + resolve_output_stream_format(output_spec.output_stream_format, format) + + transcoding_policy = + resolve_transcoding_policy(output_spec.transcoding_policy, format) + + get_child(:tee) + |> via_out(Pad.ref(:output, output_spec.pad_id)) + |> plug_transcoding( + format, + resolved_format, + transcoding_policy, + use_hw?, + output_spec.suffix + ) + |> get_child(output_spec.funnel_name) + end) + + [tee_spec | output_pipeline_specs] end - spec = - get_child(:connector) - |> plug_transcoding( - format, - state.output_stream_format, - state.transcoding_policy, - state.use_hardware_acceleration? - ) - |> get_child(:output_funnel) - - {[spec: spec], state} + {[spec: specs], state} end @impl true @@ -249,20 +356,24 @@ defmodule Membrane.Transcoder do {[], state} end - defp resolve_output_stream_format(state) do - case state.output_stream_format do + defp resolve_transcoding_policy(f, format) when is_function(f), do: f.(format) + defp resolve_transcoding_policy(policy, _format), do: policy + + defp resolve_output_stream_format(nil, input_format), do: input_format + + defp resolve_output_stream_format(output_stream_format, input_format) do + case output_stream_format do format when is_struct(format) -> - state + format module when is_atom(module) -> - %{state | output_stream_format: struct(module)} + struct(module) {module, opts} when is_atom(module) and is_list(opts) -> - %{state | output_stream_format: struct(module, opts)} + struct(module, opts) resolver when is_function(resolver) -> - %{state | output_stream_format: resolver.(state.input_stream_format)} - |> resolve_output_stream_format() + resolve_output_stream_format(resolver.(input_format), input_format) end end @@ -271,11 +382,12 @@ defmodule Membrane.Transcoder do input_format, output_format, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + suffix ) when Audio.is_audio_format(input_format) do builder - |> Audio.plug_audio_transcoding(input_format, output_format, transcoding_policy) + |> Audio.plug_audio_transcoding(input_format, output_format, transcoding_policy, suffix) end defp plug_transcoding( @@ -283,7 +395,8 @@ defmodule Membrane.Transcoder do input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) when Video.is_video_format(input_format) do builder @@ -291,7 +404,12 @@ defmodule Membrane.Transcoder do input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) end + + defp pad_id_to_suffix(id) when is_integer(id), do: "output_#{id}" + defp pad_id_to_suffix(id) when is_atom(id), do: "output_#{id}" + defp pad_id_to_suffix(id), do: "output_#{:erlang.phash2(id)}" end diff --git a/lib/transcoder/audio.ex b/lib/transcoder/audio.ex index 8833f6f..e33993d 100644 --- a/lib/transcoder/audio.ex +++ b/lib/transcoder/audio.ex @@ -64,18 +64,26 @@ defmodule Membrane.Transcoder.Audio do ChildrenSpec.builder(), audio_stream_format() | RemoteStream.t(), audio_stream_format(), - boolean() + :always | :if_needed | :never, + String.t() | nil ) :: ChildrenSpec.builder() - def plug_audio_transcoding(builder, input_format, output_format, transcoding_policy) + def plug_audio_transcoding( + builder, + input_format, + output_format, + transcoding_policy, + suffix \\ nil + ) when is_audio_format(input_format) and is_audio_format(output_format) do - do_plug_audio_transcoding(builder, input_format, output_format, transcoding_policy) + do_plug_audio_transcoding(builder, input_format, output_format, transcoding_policy, suffix) end defp do_plug_audio_transcoding( builder, %format_module{}, %format_module{}, - transcoding_policy + transcoding_policy, + _suffix ) when transcoding_policy in [:if_needed, :never] do Membrane.Logger.debug(""" @@ -89,63 +97,70 @@ defmodule Membrane.Transcoder.Audio do builder, %RemoteStream{content_format: Opus}, %Opus{}, - transcoding_policy + transcoding_policy, + suffix ) when transcoding_policy in [:if_needed, :never] do - builder |> child(:opus_parser, Opus.Parser) + builder |> child(child_name(suffix, :opus_parser), Opus.Parser) end - defp do_plug_audio_transcoding(_builder, input_format, output_format, :never) do + defp do_plug_audio_transcoding(_builder, input_format, output_format, :never, _suffix) do raise """ Cannot convert input format #{inspect(input_format)} to output format #{inspect(output_format)} \ with :transcoding_policy option set to :never. """ end - defp do_plug_audio_transcoding(builder, input_format, output_format, _transcoding_policy) do + defp do_plug_audio_transcoding( + builder, + input_format, + output_format, + _transcoding_policy, + suffix + ) do builder - |> maybe_plug_parser(input_format) - |> maybe_plug_decoder(input_format) - |> maybe_plug_resampler(input_format, output_format) - |> maybe_plug_encoder(output_format) + |> maybe_plug_parser(input_format, suffix) + |> maybe_plug_decoder(input_format, suffix) + |> maybe_plug_resampler(input_format, output_format, suffix) + |> maybe_plug_encoder(output_format, suffix) end - defp maybe_plug_parser(builder, %AAC{}) do - builder |> child(:aac_parser, AAC.Parser) + defp maybe_plug_parser(builder, %AAC{}, suffix) do + builder |> child(child_name(suffix, :aac_parser), AAC.Parser) end - defp maybe_plug_parser(builder, _input_format) do + defp maybe_plug_parser(builder, _input_format, _suffix) do builder end - defp maybe_plug_decoder(builder, %Opus{}) do - builder |> child(:opus_decoder, Opus.Decoder) + defp maybe_plug_decoder(builder, %Opus{}, suffix) do + builder |> child(child_name(suffix, :opus_decoder), Opus.Decoder) end - defp maybe_plug_decoder(builder, %RemoteStream{content_format: Opus, type: :packetized}) do - builder |> child(:opus_decoder, Opus.Decoder) + defp maybe_plug_decoder(builder, %RemoteStream{content_format: Opus, type: :packetized}, suffix) do + builder |> child(child_name(suffix, :opus_decoder), Opus.Decoder) end - defp maybe_plug_decoder(builder, %AAC{}) do - builder |> child(:aac_decoder, AAC.FDK.Decoder) + defp maybe_plug_decoder(builder, %AAC{}, suffix) do + builder |> child(child_name(suffix, :aac_decoder), AAC.FDK.Decoder) end - defp maybe_plug_decoder(builder, %MPEGAudio{}) do - builder |> child(:mp3_decoder, Membrane.MP3.MAD.Decoder) + defp maybe_plug_decoder(builder, %MPEGAudio{}, suffix) do + builder |> child(child_name(suffix, :mp3_decoder), Membrane.MP3.MAD.Decoder) end - defp maybe_plug_decoder(builder, %RemoteStream{content_format: MPEGAudio}) do - builder |> child(:mp3_decoder, Membrane.MP3.MAD.Decoder) + defp maybe_plug_decoder(builder, %RemoteStream{content_format: MPEGAudio}, suffix) do + builder |> child(child_name(suffix, :mp3_decoder), Membrane.MP3.MAD.Decoder) end - defp maybe_plug_decoder(builder, %RawAudio{}) do + defp maybe_plug_decoder(builder, %RawAudio{}, _suffix) do builder end - defp maybe_plug_resampler(builder, input_format, %Opus{}) + defp maybe_plug_resampler(builder, input_format, %Opus{}, suffix) when not is_opus_compliant(input_format) do builder - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + |> child(child_name(suffix, :resampler), %Membrane.FFmpeg.SWResample.Converter{ output_stream_format: %RawAudio{ sample_format: :s16le, sample_rate: 48_000, @@ -154,10 +169,10 @@ defmodule Membrane.Transcoder.Audio do }) end - defp maybe_plug_resampler(builder, input_format, %AAC{}) + defp maybe_plug_resampler(builder, input_format, %AAC{}, suffix) when not is_aac_compliant(input_format) do builder - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + |> child(child_name(suffix, :resampler), %Membrane.FFmpeg.SWResample.Converter{ output_stream_format: %RawAudio{ sample_format: :s16le, sample_rate: 44_100, @@ -166,31 +181,34 @@ defmodule Membrane.Transcoder.Audio do }) end - defp maybe_plug_resampler(builder, input_format, %MPEGAudio{}) + defp maybe_plug_resampler(builder, input_format, %MPEGAudio{}, suffix) when not is_mp3_compliant(input_format) do builder - |> child(:resampler, %Membrane.FFmpeg.SWResample.Converter{ + |> child(child_name(suffix, :resampler), %Membrane.FFmpeg.SWResample.Converter{ output_stream_format: %RawAudio{sample_rate: 44_100, sample_format: :s32le, channels: 2} }) end - defp maybe_plug_resampler(builder, _input_format, _output_format) do + defp maybe_plug_resampler(builder, _input_format, _output_format, _suffix) do builder end - defp maybe_plug_encoder(builder, %Opus{}) do - builder |> child(:opus_encoder, Opus.Encoder) + defp maybe_plug_encoder(builder, %Opus{}, suffix) do + builder |> child(child_name(suffix, :opus_encoder), Opus.Encoder) end - defp maybe_plug_encoder(builder, %AAC{}) do - builder |> child(:aac_encoder, AAC.FDK.Encoder) + defp maybe_plug_encoder(builder, %AAC{}, suffix) do + builder |> child(child_name(suffix, :aac_encoder), AAC.FDK.Encoder) end - defp maybe_plug_encoder(builder, %MPEGAudio{}) do - builder |> child(:mp3_encoder, Membrane.MP3.Lame.Encoder) + defp maybe_plug_encoder(builder, %MPEGAudio{}, suffix) do + builder |> child(child_name(suffix, :mp3_encoder), Membrane.MP3.Lame.Encoder) end - defp maybe_plug_encoder(builder, %RawAudio{}) do + defp maybe_plug_encoder(builder, %RawAudio{}, _suffix) do builder end + + defp child_name(nil, base), do: base + defp child_name(suffix, base), do: :"#{suffix}_#{base}" end diff --git a/lib/transcoder/video.ex b/lib/transcoder/video.ex index 5e2103a..2354675 100644 --- a/lib/transcoder/video.ex +++ b/lib/transcoder/video.ex @@ -34,14 +34,16 @@ defmodule Membrane.Transcoder.Video do video_stream_format(), video_stream_format(), :always | :if_needed | :never, - boolean() + boolean(), + String.t() | nil ) :: ChildrenSpec.builder() def plug_video_transcoding( builder, input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix \\ nil ) when is_video_format(input_format) and is_video_format(output_format) do do_plug_video_transcoding( @@ -49,7 +51,8 @@ defmodule Membrane.Transcoder.Video do input_format, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) end @@ -58,7 +61,8 @@ defmodule Membrane.Transcoder.Video do %RemoteStream{content_format: h26x}, output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) when h26x in [H264, H265] do do_plug_video_transcoding( @@ -66,7 +70,8 @@ defmodule Membrane.Transcoder.Video do struct!(h26x), output_format, transcoding_policy, - use_hardware_acceleration? + use_hardware_acceleration?, + suffix ) end @@ -75,11 +80,12 @@ defmodule Membrane.Transcoder.Video do %H264{}, %H264{} = output_format, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + suffix ) when transcoding_policy in [:if_needed, :never] do builder - |> child(:h264_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_parser), %H264.Parser{ output_stream_structure: stream_structure_type(output_format), output_alignment: output_format.alignment }) @@ -90,11 +96,12 @@ defmodule Membrane.Transcoder.Video do %H265{}, %H265{} = output_format, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + suffix ) when transcoding_policy in [:if_needed, :never] do builder - |> child(:h265_parser, %H265.Parser{ + |> child(child_name(suffix, :h265_parser), %H265.Parser{ output_stream_structure: stream_structure_type(output_format), output_alignment: output_format.alignment }) @@ -105,10 +112,11 @@ defmodule Membrane.Transcoder.Video do %RawVideo{} = input_format, %RawVideo{} = output_format, _transcoding_policy, - true + true, + suffix ) do builder - |> maybe_plug_swscale_converter_vulkan(input_format, output_format) + |> maybe_plug_swscale_converter_vulkan(input_format, output_format, suffix) end defp do_plug_video_transcoding( @@ -116,10 +124,11 @@ defmodule Membrane.Transcoder.Video do %RawVideo{} = input_format, %RawVideo{} = output_format, _transcoding_policy, - false + false, + suffix ) do builder - |> maybe_plug_swscale_converter(input_format, output_format) + |> maybe_plug_swscale_converter(input_format, output_format, suffix) end defp do_plug_video_transcoding( @@ -127,7 +136,8 @@ defmodule Membrane.Transcoder.Video do %format_module{}, %format_module{}, transcoding_policy, - _use_hardware_acceleration? + _use_hardware_acceleration?, + _suffix ) when transcoding_policy in [:if_needed, :never] do Membrane.Logger.debug(""" @@ -142,7 +152,8 @@ defmodule Membrane.Transcoder.Video do input_format, output_format, :never, - _use_hardware_acceleration? + _use_hardware_acceleration?, + _suffix ), do: raise(""" @@ -150,101 +161,133 @@ defmodule Membrane.Transcoder.Video do with :transcoding_policy option set to :never. """) - defp do_plug_video_transcoding(builder, input_format, output_format, _transcoding_policy, true) do + defp do_plug_video_transcoding( + builder, + input_format, + output_format, + _transcoding_policy, + true, + suffix + ) do builder - |> maybe_plug_parser_and_decoder_vulkan(input_format) - |> maybe_plug_swscale_converter_vulkan(input_format, output_format) - |> maybe_plug_encoder_and_parser_vulkan(output_format) + |> maybe_plug_parser_and_decoder_vulkan(input_format, suffix) + |> maybe_plug_swscale_converter_vulkan(input_format, output_format, suffix) + |> maybe_plug_encoder_and_parser_vulkan(output_format, suffix) end - defp do_plug_video_transcoding(builder, input_format, output_format, _transcoding_policy, false) do + defp do_plug_video_transcoding( + builder, + input_format, + output_format, + _transcoding_policy, + false, + suffix + ) do builder - |> maybe_plug_parser_and_decoder(input_format) - |> maybe_plug_swscale_converter(input_format, output_format) - |> maybe_plug_encoder_and_parser(output_format) + |> maybe_plug_parser_and_decoder(input_format, suffix) + |> maybe_plug_swscale_converter(input_format, output_format, suffix) + |> maybe_plug_encoder_and_parser(output_format, suffix) end # VK-specific decoder: child name :vk_h264_decoder distinguishes it from FFmpeg's :h264_decoder - defp maybe_plug_parser_and_decoder_vulkan(builder, %H264{}) do + defp maybe_plug_parser_and_decoder_vulkan(builder, %H264{}, suffix) do builder - |> child(:h264_input_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_input_parser), %H264.Parser{ output_stream_structure: :annexb, output_alignment: :au }) - |> child(:vk_h264_decoder, Membrane.VKVideo.Decoder) + |> child(child_name(suffix, :vk_h264_decoder), Membrane.VKVideo.Decoder) end - defp maybe_plug_parser_and_decoder_vulkan(builder, format), - do: maybe_plug_parser_and_decoder(builder, format) + defp maybe_plug_parser_and_decoder_vulkan(builder, format, suffix), + do: maybe_plug_parser_and_decoder(builder, format, suffix) - defp maybe_plug_parser_and_decoder(builder, %H264{}) do + defp maybe_plug_parser_and_decoder(builder, %H264{}, suffix) do builder - |> child(:h264_input_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_input_parser), %H264.Parser{ output_stream_structure: :annexb, output_alignment: :au }) - |> child(:h264_decoder, %H264.FFmpeg.Decoder{}) + |> child(child_name(suffix, :h264_decoder), %H264.FFmpeg.Decoder{}) end - defp maybe_plug_parser_and_decoder(builder, %H265{}) do + defp maybe_plug_parser_and_decoder(builder, %H265{}, suffix) do builder - |> child(:h265_input_parser, %H265.Parser{ + |> child(child_name(suffix, :h265_input_parser), %H265.Parser{ output_stream_structure: :annexb, output_alignment: :au }) - |> child(:h265_decoder, %H265.FFmpeg.Decoder{}) + |> child(child_name(suffix, :h265_decoder), %H265.FFmpeg.Decoder{}) end - defp maybe_plug_parser_and_decoder(builder, %vpx{}) when vpx in [VP8, VP9] do + defp maybe_plug_parser_and_decoder(builder, %vpx{}, suffix) when vpx in [VP8, VP9] do decoder_module = Module.concat(vpx, Decoder) - builder |> child(:vp8_decoder, decoder_module) + builder |> child(child_name(suffix, :vp8_decoder), decoder_module) end - defp maybe_plug_parser_and_decoder(builder, %RemoteStream{ - content_format: vpx, - type: :packetized - }) + defp maybe_plug_parser_and_decoder( + builder, + %RemoteStream{content_format: vpx, type: :packetized}, + suffix + ) when vpx in [VP8, VP9] do decoder_module = Module.concat(vpx, Decoder) - builder |> child(:vp8_decoder, decoder_module) + builder |> child(child_name(suffix, :vp8_decoder), decoder_module) end - defp maybe_plug_parser_and_decoder(builder, %RawVideo{}), do: builder - - # VK decoder outputs NV12; these clauses handle the NV12 <-> other-format conversion - # when mixing VK decode/encode with different pixel formats. + defp maybe_plug_parser_and_decoder(builder, %RawVideo{}, _suffix), do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{pixel_format: nil}), - do: builder + defp maybe_plug_swscale_converter_vulkan( + builder, + %H264{}, + %RawVideo{pixel_format: nil}, + _suffix + ), + do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{pixel_format: :NV12}), - do: builder + defp maybe_plug_swscale_converter_vulkan( + builder, + %H264{}, + %RawVideo{pixel_format: :NV12}, + _suffix + ), + do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{} = output_format) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: output_format.pixel_format}) + defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %RawVideo{} = output_format, suffix) do + builder + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{ + format: output_format.pixel_format + }) end - defp maybe_plug_swscale_converter_vulkan(builder, %RawVideo{pixel_format: :NV12}, %H264{}), - do: builder + defp maybe_plug_swscale_converter_vulkan( + builder, + %RawVideo{pixel_format: :NV12}, + %H264{}, + _suffix + ), + do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %RawVideo{}, %H264{}) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: :NV12}) + defp maybe_plug_swscale_converter_vulkan(builder, %RawVideo{}, %H264{}, suffix) do + builder |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :NV12}) end - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %H264{}), do: builder + defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, %H264{}, _suffix), do: builder - defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, _output_format) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: :I420}) + defp maybe_plug_swscale_converter_vulkan(builder, %H264{}, _output_format, suffix) do + builder + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :I420}) end - defp maybe_plug_swscale_converter_vulkan(builder, _input_format, %H264{}) do - builder |> child(:raw_video_converter, %SWScale.Converter{format: :NV12}) + defp maybe_plug_swscale_converter_vulkan(builder, _input_format, %H264{}, suffix) do + builder + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :NV12}) end - defp maybe_plug_swscale_converter_vulkan(builder, input_format, output_format), - do: maybe_plug_swscale_converter(builder, input_format, output_format) + defp maybe_plug_swscale_converter_vulkan(builder, input_format, output_format, suffix), + do: maybe_plug_swscale_converter(builder, input_format, output_format, suffix) - defp maybe_plug_swscale_converter(builder, input_format, %RawVideo{} = output_format) do + defp maybe_plug_swscale_converter(builder, input_format, %RawVideo{} = output_format, suffix) do case input_format do _any when output_format.pixel_format == nil -> builder @@ -254,11 +297,14 @@ defmodule Membrane.Transcoder.Video do _input_format -> builder - |> child(:raw_video_converter, %SWScale.Converter{format: output_format.pixel_format}) + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{ + format: output_format.pixel_format + }) end end - defp maybe_plug_swscale_converter(builder, input_format, %h26x{}) when h26x in [H264, H265] do + defp maybe_plug_swscale_converter(builder, input_format, %h26x{}, suffix) + when h26x in [H264, H265] do case input_format do %RawVideo{pixel_format: pixel_format} when pixel_format in [:I420, :I422] -> builder @@ -268,43 +314,43 @@ defmodule Membrane.Transcoder.Video do _input_format -> builder - |> child(:raw_video_converter, %SWScale.Converter{format: :I420}) + |> child(child_name(suffix, :raw_video_converter), %SWScale.Converter{format: :I420}) end end - defp maybe_plug_swscale_converter(builder, _input_format, _output_format), do: builder + defp maybe_plug_swscale_converter(builder, _input_format, _output_format, _suffix), do: builder - defp maybe_plug_encoder_and_parser_vulkan(builder, %H264{} = h264) do + defp maybe_plug_encoder_and_parser_vulkan(builder, %H264{} = h264, suffix) do builder - |> child(:vk_h264_encoder, Membrane.VKVideo.Encoder) - |> child(:h264_output_parser, %H264.Parser{ + |> child(child_name(suffix, :vk_h264_encoder), Membrane.VKVideo.Encoder) + |> child(child_name(suffix, :h264_output_parser), %H264.Parser{ output_stream_structure: stream_structure_type(h264), output_alignment: h264.alignment }) end - defp maybe_plug_encoder_and_parser_vulkan(builder, format), - do: maybe_plug_encoder_and_parser(builder, format) + defp maybe_plug_encoder_and_parser_vulkan(builder, format, suffix), + do: maybe_plug_encoder_and_parser(builder, format, suffix) - defp maybe_plug_encoder_and_parser(builder, %H264{} = h264) do + defp maybe_plug_encoder_and_parser(builder, %H264{} = h264, suffix) do builder - |> child(:h264_encoder, %H264.FFmpeg.Encoder{preset: :ultrafast}) - |> child(:h264_output_parser, %H264.Parser{ + |> child(child_name(suffix, :h264_encoder), %H264.FFmpeg.Encoder{preset: :ultrafast}) + |> child(child_name(suffix, :h264_output_parser), %H264.Parser{ output_stream_structure: stream_structure_type(h264), output_alignment: h264.alignment }) end - defp maybe_plug_encoder_and_parser(builder, %H265{} = h265) do + defp maybe_plug_encoder_and_parser(builder, %H265{} = h265, suffix) do builder - |> child(:h265_encoder, %H265.FFmpeg.Encoder{preset: :ultrafast}) - |> child(:h265_output_parser, %H265.Parser{ + |> child(child_name(suffix, :h265_encoder), %H265.FFmpeg.Encoder{preset: :ultrafast}) + |> child(child_name(suffix, :h265_output_parser), %H265.Parser{ output_stream_structure: stream_structure_type(h265), output_alignment: h265.alignment }) end - defp maybe_plug_encoder_and_parser(builder, %VP8{}) do + defp maybe_plug_encoder_and_parser(builder, %VP8{}, suffix) do cpu_quota = :erlang.system_info(:cpu_quota) number_of_threads = @@ -312,10 +358,14 @@ defmodule Membrane.Transcoder.Video do do: cpu_quota, else: :erlang.system_info(:logical_processors_available) - builder |> child(:vp8_encoder, %VP8.Encoder{g_threads: number_of_threads, cpu_used: 15}) + builder + |> child(child_name(suffix, :vp8_encoder), %VP8.Encoder{ + g_threads: number_of_threads, + cpu_used: 15 + }) end - defp maybe_plug_encoder_and_parser(builder, %VP9{}) do + defp maybe_plug_encoder_and_parser(builder, %VP9{}, suffix) do cpu_quota = :erlang.system_info(:cpu_quota) number_of_threads = @@ -323,10 +373,14 @@ defmodule Membrane.Transcoder.Video do do: cpu_quota, else: :erlang.system_info(:logical_processors_available) - builder |> child(:vp8_encoder, %VP9.Encoder{g_threads: number_of_threads, cpu_used: 15}) + builder + |> child(child_name(suffix, :vp8_encoder), %VP9.Encoder{ + g_threads: number_of_threads, + cpu_used: 15 + }) end - defp maybe_plug_encoder_and_parser(builder, %RawVideo{}), do: builder + defp maybe_plug_encoder_and_parser(builder, %RawVideo{}, _suffix), do: builder defp stream_structure_type(%h26x{stream_structure: stream_structure}) when h26x in [H264, H265] do @@ -335,4 +389,7 @@ defmodule Membrane.Transcoder.Video do {type, _dcr} when type in [:avc1, :avc3, :hvc1, :hev1] -> type end end + + defp child_name(nil, base), do: base + defp child_name(suffix, base), do: :"#{suffix}_#{base}" end diff --git a/mix.exs b/mix.exs index 79227d8..e5b1ffc 100644 --- a/mix.exs +++ b/mix.exs @@ -39,6 +39,7 @@ defmodule Membrane.Transcoder.Plugin.Mixfile do [ {:membrane_vk_video_plugin, "~> 0.2.0", optional: true}, {:membrane_core, "~> 1.2 and >= 1.2.1"}, + {:membrane_tee_plugin, "~> 0.12.0"}, {:membrane_opus_plugin, "~> 0.20.3"}, {:membrane_aac_plugin, "~> 0.19.0"}, {:membrane_aac_fdk_plugin, "~> 0.18.0"}, diff --git a/mix.lock b/mix.lock index 2941492..0916b6b 100644 --- a/mix.lock +++ b/mix.lock @@ -44,6 +44,7 @@ "membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"}, "membrane_raw_audio_parser_plugin": {:hex, :membrane_raw_audio_parser_plugin, "0.4.0", "7a1e53b68a221d00e47fb5d3c7e29200dfe8f7bc0862e69000b61c6562093acc", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}], "hexpm", "ff8d3fba45b1c2814b68d49878f19d2c1ad1147b53f606b48b6b67068435dcd0"}, "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.4.3", "61b2f6afdffa43c25de5a433c3a3bed933144be0f753b6fc8c6a9f255382eaff", [:mix], [{:image, ">= 0.54.0", [hex: :image, repo: "hexpm", optional: true]}], "hexpm", "11739a7d956d037f3ee109f06f075f1a99fea000c778628ac58ed28637e4c637"}, + "membrane_tee_plugin": {:hex, :membrane_tee_plugin, "0.12.0", "f94989b4080ef4b7937d74c1a14d3379577c7bd4c6d06e5a2bb41c351ad604d4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "0d61c9ed5e68e5a75d54200e1c6df5739c0bcb52fee0974183ad72446a179887"}, "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, "membrane_vk_video_plugin": {:hex, :membrane_vk_video_plugin, "0.2.1", "7304ae294ca44279e8dcc5e3206ed31157467e9d65b12cb7e4db246fb1d58dd4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.4.2", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:rustler, "~> 0.37.1", [hex: :rustler, repo: "hexpm", optional: false]}], "hexpm", "19bdae7d8bbcdc110d4aa991d5cb7154a743a985a30842cdc24012b402258682"}, "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.5.0", "a589c20bb9d97ddc9b717684d00cefc84e2500ce63a0c33c4b9618d9b2f9b2ea", [:mix], [], "hexpm", "d29e0dae4bebc6838e82e031c181fe626d168c687e4bc617c1d0772bdeed19d5"}, diff --git a/test/integration_test.exs b/test/integration_test.exs index 967e17b..65ed8a8 100644 --- a/test/integration_test.exs +++ b/test/integration_test.exs @@ -3,6 +3,8 @@ defmodule Membrane.Transcoder.IntegrationTest do import Membrane.Testing.Assertions import Membrane.ChildrenSpec + require Membrane.Pad + alias Membrane.{AAC, H264, H265, MPEGAudio, Opus, RawAudio, RawVideo, VP8, VP9} alias Membrane.Testing alias Membrane.Transcoder.Support.Preprocessors @@ -59,10 +61,11 @@ defmodule Membrane.Transcoder.IntegrationTest do location: Path.join("./test/fixtures", unquote(test_case.input_file)) }) |> then(unquote(test_case.preprocess)) - |> child(%Membrane.Transcoder{ + |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: unquote(test_case.output_format), assumed_input_stream_format: override_input_stream_format }) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) Testing.Pipeline.execute_actions(pid, spec: spec) @@ -115,6 +118,27 @@ defmodule Membrane.Transcoder.IntegrationTest do output_stream_format: test_case.output_format, native_acceleration: native_acceleration }) + |> via_out(Membrane.Pad.ref(:output, 0)) + |> child(:sink, %Membrane.File.Sink{location: tmp_path}) + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink, :input, 30_000) + Testing.Pipeline.terminate(pid) + + bytes = File.read!(tmp_path) + File.rm(tmp_path) + bytes + end + + defp transcode_to_bytes(input_file, preprocess, output_format) do + tmp_path = Path.join(System.tmp_dir!(), "ref_#{:erlang.unique_integer([:positive])}") + pid = Testing.Pipeline.start_link_supervised!() + + spec = + child(%Membrane.File.Source{location: input_file}) + |> then(preprocess) + |> child(:transcoder, %Membrane.Transcoder{output_stream_format: output_format}) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, %Membrane.File.Sink{location: tmp_path}) Testing.Pipeline.execute_actions(pid, spec: spec) @@ -173,13 +197,16 @@ defmodule Membrane.Transcoder.IntegrationTest do %AAC{} -> format end - spec = + spec = [ child(:source, %FormatSource{format: format}) |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: output_format, transcoding_policy: transcoding_policy - }) + }), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) + ] pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) @@ -189,8 +216,12 @@ defmodule Membrane.Transcoder.IntegrationTest do %H264{} -> [:h264_encoder, :h264_decoder] %AAC{} -> [:aac_encoder, :aac_decoder] end - |> Enum.each(fn child_name -> - get_child_result = Testing.Pipeline.get_child_pid(pipeline, [:transcoder, child_name]) + |> Enum.each(fn base_name -> + get_child_result = + Testing.Pipeline.get_child_pid( + pipeline, + [:transcoder, :"output_0_#{base_name}"] + ) if transcoding_policy == :always do assert {:ok, child_pid} = get_child_result @@ -211,6 +242,7 @@ defmodule Membrane.Transcoder.IntegrationTest do output_stream_format: VP8, transcoding_policy: :never }) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) {:ok, supervisor, pipeline} = Testing.Pipeline.start(spec: []) @@ -228,27 +260,220 @@ defmodule Membrane.Transcoder.IntegrationTest do test "uses FFmpeg decoder and encoder when native_acceleration is :never" do pid = Testing.Pipeline.start_link_supervised!() - spec = + spec = [ child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) |> then(&Preprocessors.parse_h264/1) |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: H264, transcoding_policy: :always, native_acceleration: :never - }) + }), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) + ] Testing.Pipeline.execute_actions(pid, spec: spec) assert_sink_stream_format(pid, :sink, _format) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_decoder]) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_encoder]) + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + + assert {:error, :child_not_found} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_decoder]) + + assert {:error, :child_not_found} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_encoder]) + + Testing.Pipeline.terminate(pid) + end + + test "multivariant output: two outputs with different formats from H264 input" do + ref_h264 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264) + ref_h265 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H265) + + pid = Testing.Pipeline.start_link_supervised!() + tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end + h264_tmp = tmp.() + h265_tmp = tmp.() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) + |> then(&Preprocessors.parse_h264/1) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), options: [output_stream_format: H264]) + |> child(:sink_h264, %Membrane.File.Sink{location: h264_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: H265]) + |> child(:sink_h265, %Membrane.File.Sink{location: h265_tmp}) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink_h264, :input, 30_000) + assert_end_of_stream(pid, :sink_h265, :input, 30_000) + Testing.Pipeline.terminate(pid) + + mv_h264 = File.read!(h264_tmp) + mv_h265 = File.read!(h265_tmp) + File.rm(h264_tmp) + File.rm(h265_tmp) + + assert mv_h264 == ref_h264 + assert mv_h265 == ref_h265 + end + + test "multivariant output: two video outputs with different resolutions" do + ref_h264 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264) + ref_vp8 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, VP8) + + pid = Testing.Pipeline.start_link_supervised!() + tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end + h264_tmp = tmp.() + vp8_tmp = tmp.() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) + |> then(&Preprocessors.parse_h264/1) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), options: [output_stream_format: H264]) + |> child(:sink_h264, %Membrane.File.Sink{location: h264_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: VP8]) + |> child(:sink_vp8, %Membrane.File.Sink{location: vp8_tmp}) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink_h264, :input, 30_000) + assert_end_of_stream(pid, :sink_vp8, :input, 30_000) + Testing.Pipeline.terminate(pid) + + mv_h264 = File.read!(h264_tmp) + mv_vp8 = File.read!(vp8_tmp) + File.rm(h264_tmp) + File.rm(vp8_tmp) + + assert mv_h264 == ref_h264 + assert mv_vp8 == ref_vp8 + end + + test "multivariant output: per-output transcoding_policy is respected" do + pid = Testing.Pipeline.start_link_supervised!() + + spec = [ + child(:source, %FormatSource{format: %H264{alignment: :au, stream_structure: :annexb}}) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), + options: [ + output_stream_format: %H264{alignment: :au, stream_structure: :avc1}, + transcoding_policy: :always + ] + ) + |> child(:sink_always, Testing.Sink), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), + options: [ + output_stream_format: %H264{alignment: :au, stream_structure: :avc1}, + transcoding_policy: :if_needed + ] + ) + |> child(:sink_if_needed, Testing.Sink) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + + Process.sleep(500) + + # :always output should have encoder/decoder with "output_0_" prefix + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + + # :if_needed output should NOT have encoder/decoder (same format type) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_1_h264_decoder]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_1_h264_encoder]) + + Testing.Pipeline.terminate(pid) + end + + test "multivariant output: three audio outputs with different formats" do + ref_aac = transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, AAC) + ref_opus = transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, Opus) + + ref_mp3 = + transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, MPEGAudio) + + pid = Testing.Pipeline.start_link_supervised!() + tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end + aac_tmp = tmp.() + opus_tmp = tmp.() + mp3_tmp = tmp.() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/audio.aac"}) + |> then(&Preprocessors.parse_aac/1) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), options: [output_stream_format: AAC]) + |> child(:sink_aac, %Membrane.File.Sink{location: aac_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: Opus]) + |> child(:sink_opus, %Membrane.File.Sink{location: opus_tmp}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 2), options: [output_stream_format: MPEGAudio]) + |> child(:sink_mp3, %Membrane.File.Sink{location: mp3_tmp}) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + assert_end_of_stream(pid, :sink_aac, :input, 30_000) + assert_end_of_stream(pid, :sink_opus, :input, 30_000) + assert_end_of_stream(pid, :sink_mp3, :input, 30_000) + Testing.Pipeline.terminate(pid) + + mv_aac = File.read!(aac_tmp) + mv_opus = File.read!(opus_tmp) + mv_mp3 = File.read!(mp3_tmp) + File.rm(aac_tmp) + File.rm(opus_tmp) + File.rm(mp3_tmp) + + assert mv_aac == ref_aac + assert mv_opus == ref_opus + assert mv_mp3 == ref_mp3 + end + + test "multivariant output: per-output options override bin-level options" do + pid = Testing.Pipeline.start_link_supervised!() + + spec = [ + child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) + |> then(&Preprocessors.parse_h264/1) + |> child(:transcoder, %Membrane.Transcoder{output_stream_format: H264}), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) + |> child(:sink_default, Testing.Sink), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 1), options: [output_stream_format: H265]) + |> child(:sink_override, Testing.Sink) + ] + + Testing.Pipeline.execute_actions(pid, spec: spec) + + assert_sink_stream_format(pid, :sink_default, format_default, 10_000) + assert format_default.__struct__ == H264 + + assert_sink_stream_format(pid, :sink_override, format_override, 10_000) + assert format_override.__struct__ == H265 Testing.Pipeline.terminate(pid) end @@ -257,27 +482,33 @@ defmodule Membrane.Transcoder.IntegrationTest do test "uses VKVideo decoder and encoder when native_acceleration is :if_available" do pid = Testing.Pipeline.start_link_supervised!() - spec = + spec = [ child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) |> then(&Preprocessors.parse_h264/1) |> child(:transcoder, %Membrane.Transcoder{ output_stream_format: H264, transcoding_policy: :always, native_acceleration: :if_available - }) + }), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0)) |> child(:sink, Testing.Sink) + ] Testing.Pipeline.execute_actions(pid, spec: spec) assert_sink_stream_format(pid, :sink, _format) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_decoder]) - assert {:ok, _pid} = Testing.Pipeline.get_child_pid(pid, [:transcoder, :vk_h264_encoder]) + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_decoder]) + + assert {:ok, _pid} = + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_encoder]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) Testing.Pipeline.terminate(pid) end From be2620ab72be562c6fc45797b98deb629696de17 Mon Sep 17 00:00:00 2001 From: Piotr Szeremeta Date: Tue, 26 May 2026 08:29:24 +0200 Subject: [PATCH 2/5] Apply review suggestions - use strings as children names - update examples --- examples/vp8_to_h264.exs | 25 ++++++++++++++++--------- lib/transcoder.ex | 8 ++++---- lib/transcoder/audio.ex | 2 +- lib/transcoder/video.ex | 2 +- test/integration_test.exs | 26 +++++++++++++------------- 5 files changed, 35 insertions(+), 28 deletions(-) diff --git a/examples/vp8_to_h264.exs b/examples/vp8_to_h264.exs index a7eb208..9250562 100644 --- a/examples/vp8_to_h264.exs +++ b/examples/vp8_to_h264.exs @@ -20,6 +20,7 @@ Mix.install( defmodule Example do alias Membrane.{H264, RCPipeline} require RCPipeline + require Membrane.Pad import Membrane.ChildrenSpec @@ -27,15 +28,21 @@ defmodule Example do pipeline = RCPipeline.start_link!() spec = - child(%Membrane.File.Source{ - location: input_file - }) - |> child(:deserializer, Membrane.IVF.Deserializer) - |> child(:transcoder, %Membrane.Transcoder{ - output_stream_format: H264, - native_acceleration: native_acceleration - }) - |> child(:sink, %Membrane.File.Sink{location: output_file}) + [ + child(%Membrane.File.Source{ + location: input_file + }) + |> child(:deserializer, Membrane.IVF.Deserializer) + |> child(:transcoder, Membrane.Transcoder), + get_child(:transcoder) + |> via_out(Membrane.Pad.ref(:output, 0), + options: [ + output_stream_format: H264, + native_acceleration: native_acceleration + ] + ) + |> child(:sink, %Membrane.File.Sink{location: output_file}) + ] RCPipeline.subscribe(pipeline, _any) RCPipeline.exec_actions(pipeline, spec: spec) diff --git a/lib/transcoder.ex b/lib/transcoder.ex index 9a1eb0a..88c2700 100644 --- a/lib/transcoder.ex +++ b/lib/transcoder.ex @@ -34,11 +34,11 @@ defmodule Membrane.Transcoder do child(:transcoder, Membrane.Transcoder), get_child(:transcoder) - |> via_out(Pad.ref(:output, 0), options: [output_stream_format: H264, width: 1280, height: 720]) - |> child(:hd_sink, Membrane.File.Sink), + |> via_out(Pad.ref(:output, 0), options: [output_stream_format: H264, transcoding_policy: :if_needed]) + |> child(:h264_sink, Membrane.File.Sink), get_child(:transcoder) - |> via_out(Pad.ref(:output, 1), options: [output_stream_format: H264, width: 640, height: 360]) - |> child(:sd_sink, Membrane.File.Sink) + |> via_out(Pad.ref(:output, 1), options: [output_stream_format: H265, transcoding_policy: :always]) + |> child(:h265_sink, Membrane.File.Sink) """ use Membrane.Bin diff --git a/lib/transcoder/audio.ex b/lib/transcoder/audio.ex index e33993d..2e34b41 100644 --- a/lib/transcoder/audio.ex +++ b/lib/transcoder/audio.ex @@ -210,5 +210,5 @@ defmodule Membrane.Transcoder.Audio do end defp child_name(nil, base), do: base - defp child_name(suffix, base), do: :"#{suffix}_#{base}" + defp child_name(suffix, base), do: {suffix, base} end diff --git a/lib/transcoder/video.ex b/lib/transcoder/video.ex index 2354675..833e4e9 100644 --- a/lib/transcoder/video.ex +++ b/lib/transcoder/video.ex @@ -391,5 +391,5 @@ defmodule Membrane.Transcoder.Video do end defp child_name(nil, base), do: base - defp child_name(suffix, base), do: :"#{suffix}_#{base}" + defp child_name(suffix, base), do: {suffix, base} end diff --git a/test/integration_test.exs b/test/integration_test.exs index 65ed8a8..c230b5d 100644 --- a/test/integration_test.exs +++ b/test/integration_test.exs @@ -220,7 +220,7 @@ defmodule Membrane.Transcoder.IntegrationTest do get_child_result = Testing.Pipeline.get_child_pid( pipeline, - [:transcoder, :"output_0_#{base_name}"] + [:transcoder, {"output_0", base_name}] ) if transcoding_policy == :always do @@ -277,16 +277,16 @@ defmodule Membrane.Transcoder.IntegrationTest do assert_sink_stream_format(pid, :sink, _format) assert {:ok, _pid} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :h264_decoder}]) assert {:ok, _pid} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :h264_encoder}]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :vk_h264_decoder}]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :vk_h264_encoder}]) Testing.Pipeline.terminate(pid) end @@ -391,17 +391,17 @@ defmodule Membrane.Transcoder.IntegrationTest do # :always output should have encoder/decoder with "output_0_" prefix assert {:ok, _pid} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :h264_decoder}]) assert {:ok, _pid} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :h264_encoder}]) # :if_needed output should NOT have encoder/decoder (same format type) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_1_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_1", :h264_decoder}]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_1_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_1", :h264_encoder}]) Testing.Pipeline.terminate(pid) end @@ -499,16 +499,16 @@ defmodule Membrane.Transcoder.IntegrationTest do assert_sink_stream_format(pid, :sink, _format) assert {:ok, _pid} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :vk_h264_decoder}]) assert {:ok, _pid} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_vk_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :vk_h264_encoder}]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_decoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :h264_decoder}]) assert {:error, :child_not_found} = - Testing.Pipeline.get_child_pid(pid, [:transcoder, :output_0_h264_encoder]) + Testing.Pipeline.get_child_pid(pid, [:transcoder, {"output_0", :h264_encoder}]) Testing.Pipeline.terminate(pid) end From 948a20005a2e0e561cecd497691af3a97fc8e5ec Mon Sep 17 00:00:00 2001 From: Piotr Szeremeta Date: Tue, 26 May 2026 11:42:58 +0200 Subject: [PATCH 3/5] Handle cpu quota for cgroups v2 in docker container --- lib/transcoder/video.ex | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/lib/transcoder/video.ex b/lib/transcoder/video.ex index 833e4e9..55651a6 100644 --- a/lib/transcoder/video.ex +++ b/lib/transcoder/video.ex @@ -351,31 +351,17 @@ defmodule Membrane.Transcoder.Video do end defp maybe_plug_encoder_and_parser(builder, %VP8{}, suffix) do - cpu_quota = :erlang.system_info(:cpu_quota) - - number_of_threads = - if cpu_quota != :unknown, - do: cpu_quota, - else: :erlang.system_info(:logical_processors_available) - builder |> child(child_name(suffix, :vp8_encoder), %VP8.Encoder{ - g_threads: number_of_threads, + g_threads: cpu_count(), cpu_used: 15 }) end defp maybe_plug_encoder_and_parser(builder, %VP9{}, suffix) do - cpu_quota = :erlang.system_info(:cpu_quota) - - number_of_threads = - if cpu_quota != :unknown, - do: cpu_quota, - else: :erlang.system_info(:logical_processors_available) - builder - |> child(child_name(suffix, :vp8_encoder), %VP9.Encoder{ - g_threads: number_of_threads, + |> child(child_name(suffix, :vp9_encoder), %VP9.Encoder{ + g_threads: cpu_count(), cpu_used: 15 }) end @@ -390,6 +376,20 @@ defmodule Membrane.Transcoder.Video do end end + defp cpu_count do + cpu_quota = :erlang.system_info(:cpu_quota) + + if cpu_quota != :unknown do + cpu_quota + else + try do + :erlang.system_info(:logical_processors_online) + rescue + _ -> :erlang.system_info(:logical_processors_available) + end + end + end + defp child_name(nil, base), do: base defp child_name(suffix, base), do: {suffix, base} end From 889c05146655ffa85b3332ea28cdf2a9a035ec99 Mon Sep 17 00:00:00 2001 From: Piotr Szeremeta Date: Tue, 26 May 2026 16:29:11 +0200 Subject: [PATCH 4/5] Use @tag :tmp_dir in integration tests --- test/integration_test.exs | 75 +++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/test/integration_test.exs b/test/integration_test.exs index c230b5d..d866509 100644 --- a/test/integration_test.exs +++ b/test/integration_test.exs @@ -89,23 +89,24 @@ defmodule Membrane.Transcoder.IntegrationTest do Enum.map(@vk_video_cases, fn test_case -> @tag :vulkan - test "transcoder produces stable output for #{inspect(test_case.input_format)} -> H264 with native acceleration" do + @tag :tmp_dir + test "transcoder produces stable output for #{inspect(test_case.input_format)} -> H264 with native acceleration", + %{tmp_dir: tmp_dir} do fixture_path = Path.join( @vk_fixtures_dir, "#{unquote(test_case.input_format) |> Module.split() |> List.last() |> String.downcase()}_to_h264.h264" ) - actual = run_transcoder_to_file(unquote(Macro.escape(test_case)), :if_available) + actual = run_transcoder_to_file(unquote(Macro.escape(test_case)), :if_available, tmp_dir) assert byte_size(actual) > 0, "transcoder produced empty output" assert_or_regenerate_fixture!(actual, fixture_path) end end) - defp run_transcoder_to_file(test_case, native_acceleration) do - tmp_path = - Path.join(System.tmp_dir!(), "vk_transcoder_#{:erlang.unique_integer([:positive])}") + defp run_transcoder_to_file(test_case, native_acceleration, tmp_dir) do + tmp_path = tmp_path(tmp_dir, "vk_transcoder") pid = Testing.Pipeline.start_link_supervised!() @@ -130,8 +131,8 @@ defmodule Membrane.Transcoder.IntegrationTest do bytes end - defp transcode_to_bytes(input_file, preprocess, output_format) do - tmp_path = Path.join(System.tmp_dir!(), "ref_#{:erlang.unique_integer([:positive])}") + defp transcode_to_bytes(input_file, preprocess, output_format, tmp_dir) do + tmp_path = tmp_path(tmp_dir, "ref") pid = Testing.Pipeline.start_link_supervised!() spec = @@ -150,6 +151,10 @@ defmodule Membrane.Transcoder.IntegrationTest do bytes end + defp tmp_path(tmp_dir, prefix) do + Path.join(tmp_dir, "#{prefix}_#{:erlang.unique_integer([:positive])}") + end + defp assert_or_regenerate_fixture!(actual, fixture_path) do if System.get_env("REGEN_VK_FIXTURES") == "1" do File.mkdir_p!(Path.dirname(fixture_path)) @@ -291,14 +296,19 @@ defmodule Membrane.Transcoder.IntegrationTest do Testing.Pipeline.terminate(pid) end - test "multivariant output: two outputs with different formats from H264 input" do - ref_h264 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264) - ref_h265 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H265) + @tag :tmp_dir + test "multivariant output: two outputs with different formats from H264 input", %{ + tmp_dir: tmp_dir + } do + ref_h264 = + transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264, tmp_dir) + + ref_h265 = + transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H265, tmp_dir) pid = Testing.Pipeline.start_link_supervised!() - tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end - h264_tmp = tmp.() - h265_tmp = tmp.() + h264_tmp = tmp_path(tmp_dir, "mv") + h265_tmp = tmp_path(tmp_dir, "mv") spec = [ child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) @@ -326,14 +336,17 @@ defmodule Membrane.Transcoder.IntegrationTest do assert mv_h265 == ref_h265 end - test "multivariant output: two video outputs with different resolutions" do - ref_h264 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264) - ref_vp8 = transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, VP8) + @tag :tmp_dir + test "multivariant output: two video outputs with different resolutions", %{tmp_dir: tmp_dir} do + ref_h264 = + transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, H264, tmp_dir) + + ref_vp8 = + transcode_to_bytes("./test/fixtures/video.h264", &Preprocessors.parse_h264/1, VP8, tmp_dir) pid = Testing.Pipeline.start_link_supervised!() - tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end - h264_tmp = tmp.() - vp8_tmp = tmp.() + h264_tmp = tmp_path(tmp_dir, "mv") + vp8_tmp = tmp_path(tmp_dir, "mv") spec = [ child(%Membrane.File.Source{location: "./test/fixtures/video.h264"}) @@ -406,18 +419,26 @@ defmodule Membrane.Transcoder.IntegrationTest do Testing.Pipeline.terminate(pid) end - test "multivariant output: three audio outputs with different formats" do - ref_aac = transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, AAC) - ref_opus = transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, Opus) + @tag :tmp_dir + test "multivariant output: three audio outputs with different formats", %{tmp_dir: tmp_dir} do + ref_aac = + transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, AAC, tmp_dir) + + ref_opus = + transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, Opus, tmp_dir) ref_mp3 = - transcode_to_bytes("./test/fixtures/audio.aac", &Preprocessors.parse_aac/1, MPEGAudio) + transcode_to_bytes( + "./test/fixtures/audio.aac", + &Preprocessors.parse_aac/1, + MPEGAudio, + tmp_dir + ) pid = Testing.Pipeline.start_link_supervised!() - tmp = fn -> Path.join(System.tmp_dir!(), "mv_#{:erlang.unique_integer([:positive])}") end - aac_tmp = tmp.() - opus_tmp = tmp.() - mp3_tmp = tmp.() + aac_tmp = tmp_path(tmp_dir, "mv") + opus_tmp = tmp_path(tmp_dir, "mv") + mp3_tmp = tmp_path(tmp_dir, "mv") spec = [ child(%Membrane.File.Source{location: "./test/fixtures/audio.aac"}) From 76392e67fadf6052f8db2ef37f41999694120930 Mon Sep 17 00:00:00 2001 From: Piotr Szeremeta Date: Tue, 26 May 2026 16:36:09 +0200 Subject: [PATCH 5/5] fix credo --- lib/transcoder/video.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/transcoder/video.ex b/lib/transcoder/video.ex index 55651a6..e651a0c 100644 --- a/lib/transcoder/video.ex +++ b/lib/transcoder/video.ex @@ -376,7 +376,7 @@ defmodule Membrane.Transcoder.Video do end end - defp cpu_count do + defp cpu_count() do cpu_quota = :erlang.system_info(:cpu_quota) if cpu_quota != :unknown do @@ -385,7 +385,7 @@ defmodule Membrane.Transcoder.Video do try do :erlang.system_info(:logical_processors_online) rescue - _ -> :erlang.system_info(:logical_processors_available) + _cpu_quota -> :erlang.system_info(:logical_processors_available) end end end