lib/sequins.ex

defmodule Sequins do
  alias Sequins.{Queues, Subscriptions, Topics}
  use Application
  require Logger

  @moduledoc """
  Utilities to create the queues, topics, and subscriptions required to
  support the ingest pipeline.
  """

  @moduledoc deprecated: "Sequins is no longer being actively maintained. Use [Broadway](https://github.com/dashbitco/broadway) or [Broadway SQS](https://github.com/dashbitco/broadway_sqs) directly instead", since: "0.7.2"

  @type stringish :: atom() | binary()
  @type filter :: {stringish(), stringish()}
  @type subscription :: stringish() | {stringish(), list(filter())}
  @type subscriptions :: list(subscription())
  @type spec :: stringish() | {stringish(), subscriptions()}
  @type specs :: list(spec)

  @doc """
  Set up pipeline infrastructure based on a list of queue/topic/subscription specs. A spec
  looks like this:

      {Action, [{OtherAction, [filters]}, ...]}

  An action with no subscriptions can simply be specified as

      Action

  For example, let's say you have 4 actions – `A` (do some things), `B` & `C` (do some things; depend
  on A's success), and `D` (handles all errors). The spec for this would be:

      [
        :A,
        B: [A: [status: :ok]],
        C: [A: [status: :ok]],
        D: [A: [status: :error], B: [status: :error], C: [status: :error]]
      ]

  The above spec will create:

  * 4 SQS queues (`sequins-a`, `sequins-c`, `sequins-c`, `sequins-d`)
  * 4 SNS topics (`sequins-a`, `sequins-c`, `sequins-c`, `sequins-d`)
  * 5 SNS subscriptions
    * `sequins-a` -> `sequins-b` ({"status": "ok"})
    * `sequins-a` -> `sequins-c` ({"status": "ok"})
    * `sequins-a` -> `sequins-d` ({"status": "error"})
    * `sequins-b` -> `sequins-d` ({"status": "error"})
    * `sequins-c` -> `sequins-d` ({"status": "error"})

  The default prefix for created resources is `sequins`, but can be changed by configuring
  the `:sequins` application's `:prefix` attribute. The options passed to the supervisor
  on startup can be specified by setting the `:supervisor_opts` attribute to a keyword list
  of valid [`Supervisor`](https://hexdocs.pm/elixir/Supervisor.html) options.

  In addition to `:status`, subscriptions can filter on any attribute added to the `attrs` hash
  by the `Sequins.Pipeline.Action.process/2` callback.
  """
  @spec setup(specs :: specs()) :: {list(), list(), list()}
  def setup(specs) do
    (queues = parse_queues(specs)) |> Queues.create_queues()
    (topics = parse_topics(specs)) |> Topics.create_topics()
    Subscriptions.delete_subscriptions(prefix())
    (subscriptions = parse_subscriptions(specs)) |> Subscriptions.create_subscriptions()
    {queues, topics, subscriptions}
  end

  def inflect(value) do
    if is_atom(value) &&
         Code.ensure_loaded?(value) &&
         function_exported?(value, :queue_name, 0),
       do: value.queue_name(),
       else:
         [
           prefix(),
           value
           |> to_string()
           |> Inflex.underscore()
           |> String.replace("_", "-")
         ]
         |> Enum.reject(&is_nil/1)
         |> Enum.join("")
  end

  def parse_queues(specs) do
    specs
    |> Enum.map(fn
      {queue, _} -> inflect(queue)
      queue -> inflect(queue)
    end)
  end

  def parse_topics(specs), do: parse_queues(specs)

  def parse_filters(filters) do
    filters
    |> Enum.map(fn
      {key, value} when is_list(value) -> {key, value}
      {key, value} -> {key, [value]}
    end)
    |> Enum.into(%{})
  end

  def parse_subscriptions(specs) do
    specs
    |> Enum.filter(fn
      {_, _} -> true
      _ -> false
    end)
    |> Enum.map(fn {queue, queue_subs} ->
      queue_subs
      |> Enum.map(fn
        {target, filters} ->
          {inflect(queue), inflect(target), filters |> parse_filters()}

        target ->
          {inflect(queue), inflect(target), nil}
      end)
    end)
    |> List.flatten()
  end

  @doc false
  @impl Application
  def start(_type, _args) do
    with config <- Application.get_env(:sequins, :supervisor_opts, []),
         opts <- Keyword.merge([name: __MODULE__.Supervisor, strategy: :one_for_one], config) do
      Supervisor.start_link([], opts)
    end
  end

  def start_children(children) do
    Enum.map(children, &start_child/1)
  end

  defp start_child(action) do
    child_name =
      case action do
        {mod, _} -> to_string(mod)
        mod -> to_string(mod)
      end

    Logger.info("Sequins: Starting #{child_name}")

    case Supervisor.start_child(__MODULE__.Supervisor, action) do
      {:ok, pid} ->
        {:ok, action, pid}

      {:error, reason} ->
        message =
          case reason do
            {{:EXIT, {err, _}}, _} -> inspect(err)
            err -> inspect(err)
          end

        Logger.warn("Sequins: #{child_name} failed to start: #{message}")
        {:error, action, reason}
    end
  end

  def prefix do
    Application.get_env(:sequins, :prefix, "sequins") <> "-"
  end
end