lib/pipeline.ex

defmodule Pipeline do
  @moduledoc """
  Pipeline definition and execution.

  ## What is a "pipeline"?

  A pipeline is a set of functions that must be executed in a specific order to transform an initial state into a desired
  state. For example, a "login pipeline" uses the request body as its initial state and generates an authentication
  token.

  ## Creating a pipeline

  To create a new feature as a pipeline, you can simply `use Pipeline` in the target module and start writing
  functions: steps and hooks.

  ### Pipeline Steps

  - Steps are executed in the same order that they are declared in the module.
  - Any function that ends with `_step` and accepts two parameters is considered a step in the pipeline.
  - A step accepts a value and must return an ok tuple with the updated value or an error tuple with an error
    description. If one step fails, the following steps are not executed (see the sketch below).
    - The first parameter is the value that is being transformed by each step.
    - The second parameter holds optional values and is immutable.
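
  For illustration, a minimal step could look like the sketch below (`validate_presence_step` is a hypothetical
  name, not part of this module):

  ```elixir
  # Hypothetical step: accepts the current value and the (unused) options.
  # Returning an error tuple stops the pipeline; the remaining steps are skipped.
  def validate_presence_step("", _options), do: {:error, "Value is empty"}
  def validate_presence_step(value, _options), do: {:ok, value}
  ```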

  ### Pipeline Hooks

  - Hooks are executed in the same order that they are declared in the module.
  - Any function that ends with `_hook` and accepts two parameters is considered a hook in the pipeline.
  - Hooks receive the final state of the pipeline, and they are always executed after all steps, as sketched below.
    - The first parameter is the final state as defined by the `%Pipeline.State{}` struct.
    - The second parameter holds the same optional values passed to the steps and is immutable.
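
  A hook might look like this (`log_result_hook` is a hypothetical name; the `valid?`, `value`, and `error` fields
  belong to the `%Pipeline.State{}` struct):

  ```elixir
  # Hypothetical hook: inspects the final state and logs the outcome.
  # The return value is ignored by the pipeline.
  def log_result_hook(%Pipeline.State{valid?: true, value: value}, _options) do
    IO.inspect(value, label: "pipeline succeeded")
  end

  def log_result_hook(%Pipeline.State{error: error}, _options) do
    IO.inspect(error, label: "pipeline failed")
  end
  ```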

  ### Async Hooks
  - They're just like hooks, but the function name must end with `_async_hook`.
  - They are launched in isolated processes and run asynchronously, after all steps are done and before the
    hooks start being executed; see the sketch below. Their return values are ignored.
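
  An async hook follows the same contract as a hook, only the suffix changes (`notify_async_hook` is a
  hypothetical name):

  ```elixir
  # Hypothetical async hook: runs in its own process, so a slow side effect
  # does not block the regular hooks.
  def notify_async_hook(%Pipeline.State{valid?: valid?}, _options) do
    IO.inspect(valid?, label: "pipeline finished, valid?")
  end
  ```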

  ## Example

  ```elixir
  defmodule StringToNumber do
    use Pipeline

    def detect_binary_step(value, _options) do
      cond do
        is_binary(value) ->
          {:ok, value}

        true ->
          {:error, "Not a string"}
      end
    end

    def cleanup_step(value, _options) do
      {:ok, String.trim(value)}
    end

    def parse_step(value, _options) do
      case Float.parse(value) do
        {number, _} ->
          {:ok, number}

        :error ->
          {:error, "Invalid number"}
      end
    end
  end
  ```

  To execute this pipeline, you can use `StringToNumber.execute/2` or `Pipeline.execute/3`.
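
  Assuming the pipeline above, execution would look roughly like this:

  ```elixir
  StringToNumber.execute("  42.5  ")
  #=> {:ok, 42.5}

  StringToNumber.execute(:not_a_string)
  #=> {:error, "Not a string"}
  ```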

  """
  alias Pipeline.PipelineError
  alias Pipeline.State
  alias Pipeline.Types

  @doc """
  Returns a tuple with three elements.

  The first element is a list of functions to be used as steps of a pipeline. These steps will be executed in the same
  order that they appear on this list.

  The second element is a list of functions to be used as hooks of a pipeline. These hooks will be executed in
  the same order that they appear on this list.

  The third element is a list of functions to be used as async hooks of a pipeline. The order of execution is not
  guaranteed.
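
  For the `StringToNumber` example in the module documentation, the generated callback would return something like:

  ```elixir
  StringToNumber.__pipeline__()
  #=> {[{StringToNumber, :detect_binary_step}, {StringToNumber, :cleanup_step}, {StringToNumber, :parse_step}],
  #    [], []}
  ```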
  """
  @callback __pipeline__() :: {[Types.reducer()], [Types.hook()], [Types.async_hook()]}

  @doc false
  defmacro __using__(_) do
    quote do
      @before_compile unquote(__MODULE__)
    end
  end

  # "Injects the Pipeline behaviour, the two required callbacks and an `execute/2` function"
  @doc false
  defmacro __before_compile__(env) do
    definitions = Module.definitions_in(env.module, :def)
    {steps, definitions} = filter_functions(env.module, definitions, "_step", 2)
    {async_hooks, definitions} = filter_functions(env.module, definitions, "_async_hook", 2)
    {hooks, _definitions} = filter_functions(env.module, definitions, "_hook", 2)

    quote do
      @behaviour unquote(__MODULE__)

      @impl unquote(__MODULE__)
      def __pipeline__, do: {unquote(steps), unquote(hooks), unquote(async_hooks)}

      @spec execute(Pipeline.Types.args(), Pipeline.Types.options()) :: Pipeline.Types.result()
      def execute(value, options \\ []) do
        apply(unquote(__MODULE__), :execute, [__MODULE__, value, options])
      end
    end
  end

  defp filter_functions(module, definitions, suffix, expected_arity) do
    {filtered, remaining} =
      Enum.reduce(definitions, {[], []}, fn {function, arity} = fa, {acc, remaining} ->
        valid_name? =
          function
          |> Atom.to_string()
          |> String.ends_with?(suffix)

        has_expected_args? = arity == expected_arity

        cond do
          valid_name? and has_expected_args? ->
            {_, _, [line: line], _} = Module.get_definition(module, {function, arity})
            {[{module, function, line} | acc], remaining}

          valid_name? ->
            raise(
              PipelineError,
              "Function #{function} does not accept #{expected_arity} parameters."
            )

          true ->
            {acc, [fa | remaining]}
        end
      end)

    filtered =
      filtered
      # order by line number
      |> Enum.sort(fn {_, _, a}, {_, _, b} -> a <= b end)
      # drop line number
      |> Enum.map(fn {m, f, _l} -> {m, f} end)

    {filtered, remaining}
  end

  @doc """
  Executes the pipeline defined by `module` with the given `value` and `options`.

  First, all steps are executed in the same order that they were declared in their module. If one step fails, all
  the steps that come after it will not be executed. The returned value from one step will be passed to the next step,
  along with the given `options`.

  Then all async hooks are triggered and executed asynchronously in their own process. They will receive the final
  `%Pipeline.State{}` along with the given `options`. Their return values are ignored.

  After that, all hooks are executed in the same order that they were declared in their module. They will
  receive the final `%Pipeline.State{}` along with the given `options`. Their return values are ignored.

  Once all steps and hooks have been executed, the state is evaluated and this function returns an ok or error
  tuple, depending on whether the state is valid.

  If the given `module` does not implement the required callback from the `Pipeline` behaviour, a `PipelineError`
  is raised.
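
  Assuming the `StringToNumber` pipeline from the module documentation:

  ```elixir
  Pipeline.execute(StringToNumber, " 3.14 ")
  #=> {:ok, 3.14}
  ```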
  """
  @spec execute(module(), Types.args(), Types.options()) :: Types.result()
  def execute(module, value, options \\ []) do
    ensure_valid_pipeline!(module)

    initial_state = State.new(value)
    {steps, hooks, async_hooks} = apply(module, :__pipeline__, [])

    # Process state
    final_state =
      Enum.reduce(steps, initial_state, fn reducer, current_state ->
        State.update(current_state, reducer, options)
      end)

    # Launch async hooks
    Enum.each(async_hooks, fn {mod, fun} ->
      Task.Supervisor.async_nolink(Pipeline.TaskSupervisor, fn ->
        apply(mod, fun, [final_state, options])
      end)
    end)

    # Execute hooks
    Enum.each(hooks, fn {mod, fun} ->
      apply(mod, fun, [final_state, options])
    end)

    case final_state do
      %State{valid?: true, value: value} ->
        {:ok, value}

      %State{error: error} ->
        {:error, error}
    end
  end

  defp ensure_valid_pipeline!(module) do
    exports_pipeline_meta? = function_exported?(module, :__pipeline__, 0)

    unless exports_pipeline_meta? do
      raise(PipelineError, "Module #{module} is not a valid pipeline.")
    end
  end
end