# lib/step_flow/amqp/helpers.ex

defmodule StepFlow.Amqp.Helpers do
  require Logger

  @moduledoc """
  Helpers for AMQP.
  """

  @doc """
  Build the AMQP connection URL from the `StepFlow.Amqp` configuration.

  Every value is read through `StepFlow.Configuration.get_var_value`, so it can
  either be hardcoded or resolved from an environment variable:

  - `scheme` Setup the URL scheme of the RabbitMQ service (fallback: `"amqp"`)
  - `hostname` Setup the hostname of the RabbitMQ service
  - `username` Setup the username of the RabbitMQ service
  - `password` Setup the password of the RabbitMQ service
  - `port` Setup the port of the RabbitMQ service (fallback: `5672`)
  - `virtual_host` Setup the virtual host of the RabbitMQ service (fallback: `""`)

  The returned URL has the shape
  `scheme://username:password@hostname:port[/virtual_host]`. The values are
  concatenated as-is (no percent-encoding is applied), so they must resolve to
  strings; otherwise the concatenation raises.

  The hostname and the URL are logged at info level. The password is never
  written to the log: the logged URL shows `[FILTERED]` in its place.

  Hardcoded example:

      config :step_flow, StepFlow.Amqp,
        hostname: "example.com",
        port: "5678",
        username: "mediacloudai",
        password: "mediacloudai",
        virtual_host: "media_cloud_ai_dev"

  Environment getter example:

      config :step_flow, StepFlow.Amqp,
        hostname: {:system, "AMQP_HOSTNAME"},
        port: {:system, "AMQP_PORT"},
        username: {:system, "AMQP_USERNAME"},
        password: {:system, "AMQP_PASSWORD"},
        virtual_host: {:system, "AMQP_VIRTUAL_HOST"}

  """
  @spec get_amqp_connection_url() :: String.t()
  def get_amqp_connection_url do
    scheme = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :scheme, "amqp")
    hostname = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :hostname)
    username = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :username)
    password = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :password)
    virtual_host = get_amqp_virtual_host()
    port = get_amqp_port()

    Logger.info("#{__MODULE__}: Connecting with hostname: #{hostname}")

    # Split the URL around the password so the same pieces build both the
    # real URL and the redacted one that goes to the log.
    before_password = scheme <> "://" <> username <> ":"
    after_password = "@" <> hostname <> ":" <> port <> virtual_host

    # Credentials must not end up in log files: log a redacted URL only.
    Logger.info(
      "#{__MODULE__}: Connecting with url: #{before_password}[FILTERED]#{after_password}"
    )

    before_password <> password <> after_password
  end

  # Normalises a configuration value to an integer.
  #
  # Strings (e.g. the "1" fallback used for the delivery mode) are parsed with
  # String.to_integer/1, which raises ArgumentError on non-numeric text.
  # Integers are returned untouched. Any other type matches no clause and
  # raises FunctionClauseError.
  defp convert_to_integer(value) when is_binary(value), do: String.to_integer(value)
  defp convert_to_integer(value) when is_integer(value), do: value

  # Normalises a configuration value to an atom.
  #
  # Strings are converted with String.to_existing_atom/1 on purpose: it only
  # succeeds for atoms the VM already knows and raises ArgumentError otherwise,
  # so configuration text can never create new atoms. Atoms (booleans
  # included) are returned untouched. Any other type matches no clause and
  # raises FunctionClauseError.
  defp convert_to_atom(value) when is_binary(value), do: String.to_existing_atom(value)
  defp convert_to_atom(value) when is_atom(value), do: value

  # Returns `path` unchanged when something exists at that location on disk;
  # raises a RuntimeError naming the path otherwise.
  defp check_file_exists(path) do
    if File.exists?(path) do
      path
    else
      raise "Such a file does not exist: #{path}"
    end
  end

  # Reads `variable` from the StepFlow.Amqp configuration and, when it is set,
  # stores `update_function.(value)` under `key` in the `options` keyword list.
  #
  # `nil` and `""` both count as "not configured": `options` is then returned
  # untouched and `update_function` is not called. Anything `update_function`
  # raises (e.g. for a missing file) propagates to the caller.
  defp add_to_options(options, key, variable, update_function) do
    case StepFlow.Configuration.get_var_value(StepFlow.Amqp, variable) do
      unset when unset in [nil, ""] ->
        options

      value ->
        # Keyword.put/3 replaces any previous entry for `key`.
        Keyword.put(options, key, update_function.(value))
    end
  end

  # Collects the SSL settings found in the StepFlow.Amqp configuration.
  #
  # Each entry is {option key, configuration variable, validation/conversion}.
  # Entries are processed in the listed order through add_to_options/4, and
  # any option whose converted value is nil is dropped from the result.
  defp get_amqp_connection_ssl_options do
    settings = [
      {:cacertfile, :ssl_cacertfile, &check_file_exists/1},
      {:certfile, :ssl_certfile, &check_file_exists/1},
      {:keyfile, :ssl_keyfile, &check_file_exists/1},
      {:depth, :ssl_depth, &convert_to_integer/1},
      {:verify, :ssl_verify, &convert_to_atom/1},
      {:fail_if_no_peer_cert, :ssl_fail_if_no_peer_cert, &convert_to_atom/1}
    ]

    settings
    |> Enum.reduce([], fn {option, variable, transform}, collected ->
      add_to_options(collected, option, variable, transform)
    end)
    |> Enum.reject(fn {_option, setting} -> is_nil(setting) end)
  end

  @doc """
  Build the extra options used when opening the AMQP connection.

  Returns `[ssl_options: ssl_options]` when at least one SSL setting is
  configured, and `[]` otherwise. The resulting options are logged at info
  level before being returned.
  """
  def get_amqp_connection_options do
    result =
      case get_amqp_connection_ssl_options() do
        none when none in [nil, []] -> []
        ssl_settings -> [ssl_options: ssl_settings]
      end

    Logger.info("#{__MODULE__}: Connecting with options: #{inspect(result)}")

    result
  end

  # Reads the `:port` setting (5672 is passed as fallback) and converts it with
  # StepFlow.Configuration.to_string/1 so it can be concatenated into the URL.
  defp get_amqp_port do
    configured_port = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :port, 5672)
    StepFlow.Configuration.to_string(configured_port)
  end

  # Returns the URL path segment for the `:virtual_host` setting: an empty
  # string when the setting is "" (the fallback passed to the lookup),
  # otherwise the setting prefixed with "/".
  defp get_amqp_virtual_host do
    vhost = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :virtual_host, "")

    if vhost == "" do
      ""
    else
      "/" <> vhost
    end
  end

  @doc """
  Add the `:persistent` flag to the given message options.

  `:persistent` is `true` when `get_amqp_delivery_mode/0` returns `2` and
  `false` otherwise. Any `:persistent` entry already present in `options`
  is replaced.
  """
  def get_amqp_message_options(options \\ []) do
    persistent? = get_amqp_delivery_mode() == 2
    Keyword.put(options, :persistent, persistent?)
  end

  @doc """
  Get the configured AMQP delivery mode as an integer.

  Reads the `:delivery_mode` setting of `StepFlow.Amqp` (with `"1"` passed as
  fallback) and converts it to an integer; both string and integer settings
  are accepted.
  """
  def get_amqp_delivery_mode do
    raw_mode = StepFlow.Configuration.get_var_value(StepFlow.Amqp, :delivery_mode, "1")
    convert_to_integer(raw_mode)
  end

  @doc """
  Get the `:server_configuration` setting of `StepFlow.Amqp`.

  `"standalone"` is passed to the configuration lookup as the fallback value.
  """
  def get_amqp_server_configuration do
    fallback = "standalone"
    StepFlow.Configuration.get_var_value(StepFlow.Amqp, :server_configuration, fallback)
  end
end