lib/gen_pool.ex

defmodule GenPool do
  @moduledoc """
  Documentation for `GenPool`.
  """

  @type new_state :: term

  @callback init(init_arg :: term) ::
              {:ok, state}
              | {:ok, state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | :ignore
              | {:stop, reason :: any}
            when state: any

  @callback handle_call(request :: term, pid(), state :: term) ::
              {:reply, reply, new_state}
              | {:reply, reply, new_state,
                 timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:stop, reason, reply, new_state}
              | {:stop, reason, new_state}
            when reply: term, new_state: term, reason: term

  @callback handle_cast(request :: term, state :: term) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:stop, reason :: term, new_state}

  @callback handle_info(msg :: :timeout | term, state :: term) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:stop, reason :: term, new_state}
            when new_state: term

  @callback handle_continue(continue_arg, state :: term) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg}}
              | {:stop, reason :: term, new_state}
            when new_state: term, continue_arg: term

  @callback terminate(reason, state :: term) :: term
            when reason: :normal | :shutdown | {:shutdown, term} | term

  @optional_callbacks terminate: 2,
                      handle_info: 2,
                      handle_cast: 2,
                      handle_call: 3,
                      handle_continue: 2

  @doc false
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour GenPool

      @default_opts [
        external_broker?: Keyword.get(opts, :external_broker?, false),
        shared_state?: Keyword.get(opts, :shared_state?, true),
        broker: Keyword.get(opts, :broker, GenPool.DefaultBroker),
        backend: Keyword.get(opts, :backend, %GenPool.Backend.Ane{}),
        pool_size: Keyword.get(opts, :pool_size, 5),
        pool_module: __MODULE__
      ]

      @doc false
      def start_link(opts, process_opts \\ []) do
        GenServer.start_link(
          GenPool.ProcessManager,
          opts,
          process_opts
        )
      end

      def __default_opts__, do: @default_opts
      def __broker__, do: @default_opts[:broker]

      @doc false
      def handle_call(params, _from, _state) do
        raise "GenPool handle_call/3 not implemented for #{inspect(params)}"
      end

      @doc false
      def handle_cast(params, _state) do
        raise "GenPool handle_cast/2 not implemented for #{inspect(params)}"
      end

      @doc false
      def handle_info(params, _state) do
        raise "GenPool handle_info/2 not implemented for #{inspect(params)}"
      end

      @doc false
      def terminate(_reason, _state) do
        raise "GenPool terminate/2 not implemented"
      end

      @doc false
      def handle_continue(params, _state) do
        raise "GenPool handle_continue/2 not implemented for #{inspect(params)}"
      end

      defoverridable GenPool
      defoverridable(start_link: 1, start_link: 2)
    end
  end

  def start_link(gen_pool, init_opts \\ [], process_opts \\ []) do
    default_opts = gen_pool.__default_opts__()
    opts = Keyword.merge([client_opts: init_opts], default_opts)

    gen_pool.start_link(opts, Keyword.merge(process_opts, name: default_opts[:pool_module]))
  end

  def call(gen_pool, params) do
    gen_pool.__broker__()
    |> do_call(params)
  end

  def cast(gen_pool, params) do
    gen_pool.__broker__()
    |> do_cast(params)
  end

  def stop(gen_pool, reason \\ :shutdown) do
    gen_pool.__broker__()
    |> do_cast({:__stop__, reason})
  end

  def get_state(gen_pool) do
    gen_pool.__broker__()
    |> do_call(:__get_state__)
  end

  def get_children(gen_pool) do
    GenServer.call(gen_pool, :get_children)
  end

  defp do_cast(broker, params) do
    case :sbroker.async_ask(broker, {:cast, self(), params}) do
      {:drop, _time} -> {:error, :too_many_requests}
      _ -> :ok
    end
  end

  defp do_call(broker, params) do
    case :sbroker.ask(broker, {:call, self(), params}) do
      {:go, ref, worker, _, _queue_time} ->
        monitor = Process.monitor(worker)

        receive do
          {^ref, result} ->
            Process.demonitor(monitor, [:flush])
            result

          {:DOWN, ^monitor, _, _, reason} ->
            {:error, reason}
        end

      {:drop, _time} ->
        {:error, :too_many_requests}
    end
  end
end