lib/pipeline.ex

defmodule Pipeline do
  @moduledoc """
  Pipeline management.
  """
  alias Pipeline.PipelineError
  alias Pipeline.State

  @type result :: {:ok, any} | {:error, any}
  @type args :: any
  @type options :: Keyword.t()
  @type reducer :: (args, options -> result)
  @type callback :: (State.t(), options -> any())

  @doc """
  Returns a list of functions to be used as steps of a pipeline. These steps will be executed in the same order that
  they appear on this list.
  """
  @callback __pipeline_steps__() :: [reducer]

  @doc """
  Returns a list of functions to be used as callbacks of a pipeline. These callbacks will be executed in the same order
  that they appear on this list.
  """
  @callback __pipeline_callbacks__() :: [callback]

  defmacro __using__(_) do
    quote do
      @before_compile unquote(__MODULE__)
    end
  end

  # Injects the Pipeline behaviour, the two required callbacks and an `execute/2` function
  defmacro __before_compile__(env) do
    definitions = Module.definitions_in(env.module, :def)
    steps = filter_functions(env.module, definitions, "_step", 2)
    callbacks = filter_functions(env.module, definitions, "_callback", 2)

    quote do
      @behaviour unquote(__MODULE__)

      @impl unquote(__MODULE__)
      def __pipeline_steps__, do: unquote(steps)

      @impl unquote(__MODULE__)
      def __pipeline_callbacks__, do: unquote(callbacks)

      def execute(value, options \\ []) do
        apply(unquote(__MODULE__), :execute, [__MODULE__, value, options])
      end
    end
  end

  defp filter_functions(module, definitions, suffix, expected_arity) do
    functions =
      Enum.reduce(definitions, [], fn {function, arity}, acc ->
        valid_name? =
          function
          |> Atom.to_string()
          |> String.ends_with?(suffix)

        has_expected_args? = arity == expected_arity

        cond do
          valid_name? and has_expected_args? ->
            {_, _, [line: line], _} = Module.get_definition(module, {function, arity})
            [{module, function, line} | acc]

          valid_name? ->
            raise(
              PipelineError,
              "Function #{function} does not accept #{expected_arity} parameters."
            )

          true ->
            acc
        end
      end)

    functions
    # order by line number
    |> Enum.sort(fn {_, _, a}, {_, _, b} -> a <= b end)
    # drop line number
    |> Enum.map(fn {m, f, _l} -> {m, f} end)
  end

  @doc """
  Executes the pipeline defined by `module` with the given `value` and `options`.
  """
  def execute(module, value, options \\ []) do
    ensure_valid_pipeline!(module)

    initial_state = State.new(value)
    steps = apply(module, :__pipeline_steps__, [])
    callbacks = apply(module, :__pipeline_callbacks__, [])

    final_state =
      Enum.reduce(steps, initial_state, fn reducer, curent_state ->
        State.update(curent_state, reducer, options)
      end)

    Enum.each(callbacks, fn callback ->
      State.callback(final_state, callback, options)
    end)

    case final_state do
      %State{valid?: true, value: value} ->
        {:ok, value}

      %State{errors: errors} ->
        {:error, errors}
    end
  end

  defp ensure_valid_pipeline!(module) do
    exports_steps? = function_exported?(module, :__pipeline_steps__, 0)
    exports_callbacks? = function_exported?(module, :__pipeline_callbacks__, 0)

    unless exports_steps? && exports_callbacks? do
      raise(PipelineError, "Module #{module} is not a valid pipeline.")
    end
  end
end