# lib/membrane_element_udp/source.ex

defmodule Membrane.Element.UDP.Source do
  @moduledoc """
  Element that reads packets from a UDP socket and sends their payloads through the output pad.

  Every `{:udp, socket, address, port, payload}` message handled while the element is
  playing is turned into a `Membrane.Buffer` and pushed through the `:output` pad.
  The buffer metadata carries:

    * `:udp_source_address` - the address taken from the UDP message,
    * `:udp_source_port` - the port number taken from the UDP message,
    * `:arrival_ts` - the value of `Membrane.Time.vm_time/0` read when the message was handled.

  UDP messages handled in any other playback state are discarded.
  """
  use Membrane.Source

  alias Membrane.{Buffer, RemoteStream}
  alias Membrane.Element.UDP.{CommonSocketBehaviour, Socket}

  def_options local_port_no: [
                type: :integer,
                spec: pos_integer,
                default: 5000,
                description: "A UDP port number used when opening a receiving socket."
              ],
              local_address: [
                type: :ip_address,
                spec: :inet.socket_address(),
                default: :any,
                description: """
                An IP Address on which the socket will listen. It allows to choose which
                network interface to use if there's more than one.
                """
              ],
              recv_buffer_size: [
                type: :integer,
                spec: pos_integer,
                default: 16_384,
                description: """
                Size of the receive buffer. Packages of size greater than this buffer will be truncated
                """
              ]

  def_output_pad :output, caps: {RemoteStream, type: :packetized}, mode: :push

  # Translates the element options into a `Socket` struct stored in the state under
  # `:local_socket`. This callback only builds the struct; the receive buffer size is
  # passed along as the `:recbuf` socket option.
  @impl true
  def handle_init(%__MODULE__{
        local_address: ip_address,
        local_port_no: port_no,
        recv_buffer_size: recbuf
      }) do
    local_socket = %Socket{
      ip_address: ip_address,
      port_no: port_no,
      sock_opts: [recbuf: recbuf]
    }

    {:ok, %{local_socket: local_socket}}
  end

  # Announces the stream format on the output pad when the element starts playing.
  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    caps = %RemoteStream{type: :packetized}

    {{:ok, caps: {:output, caps}}, state}
  end

  # A UDP message handled while playing: wrap the payload in a buffer, attach the
  # sender details and the arrival timestamp as metadata, and push it to `:output`.
  @impl true
  def handle_other(
        {:udp, _socket_handle, address, port_no, payload},
        %{playback_state: :playing},
        state
      ) do
    buffer = %Buffer{
      payload: payload,
      metadata: %{
        udp_source_address: address,
        udp_source_port: port_no,
        arrival_ts: Membrane.Time.vm_time()
      }
    }

    {{:ok, buffer: {:output, buffer}}, state}
  end

  # A UDP message handled in any other playback state is dropped without any action.
  @impl true
  def handle_other({:udp, _socket_handle, _address, _port_no, _payload}, _ctx, state),
    do: {:ok, state}

  # Both transitions between the stopped and prepared states are handled by
  # `CommonSocketBehaviour` (presumably opening and closing the socket held in
  # `state.local_socket` - confirm against that module).
  @impl true
  defdelegate handle_stopped_to_prepared(context, state), to: CommonSocketBehaviour

  @impl true
  defdelegate handle_prepared_to_stopped(context, state), to: CommonSocketBehaviour
end