lib/odd_job/pool/worker.ex

defmodule OddJob.Pool.Worker do
  @moduledoc """
  The `OddJob.Pool.Worker` is a `GenServer` that performs concurrent work as one of many
  members of an `OddJob.Pool`.

  The `OddJob.Pool.Worker` checks in with the pool and asks to be monitored upon startup. Once the worker is monitored
  it can start receiving jobs. In the unlikely event that the pool crashes, the worker will be notified
  and request to be monitored again when the pool restarts.
  """
  @moduledoc since: "0.1.0"

  use GenServer

  alias OddJob.Utils

  defstruct [:id, :pool, :queue]

  @type t :: %__MODULE__{
          id: non_neg_integer,
          pool: atom,
          queue: pid
        }

  @doc false
  def child_spec(opts) do
    opts
    |> super()
    |> Supervisor.child_spec(id: opts[:id])
  end

  @doc false
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl GenServer
  def init(opts) do
    queue = Utils.queue_name(opts[:pool]) |> GenServer.whereis()
    state = struct(__MODULE__, opts ++ [queue: queue])
    Process.monitor(queue)
    OddJob.Queue.monitor(queue, self())
    {:ok, state}
  end

  @impl GenServer
  def handle_cast({:do_perform, %{async: true, proxy: proxy} = job}, %{pool: pool} = state) do
    GenServer.call(proxy, :link_and_monitor)
    job = do_perform(pool, job)
    GenServer.call(proxy, {:complete, job})
    {:noreply, state}
  end

  def handle_cast({:do_perform, job}, %{pool: pool} = state) do
    do_perform(pool, job)
    {:noreply, state}
  end

  defp do_perform(pool, job) do
    results = job.function.()

    Utils.queue_name(pool)
    |> GenServer.cast({:complete, self()})

    %{job | results: results}
  end

  @impl GenServer
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{queue: queue} = state)
      when pid == queue do
    queue = check_for_new_queue(state.pool)
    Process.monitor(queue)
    OddJob.Queue.monitor(queue, self())

    {:noreply, %{state | queue: queue}}
  end

  defp check_for_new_queue(pool) do
    case Utils.queue_name(pool) |> GenServer.whereis() do
      nil -> check_for_new_queue(pool)
      pid -> pid
    end
  end
end