lib/phx_json_rpc/router/pipe.ex

defmodule PhxJsonRpc.Router.Pipe do
  @moduledoc """
  Behaviour for handling requests via rpc.
  """
  alias PhxJsonRpc.Response

  @doc """
  Handles rpc requests via the given context(router).

  Processes each request by the following pipeline:
  ```
  - parse
  - validate
  - dispatch
  ```

  If batch size exceeded, an exception will be rendered and
  requests for the current batch wont be processed.
  """
  @callback handle(request :: list(map()) | map(), context :: module()) ::
              [Response.t()] | Response.t()
end

defmodule PhxJsonRpc.Router.DefaultPipe do
  @moduledoc false

  @behaviour PhxJsonRpc.Router.Pipe

  alias PhxJsonRpc.Error.InternalError
  alias PhxJsonRpc.Response

  alias PhxJsonRpc.Router.{
    DefaultDispatcher,
    DefaultMiddleware,
    DefaultParser,
    DefaultValidator
  }

  @default_parser DefaultParser
  @default_validator DefaultValidator
  @default_dispatcher DefaultDispatcher

  @impl true
  def handle(%{"_json" => requests}, context) do
    handle(requests, context)
  end

  @impl true
  def handle(requests, context) when is_list(requests) do
    if Enum.count(requests) <= context.instance.get_max_batch_size() do
      handle_batch(requests, context)
    else
      show_limit_error()
    end
  end

  @impl true
  def handle(request, context) do
    meta = get_metadata(request, context)
    schema_ref = if is_nil(meta), do: nil, else: meta.schema_ref

    config = Application.get_env(context.instance.get_otp_app(), context.instance, [])
    parser = config[:parser] || @default_parser
    validator = config[:validator] || @default_validator
    dispatcher = config[:dispatcher] || @default_dispatcher

    request
    |> parser.parse(context.instance.get_version())
    |> validator.validate(schema_ref, context.instance.get_json_schema())
    |> DefaultMiddleware.handle(context)
    |> dispatcher.dispatch(meta, context)
  end

  defp handle_batch(requests, context) do
    requests
    |> Task.async_stream(fn request -> handle(request, context) end, ordered: false)
    |> Enum.map(fn {:ok, response} -> response end)
    |> Enum.to_list()
  end

  defp show_limit_error do
    error = %InternalError{message: "Batch size limit exceeded."}
    %Response{error: error, valid?: false}
  end

  defp get_metadata(request, context) when is_map(request) do
    method = Map.get(request, "method")

    if is_binary(method) do
      Keyword.get(context.instance.get_routes(), get_key(method))
    else
      nil
    end
  end

  defp get_metadata(_request, _context), do: nil

  defp get_key(method) do
    String.to_existing_atom(method)
  rescue
    ArgumentError -> nil
  end
end