# lib/systemd/signal.ex

defmodule Systemd.Signal do
  @moduledoc """
  Helpers for receiving systemd D-Bus signals.

  Typical flow:

    1. `subscribe_manager/1` installs a bus match rule for the systemd manager
       object, calls the manager's `Subscribe` method and registers a signal
       handler on the connection.
    2. `await_job_removed/3` blocks until the `JobRemoved` signal for a given
       job object path shows up.
    3. `unsubscribe/1` deletes the signal handler and removes the match rule.

  `await_job_removed/3` reads `{ref, %Rebus.Message{}}` tuples from the calling
  process' mailbox, so it has to run in the process that receives the signals
  for the subscription.
  """

  alias Rebus.Message
  alias Systemd.{DBus, Error}

  # Addressing of the message bus itself (used for AddMatch / RemoveMatch).
  @dbus_destination "org.freedesktop.DBus"
  @dbus_path "/org/freedesktop/DBus"
  @dbus_interface "org.freedesktop.DBus"

  # Addressing of the systemd manager object.
  @systemd_destination "org.freedesktop.systemd1"
  @manager_path "/org/freedesktop/systemd1"
  @manager_interface "org.freedesktop.systemd1.Manager"

  # Match rule selecting every signal emitted by the manager object.
  @manager_match_rule "type='signal',sender='#{@systemd_destination}'," <>
                        "path='#{@manager_path}',interface='#{@manager_interface}'"

  # Default for the `:timeout` option of `await_job_removed/3`, in milliseconds.
  @default_await_timeout 30_000

  @typedoc """
  Handle returned by `subscribe_manager/1`.

    * `:conn` - the connection process the subscription was created on
    * `:ref` - the reference returned by `Rebus.add_signal_handler/1`; incoming
      signals are matched against it
    * `:match_rule` - the bus match rule that was installed and must be removed
      again on `unsubscribe/1`
  """
  @type subscription :: %__MODULE__{conn: pid(), ref: reference(), match_rule: String.t()}

  @typedoc """
  Decoded body of a `JobRemoved` signal, in the order the four body values
  appear in the message.
  """
  @type job_removed :: %{
          id: non_neg_integer(),
          job_path: String.t(),
          unit: String.t(),
          result: String.t()
        }

  @enforce_keys [:conn, :ref, :match_rule]
  defstruct [:conn, :ref, :match_rule]

  @doc """
  Subscribes the current process to systemd manager signals.

  Performs three steps in order: `AddMatch` on the message bus, `Subscribe` on
  the systemd manager, and `Rebus.add_signal_handler/1` on `conn`.

  Returns `{:ok, subscription}` on success. If any step fails the result is
  `{:error, %Systemd.Error{}}`; unexpected replies are wrapped with
  `Systemd.Error.protocol_error/1`. When a step after `AddMatch` fails, the
  match rule is removed again (best effort) so that a failed call does not
  leave a rule behind on the bus.
  """
  @spec subscribe_manager(pid()) :: {:ok, subscription()} | {:error, Error.t()}
  def subscribe_manager(conn) when is_pid(conn) do
    with :ok <- add_match(conn, @manager_match_rule),
         {:ok, ref} <- start_delivery(conn) do
      {:ok, %__MODULE__{conn: conn, ref: ref, match_rule: @manager_match_rule}}
    end
  end

  @doc """
  Removes a signal subscription returned by `subscribe_manager/1`.

  Deletes the signal handler first and then removes the bus match rule.
  Returns `:ok`, or `{:error, %Systemd.Error{}}` when removing the match rule
  fails. Raises `MatchError` if `Rebus.delete_signal_handler/2` returns anything
  other than `:ok`.
  """
  @spec unsubscribe(subscription()) :: :ok | {:error, Error.t()}
  def unsubscribe(%__MODULE__{conn: conn, ref: ref, match_rule: match_rule}) do
    # NOTE(review): the manager-level `Subscribe` issued by subscribe_manager/1
    # is not undone here. systemd appears to reject a second `Subscribe` from
    # the same bus connection, so re-subscribing on `conn` may fail - confirm
    # whether the manager's `Unsubscribe` method should be called as well.
    :ok = Rebus.delete_signal_handler(conn, ref)
    remove_match(conn, match_rule)
  end

  @doc """
  Waits for a `JobRemoved` signal matching a job object path.

  ## Options

    * `:timeout` - how long to wait overall, in milliseconds, or `:infinity`.
      Defaults to `30_000`. The timeout covers the whole call, not each
      individual message.

  Returns `{:ok, job_removed}` for the first `JobRemoved` signal whose job path
  equals `job_path`, or `{:error, Systemd.Error.connection_error(:timeout)}`
  when the deadline passes first.

  While waiting, every other message delivered for this subscription
  (`JobRemoved` for a different job, or any other signal) is taken out of the
  mailbox and discarded.

  Raises `ArgumentError` if `:timeout` is neither an integer nor `:infinity`.
  """
  @spec await_job_removed(subscription(), String.t(), keyword()) ::
          {:ok, job_removed()} | {:error, Error.t()}
  def await_job_removed(%__MODULE__{ref: ref}, job_path, opts \\ []) when is_binary(job_path) do
    deadline =
      opts
      |> Keyword.get(:timeout, @default_await_timeout)
      |> deadline_from_timeout()

    do_await_job_removed(ref, job_path, deadline)
  end

  # Runs the steps that follow a successful AddMatch. On any failure the match
  # rule is rolled back, because the caller never receives a subscription
  # struct it could hand to unsubscribe/1.
  defp start_delivery(conn) do
    with {:ok, []} <- manager_call(conn, "Subscribe"),
         ref when is_reference(ref) <- Rebus.add_signal_handler(conn) do
      {:ok, ref}
    else
      failure ->
        # Best effort: the original failure is what the caller needs to see,
        # so the outcome of the rollback itself is deliberately ignored.
        _ = remove_match(conn, @manager_match_rule)
        normalize_error(failure)
    end
  end

  # Converts the user-facing timeout into an absolute monotonic deadline so
  # that discarded messages do not extend the total waiting time.
  defp deadline_from_timeout(:infinity), do: :infinity

  defp deadline_from_timeout(timeout) when is_integer(timeout),
    do: System.monotonic_time(:millisecond) + timeout

  defp deadline_from_timeout(other) do
    raise ArgumentError,
          "expected :timeout to be an integer or :infinity, got: #{inspect(other)}"
  end

  # Milliseconds left until `deadline`, never negative.
  defp remaining_ms(:infinity), do: :infinity
  defp remaining_ms(deadline), do: max(deadline - System.monotonic_time(:millisecond), 0)

  # Receive loop: keeps consuming messages tagged with `ref` until the wanted
  # JobRemoved signal arrives or the deadline is reached.
  defp do_await_job_removed(ref, job_path, deadline) do
    remaining = remaining_ms(deadline)

    receive do
      {^ref, %Message{} = message} ->
        case job_removed(message) do
          {:ok, %{job_path: ^job_path} = removed} -> {:ok, removed}
          {:ok, _other_job} -> do_await_job_removed(ref, job_path, deadline)
          :ignore -> do_await_job_removed(ref, job_path, deadline)
        end
    after
      remaining -> {:error, Error.connection_error(:timeout)}
    end
  end

  # Decodes a manager `JobRemoved` signal carrying exactly four body values.
  # Anything else (other members, other interfaces, other body shapes) yields
  # `:ignore`.
  defp job_removed(%Message{
         type: :signal,
         header_fields: %{member: "JobRemoved", interface: @manager_interface},
         body: [id, job_path, unit, result]
       }) do
    {:ok, %{id: id, job_path: job_path, unit: unit, result: result}}
  end

  defp job_removed(_message), do: :ignore

  # Installs / removes a match rule on the message bus.
  defp add_match(conn, match_rule), do: dbus_void_call(conn, "AddMatch", [match_rule], "s")
  defp remove_match(conn, match_rule), do: dbus_void_call(conn, "RemoveMatch", [match_rule], "s")

  # Calls an argument-less method on the systemd manager object and returns
  # the result of `DBus.call_body/2` untouched.
  defp manager_call(conn, member) do
    DBus.call_body(conn,
      destination: @systemd_destination,
      path: @manager_path,
      interface: @manager_interface,
      member: member
    )
  end

  # Calls a message bus method that is expected to reply with an empty body.
  # Returns `:ok` for an empty reply; every other outcome is normalised to
  # `{:error, %Error{}}` so callers can rely on a single error shape.
  defp dbus_void_call(conn, member, body, signature) do
    reply =
      DBus.call_body(conn,
        destination: @dbus_destination,
        path: @dbus_path,
        interface: @dbus_interface,
        member: member,
        signature: signature,
        body: body
      )

    case reply do
      {:ok, []} -> :ok
      failure -> normalize_error(failure)
    end
  end

  # Passes `{:error, %Error{}}` through and wraps anything else as a protocol
  # error, keeping the offending term for diagnosis.
  defp normalize_error({:error, %Error{}} = error), do: error
  defp normalize_error(other), do: {:error, Error.protocol_error(other)}
end