lib/rambla.ex

defmodule Rambla do
  @moduledoc """
  Interface for the message publishing through `Rambla`.

  `Rambla` maintains connection pools with `Finitomata.Pool` for each service.

  The typical config for `Rambla` service follows the pattern introduced by
    `AMQP` library:

  ```elixir
  ‹service›: [
    connections: [
      ‹connection_name›: [‹key_1›: ‹value_1›, …]
    ],
    channels: [
      ‹channel_name›: [
        connection: ‹connection_name›,
        options: [‹key_1›: ‹value_1›, …]
      ]
    ]
  ]
  ```

  Additional option, one might pass to the channel config, would be explicit handlers
    for failures and success calls (by default the former prints the warning and retries
    until the maximum count of retries reached, and then calls `on_fatal/2` callback,
    and the latter logs a debug message.)

  ```elixir
    channels: [
      chan_1: [
        connection: :conn_1,
        options: [
          callbacks: [
            on_success: fn result -> IO.inspect(result, label: "on_success") && :ok end]
          ]]]]
  ```

  ---

  To start pools, simply embed `Rambla` into the supervision tree, it’d
    start a supervisor with children for all the configured services.

  The configuration of the service implies all the `Rambla`’s code for it will
    be compiled, but the dependency itself must be added to the `deps` section
    of the `Mix.Project` file.

  The excerpt from the `Rambla.MixProject` itself follows

  ```elixir
    # optional backends
    {:amqp, "~> 3.0", optional: true},
    {:redix, "~> 1.0", optional: true},
    {:pillar, "~> 0.39", optional: true},
    {:gen_smtp, "~> 0.4 or ~> 1.0", optional: true},
    {:telemetria, "~> 0.4 or ~> 1.0", optional: true},

    # s3
    {:ex_aws, "~> 2.1", optional: true},
    {:ex_aws_s3, "~> 2.0", optional: true},
    {:ex_aws_sts, "~> 2.0", optional: true},
    {:hackney, "~> 1.9", optional: true},
    {:sweet_xml, "~> 0.6", optional: true},
    {:configparser_ex, "~> 4.0", optional: true},
  ```

  ---

  Channel names are used across connections to publish messages.
    `Rambla.publish(:channel_1, message)` would publish the message to all channels
    named `channel_1`.
  """

  @doc "Returns a map `%{‹service› => [‹channels›]}`"
  def channels do
    for {service, opts} when is_list(opts) <-
          Application.get_all_env(:rambla) ++ [{:amqp, Application.get_all_env(:amqp)}],
        {:channels, opts} <- opts,
        {name, _} <- opts,
        reduce: %{},
        do: (acc -> Map.update(acc, name, [service], &[service | &1]))
  end

  @doc "Returns a map `%{‹channel› => [‹connection›]}`"
  def connections do
    configs = Application.get_all_env(:rambla) ++ [{:amqp, Application.get_all_env(:amqp)}]

    configs
    |> get_in([Access.all(), Access.elem(1), :channels])
    |> Enum.filter(&Keyword.keyword?/1)
    |> List.flatten()
    |> Enum.reduce(%{}, fn {chan, config}, acc ->
      Map.update(acc, chan, [config], &[config | &1])
    end)
  end

  @doc "Returns a list of all the configured services (connections)"
  def services do
    channel_services = channels() |> Map.values() |> Enum.reduce([], &Kernel.++/2)
    explicit = Application.get_env(:rambla, :services, [])
    Enum.uniq(explicit ++ channel_services)
  end

  @doc false
  def handler_for_service(name) do
    with nil <- :persistent_term.get({Rambla, {:handler, name}}, nil) do
      handler =
        case Application.get_env(:rambla, name) do
          [{_, _} | _] = cfg -> Keyword.get(cfg, :handler)
          _ -> nil
        end || Module.concat(Rambla.Handlers, name |> to_string() |> Macro.camelize())

      tap(handler, &:persistent_term.put({Rambla, {:handler, name}}, &1))
    end
  end

  use Supervisor

  @doc "Starts the supervisor with all the configured services"
  def start_link(opts \\ []) do
    case Keyword.pop_lazy(opts, :name, fn -> Keyword.get(opts, :id) end) do
      {nil, opts} -> Supervisor.start_link(__MODULE__, opts)
      {name, opts} -> Supervisor.start_link(__MODULE__, opts, name: name)
    end
  end

  @impl true
  def init(_opts) do
    services()
    |> Enum.map(&handler_for_service/1)
    |> case do
      [] -> :ignore
      children -> Supervisor.init(children, strategy: :one_for_one)
    end
  end

  @enable_deprecated Application.compile_env(:rambla, :enable_deprecated, true)

  if @enable_deprecated do
    @doc """
    Starts the pools configured in the `config.exs` / `releases.exs` file.

    This call is equivalent to `start_pools(Application.get_env(:rambla, :pools))`.
    """
    @doc deprecated: "Use configuration instead"
    def start_pools do
      IO.warn("This call is deprecated and will be removed")
      Rambla.ConnectionPool.start_pools()
    end

    @doc "Starts the pools as specified by options (`map()` or `keyword()`)"
    @doc deprecated: "Use configuration instead"
    def start_pools(opts) do
      IO.warn("This call is deprecated and will be removed")
      Rambla.ConnectionPool.start_pools(opts)
    end

    @doc "Returns the currently active pools"
    @doc deprecated: "Use configuration instead"
    def pools do
      IO.warn("This call is deprecated and will be removed")
      Rambla.ConnectionPool.pools()
    end
  end

  @doc """
  Publishes the message to the target channels. The message structure depends on
  the destination. For `RabbitMQ` is might be whatever, for `Smtp` it expects
  to have `to:`, `subject:` and `body:` fields.
  """
  def publish(target, message, pid \\ nil)

  if @enable_deprecated do
    def publish(target, message, opts) when is_tuple(target) or is_map(opts) do
      IO.warn("This call is deprecated and will be removed")
      Rambla.ConnectionPool.publish(target, message, opts || %{})
    end

    def publish(target, message, opts) when target in [:amqp, :redis, :http, :smtp, :process] do
      IO.warn("This call is deprecated and will be removed")
      Rambla.ConnectionPool.publish(target, message, opts || %{})
    end

    def publish(target, message, opts)
        when target in [Rambla.Amqp, Rambla.Redis, Rambla.Http, Rambla.Smtp, Rambla.Process] do
      IO.warn("This call is deprecated and will be removed")
      Rambla.ConnectionPool.publish(target, message, opts || %{})
    end
  end

  def publish(channels, message, pid) when not is_list(channels) do
    publish([channels], message, pid)
  end

  def publish(channels, message, pid) do
    for channel <- channels,
        service <- Map.get(channels(), channel, []),
        handler <- [handler_for_service(service)] do
      handler.publish(channel, message, pid)
    end
  end

  if @enable_deprecated do
    @doc """
    Publishes the message to the destination synchronously, avoiding the pool.
    """
    @doc deprecated: "Use configuration instead with `[count: 1]` option"
    defdelegate publish_synch(target, message), to: Rambla.ConnectionPool

    @doc """
    Publishes the message to the destination synchronously, avoiding the pool.
    Unlike `publish_synch/2`, allows to specify additional options per request.
    """
    @doc deprecated: "Use configuration instead with `[count: 1]` option"
    defdelegate publish_synch(target, message, opts), to: Rambla.ConnectionPool

    @doc """
    Executes any arbitrary function in the context of one of workers in the
    respective connection pool for the target.

    The function would receive a pid of the connection process.
    """
    @doc deprecated: "Use `publish(channels, FUNCTION, pid)` instead"
    defdelegate raw(target, f), to: Rambla.ConnectionPool
  end

  @doc false
  def do_inspect(value) do
    IO.puts(inspect(value))
  end
end