lib/gen_pool.ex

defmodule GenPool do
  @moduledoc """
  Documentation for `GenPool`.
  """

  @type new_state :: term

  @callback init(init_arg :: term) ::
              {:ok, state}
              | {:ok, state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | :ignore
              | {:stop, reason :: any}
            when state: any

  @callback handle_call(request :: term, pid(), state :: term) ::
              {:reply, reply, new_state}
              | {:reply, reply, new_state,
                 timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:stop, reason, reply, new_state}
              | {:stop, reason, new_state}
            when reply: term, new_state: term, reason: term

  @callback handle_cast(request :: term, state :: term) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:stop, reason :: term, new_state}

  @callback handle_info(msg :: :timeout | term, state :: term) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg :: term}}
              | {:stop, reason :: term, new_state}
            when new_state: term

  @callback handle_continue(continue_arg, state :: term) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout | :hibernate | {:continue, continue_arg}}
              | {:stop, reason :: term, new_state}
            when new_state: term, continue_arg: term

  @callback terminate(reason, state :: term) :: term
            when reason: :normal | :shutdown | {:shutdown, term} | term

  @optional_callbacks terminate: 2,
                      handle_info: 2,
                      handle_cast: 2,
                      handle_call: 3,
                      handle_continue: 2

  @doc false
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour GenPool

      @broker Keyword.get(opts, :broker, GenPool.Broker)
      @backend Keyword.get(opts, :backend, %GenPool.Backend.Ets{})
      @pool_size Keyword.get(opts, :pool_size, 5)

      @doc false
      def start_link(opts \\ []) do
        spawn_link(fn ->
          GenPool.Supervisor.start_link(opts)

          backend = GenPool.Backend.new(@backend)

          for _i <- 1..@pool_size do
            __start_child__(backend, self(), opts)
          end

          GenPool.cast(__MODULE__, {:__init__, opts})

          __main_heartbeat__(backend, opts)
        end)
      end

      def __broker__, do: @broker

      defp __main_heartbeat__(backend, opts) do
        receive do
          {:DOWN, _, :process, _, reason} ->
            __start_child__(backend, self(), opts)

            __main_heartbeat__(backend, opts)
        end
      end

      defp __start_child__(backend, parent_pid, opts) do
        pid = spawn_link(fn ->
          tag = make_ref()

          __process_loop__(tag, backend, parent_pid, opts)
        end)

        Process.monitor(pid)
      end

      defp __process_loop__(tag, backend, parent_pid, opts) do
        {:await, ^tag, _} = :sbroker.async_ask_r(@broker, self(), {self(), tag})

        state = GenPool.Backend.get(backend)

        receive do
          {^tag, {:go, ref, {:cast, origin_pid, {:__stop__, reason}}, _, _}} ->
            IO.inspect("stopping")
            {:stop, reason, state}
            |> __handle_reply__(backend, parent_pid, origin_pid, ref)

            __process_loop__(tag, backend, parent_pid, opts)

          {^tag, {:go, ref, {:cast, origin_pid, {:__continue__, params}}, _, _}} ->
            handle_continue(params, state)
            |> __handle_reply__(backend, parent_pid, origin_pid, ref)

            __process_loop__(tag, backend, parent_pid, opts)

          {^tag, {:go, ref, {:cast, _from, {:__init__, opts}}, _, _}} ->
            case init(opts) do
              {:ok, state} ->
                GenPool.Backend.put(backend, state)

              {:ok, state, {:continue, params}} ->
                GenPool.Backend.put(backend, state)
                GenPool.cast(__MODULE__, {:__continue__, params})
            end

            __process_loop__(tag, backend, parent_pid, opts)

          {^tag, {:go, ref, {:call, origin_pid, params}, _, _}} ->
            handle_call(params, origin_pid, state)
            |> __handle_reply__(backend, parent_pid, origin_pid, ref)

            __process_loop__(tag, backend, parent_pid, opts)

          {^tag, {:go, ref, {:cast, origin_pid, params}, _, _}} = sss ->
            handle_cast(params, state)
            |> __handle_reply__(backend, parent_pid, origin_pid, ref)

            __process_loop__(tag, backend, parent_pid, opts)

          {:EXIT, _pid, reason} ->
            terminate(reason, state)
            exit(reason)

          message ->
            handle_info(message, state)
            |> __handle_reply__(backend, parent_pid)

            __process_loop__(tag, backend, parent_pid, opts)
        end
      end

      defp __handle_reply__(reply, backend, parent_pid, origin_pid \\ nil, ref \\ nil) do
        case reply do
          {:reply, response, state} ->
            GenPool.Backend.put(backend, state)

            if not is_nil(origin_pid) and not is_nil(ref) do
              send(origin_pid, {ref, response})
            end

          {:reply, response, state, _opts} ->
            GenPool.Backend.put(backend, state)

            if not is_nil(origin_pid) and not is_nil(ref) do
              send(origin_pid, {ref, response})
            end

          {:stop, reason, state, _opts} ->
            GenPool.Backend.put(backend, state)
            Process.exit(parent_pid, reason)

          {:stop, reason, state} ->
            GenPool.Backend.put(backend, state)
            Process.exit(parent_pid, reason)

          {:noreply, state} ->
            GenPool.Backend.put(backend, state)

          {:noreply, state, _opts} ->
            GenPool.Backend.put(backend, state)
        end

        :ok
      end

      @doc false
      def init(opts), do: {:ok, nil}

      @doc false
      def handle_call(params, _from, _state) do
        raise "GenPool handle_call/3 not implemented for #{inspect(params)}"
      end

      @doc false
      def handle_cast(params, _state) do
        raise "GenPool handle_cast/2 not implemented for #{inspect(params)}"
      end

      @doc false
      def handle_info(params, _state) do
        raise "GenPool handle_info/2 not implemented for #{inspect(params)}"
      end

      @doc false
      def terminate(_reason, _state) do
        raise "GenPool terminate/2 not implemented"
      end

      @doc false
      def handle_continue(params, _state) do
        raise "GenPool handle_continue/2 not implemented for #{inspect(params)}"
      end

      defoverridable GenPool
    end
  end

  def call(gen_pool, params) do
    gen_pool.__broker__()
    |> do_call(params)
  end

  def cast(gen_pool, params) do
    gen_pool.__broker__()
    |> do_cast(params)
  end

  def stop(gen_pool, reason \\ :shutdown) do
    gen_pool.__broker__()
    |> do_cast({:__stop__, reason})
  end

  defp do_cast(broker, params) do
    case :sbroker.async_ask(broker, {:cast, self(), params}) do
      {:drop, _time} -> {:error, :overloaded}
      _ -> :ok
    end
  end

  defp do_call(broker, params) do
    case :sbroker.ask(broker, {:call, self(), params}) do
      {:go, ref, worker, _, _queue_time} ->
        monitor = Process.monitor(worker)

        receive do
          {^ref, result} ->
            Process.demonitor(monitor, [:flush])
            result

          {:DOWN, ^monitor, _, _, reason} ->
            {:error, :process_down, reason}
        end

      {:drop, _time} ->
        {:error, :too_many_requests}
    end
  end
end