lib/membrane/clock.ex

defmodule Membrane.Clock do
  @moduledoc """
  Clock is a Membrane utility that allows elements to measure time according to
  a particular clock, which can be e.g. a soundcard hardware clock.

  Internally, Clock is a GenServer process that can receive _updates_ (see `t:update_message/0`),
  which are messages containing amount of time until the next update.
  For example, a sink playing audio to the sound card can send an update before
  each write to the sound card buffer (for practical reasons that can be done every
  100 or 1000 writes). Although it might be more intuitive to send updates with
  the time passed, in practice the described approach turns out to be more convenient,
  as it simplifies the first update.

  Basing on updates, Clock calculates the `t:ratio/0` of its time to the reference
  time. The reference time can be configured with `:time_provider` option. The ratio
  is broadcasted (see `t:ratio_message/0`) to _subscribers_ (see `subscribe/2`)
  - processes willing to synchronize to the custom clock. Subscribers can adjust
  their timers according to received ratio - timers started with
  `t:Membrane.Element.Action.start_timer/0` action in elements do it automatically.
  Initial ratio is equal to 1, which means that if no updates are received,
  Clock is synchronized to the reference time.

  ## Proxy mode
  Clock can work in _proxy_ mode, which means it cannot receive updates, but
  it receives ratio from another clock instead, and forwards it to subscribers.
  Proxy mode is enabled with `proxy_for: pid` or `proxy: true` (no initial proxy)
  option, and the proxy is set/changed using `proxy_for/2`.
  """
  use Bunch
  use GenServer

  alias Membrane.Core.Message
  alias Membrane.Time

  require Membrane.Core.Utils, as: Utils

  @typedoc @moduledoc
  @type t :: pid

  @typedoc """
  Ratio of the Clock time to the reference time.
  """
  @type ratio :: Ratio.t()

  @typedoc """
  Update message received by the Clock. It should contain the time till the next
  update.
  """
  @type update_message ::
          {:membrane_clock_update,
           milliseconds ::
             non_neg_integer
             | ratio
             | {numerator :: non_neg_integer, denominator :: pos_integer()}}

  @typedoc """
  Ratio message sent by the Clock to all its subscribers. It contains the ratio
  of the custom clock time to the reference time.
  """
  @type ratio_message :: {:membrane_clock_ratio, clock :: pid, ratio}

  @typedoc """
  Options accepted by `start_link/2` and `start/2` functions.

  They are the following:
    - `time_provider` - function providing the reference time in milliseconds
    - `proxy` - determines whether the Clock should work in proxy mode
    - `proxy_for` - enables the proxy mode and sets proxied Clock to pid

  Check the moduledoc for more details.
  """
  @type option ::
          {:time_provider, (-> Time.t())}
          | {:proxy, boolean}
          | {:proxy_for, pid | nil}

  @spec start_link([option], GenServer.options()) :: GenServer.on_start()
  def start_link(options \\ [], gen_server_options \\ []) do
    GenServer.start_link(__MODULE__, options, gen_server_options)
  end

  @spec start([option], GenServer.options()) :: GenServer.on_start()
  def start(options \\ [], gen_server_options \\ []) do
    GenServer.start(__MODULE__, options, gen_server_options)
  end

  @doc """
  Subscribes `pid` for receiving `t:ratio_message/0` messages from the clock.

  This function can be called multiple times from the same process. To unsubscribe,
  `unsubscribe/2` should be called the same amount of times. The subscribed pid
  always receives one message, regardless of how many times it called `subscribe/2`.
  """
  @spec subscribe(t, subscriber :: pid) :: :ok
  def subscribe(clock, pid \\ self()) do
    GenServer.cast(clock, {:clock_subscribe, pid})
  end

  @doc """
  Unsubscribes `pid` from receiving `t:ratio_message/0` messages from the clock.

  For unsubscription to take effect, `unsubscribe/2` should be called the same
  amount of times as `subscribe/2`.
  """
  @spec unsubscribe(t, subscriber :: pid) :: :ok
  def unsubscribe(clock, pid \\ self()) do
    GenServer.cast(clock, {:clock_unsubscribe, pid})
  end

  @doc """
  Sets a new proxy clock to `clock_to_proxy_for`.
  """
  @spec proxy_for(t, clock_to_proxy_for :: pid | nil) :: :ok
  def proxy_for(clock, clock_to_proxy_for) do
    GenServer.cast(clock, {:proxy_for, clock_to_proxy_for})
  end

  @impl GenServer
  def init(options) do
    Utils.log_on_error do
      do_init(options)
    end
  end

  defp do_init(options) do
    proxy_opts = get_proxy_options(options[:proxy], options[:proxy_for])

    state =
      %{
        ratio: Ratio.new(1),
        subscribers: %{},
        time_provider: options |> Keyword.get(:time_provider, fn -> Time.monotonic_time() end)
      }
      |> Map.merge(proxy_opts)

    if pid = proxy_opts[:proxy_for], do: Message.send(pid, :clock_subscribe, self())

    {:ok, state}
  end

  @impl GenServer
  def handle_cast(request, state) do
    Utils.log_on_error do
      do_handle_cast(request, state)
    end
  end

  defp do_handle_cast({:proxy_for, proxy_for}, %{proxy: true} = state) do
    if state.proxy_for, do: unsubscribe(state.proxy_for)

    state = %{state | proxy_for: proxy_for}

    state =
      if proxy_for do
        subscribe(proxy_for)
        state
      else
        broadcast_and_update_ratio(Ratio.new(1), state)
      end

    {:noreply, state}
  end

  defp do_handle_cast({:clock_subscribe, pid}, state) do
    state
    |> update_in([:subscribers, pid], fn
      nil ->
        send_ratio(pid, state.ratio)
        monitor = Process.monitor(pid)
        %{monitor: monitor, subscriptions: 1}

      %{subscriptions: subs} = subscriber ->
        %{subscriber | subscriptions: subs + 1}
    end)
    ~> {:noreply, &1}
  end

  defp do_handle_cast({:clock_unsubscribe, pid}, state) do
    if Map.has_key?(state.subscribers, pid) do
      {subs, state} =
        state |> Bunch.Access.get_updated_in([:subscribers, pid, :subscriptions], &(&1 - 1))

      if subs == 0, do: handle_unsubscribe(pid, state), else: state
    else
      state
    end
    ~> {:noreply, &1}
  end

  @impl true
  def handle_info(msg, state) do
    Utils.log_on_error do
      do_handle_info(msg, state)
    end
  end

  defp do_handle_info({:membrane_clock_update, till_next}, %{proxy: false} = state) do
    {:noreply, handle_clock_update(till_next, state)}
  end

  defp do_handle_info({:membrane_clock_ratio, pid, ratio}, %{proxy: true, proxy_for: pid} = state) do
    {:noreply, broadcast_and_update_ratio(ratio, state)}
  end

  # When ratio from previously proxied clock comes in after unsubscribing
  defp do_handle_info({:membrane_clock_ratio, _pid, _ratio}, %{proxy: true} = state) do
    {:noreply, state}
  end

  defp do_handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    {:noreply, handle_unsubscribe(pid, state)}
  end

  defp get_proxy_options(proxy, proxy_for)
  defp get_proxy_options(_proxy, pid) when is_pid(pid), do: %{proxy: true, proxy_for: pid}
  defp get_proxy_options(true, _proxy_for), do: %{proxy: true, proxy_for: nil}

  defp get_proxy_options(_proxy, _proxy_for),
    do: %{init_time: nil, clock_time: Ratio.new(0), till_next: nil, proxy: false}

  defp handle_unsubscribe(pid, state) do
    Process.demonitor(state.subscribers[pid].monitor, [:flush])
    {_subscriber, state} = pop_in(state, [:subscribers, pid])
    state
  end

  defp handle_clock_update({nom, denom}, state) do
    handle_clock_update(Ratio.new(nom, denom), state)
  end

  defp handle_clock_update(till_next, state) do
    till_next = Ratio.new(till_next)

    if Ratio.lt?(till_next, 0) do
      raise "Clock update time cannot be negative, received: #{inspect(till_next)}"
    end

    till_next = Ratio.mult(till_next, Ratio.new(Time.millisecond()))

    case state.init_time do
      nil -> %{state | init_time: state.time_provider.(), till_next: till_next}
      _init_time -> do_handle_clock_update(till_next, state)
    end
  end

  defp do_handle_clock_update(till_next, state) do
    %{till_next: from_previous, clock_time: clock_time} = state
    clock_time = Ratio.add(clock_time, from_previous)
    ratio = Ratio.div(clock_time, Ratio.new(state.time_provider.() - state.init_time))
    state = %{state | clock_time: clock_time, till_next: till_next}
    broadcast_and_update_ratio(ratio, state)
  end

  defp broadcast_and_update_ratio(ratio, state) do
    state.subscribers |> Bunch.KVList.each_key(&send_ratio(&1, ratio))
    %{state | ratio: ratio}
  end

  defp send_ratio(pid, ratio), do: send(pid, {:membrane_clock_ratio, self(), ratio})
end