lib/odd_job/queue.ex

defmodule OddJob.Queue do
  @moduledoc """
  The `OddJob.Queue` is a `GenServer` that manages the assignments given to the pool workers.

  The `queue` receives jobs and assigns them to available workers. If all workers in a pool are
  currently busy then new jobs are added to a FIFO queue to be processed as workers
  become available.
  """
  @moduledoc since: "0.4.0"

  @doc false
  use GenServer

  alias OddJob.{Job, Utils}

  @spec __struct__ :: OddJob.Queue.t()
  defstruct [:pool, workers: [], assigned: [], jobs: []]

  @typedoc """
  The `OddJob.Queue` struct holds the state of the job queue.

    * `:pool` is a term representing the name of the job pool that the `queue` belongs to

    * `:workers` is a list of the active worker `pid`s, whether they are busy working or not

    * `:assigned` is a list of the worker `pid`s that are currently assigned to a job

    * `:jobs` is a list of `OddJob.Job` structs representing the jobs that are queued to be performed
      when workers are available
  """
  @typedoc since: "0.3.0"
  @type t :: %__MODULE__{
          pool: atom,
          workers: [pid],
          assigned: [pid],
          jobs: [job]
        }

  @type job :: Job.t()

  @doc false
  def start_link(pool_name) do
    GenServer.start_link(__MODULE__, pool_name, name: Utils.queue_name(pool_name))
  end

  @doc false
  @spec state(atom | pid) :: t
  def state(pool_name) do
    Utils.queue_name(pool_name)
    |> GenServer.call(:state)
  end

  @doc false
  @spec monitor(atom | pid, pid) :: :ok
  def monitor(queue, worker) when is_pid(queue), do: GenServer.cast(queue, {:monitor, worker})

  @impl GenServer
  @spec init(atom) :: {:ok, t}
  def init(pool_name) do
    state = struct(__MODULE__, pool: pool_name)
    {:ok, state}
  end

  @impl GenServer
  def handle_cast({:monitor, pid}, %{workers: workers, jobs: []} = state) do
    Process.monitor(pid)
    {:noreply, %{state | workers: workers ++ [pid]}}
  end

  def handle_cast(
        {:monitor, pid},
        %{workers: workers, jobs: jobs, assigned: assigned} = state
      ) do
    Process.monitor(pid)
    workers = workers ++ [pid]
    assigned = assigned ++ [pid]
    [job | rest] = jobs
    GenServer.cast(pid, {:do_perform, job})
    {:noreply, %{state | workers: workers, assigned: assigned, jobs: rest}}
  end

  def handle_cast({:complete, worker}, %{assigned: assigned, jobs: []} = state) do
    {:noreply, %{state | assigned: assigned -- [worker]}, _timeout = 10_000}
  end

  def handle_cast({:complete, worker}, %{jobs: jobs} = state) do
    [new_job | rest] = jobs
    GenServer.cast(worker, {:do_perform, new_job})
    {:noreply, %{state | jobs: rest}, _timeout = 10_000}
  end

  def handle_cast({:perform, job}, state) do
    state = do_perform(job, state)
    {:noreply, state}
  end

  def handle_cast({:perform_many, jobs}, state) do
    state = do_perform_many(jobs, state)
    {:noreply, state}
  end

  defp do_perform_many([], state), do: state

  defp do_perform_many(
         [job | rest] = new_jobs,
         %{jobs: jobs, assigned: assigned, workers: workers} = state
       ) do
    available = available_workers(workers, assigned)

    if available == [] do
      %{state | jobs: jobs ++ new_jobs}
    else
      [worker | _rest] = available
      GenServer.cast(worker, {:do_perform, job})
      do_perform_many(rest, %{state | assigned: assigned ++ [worker]})
    end
  end

  defp do_perform(job, %{jobs: jobs, assigned: assigned, workers: workers} = state) do
    available = available_workers(workers, assigned)

    if available == [] do
      %{state | jobs: jobs ++ [job]}
    else
      [worker | _rest] = available
      GenServer.cast(worker, {:do_perform, job})
      %{state | assigned: assigned ++ [worker]}
    end
  end

  defp available_workers(workers, assigned), do: workers -- assigned

  @impl GenServer
  def handle_call(:state, _from, state) do
    {:reply, state, state}
  end

  @impl GenServer
  def handle_info(
        {:DOWN, ref, :process, pid, _reason},
        %{workers: workers, assigned: assigned} = state
      ) do
    Process.demonitor(ref, [:flush])
    workers = workers -- [pid]
    assigned = assigned -- [pid]
    {:noreply, %{state | workers: workers, assigned: assigned}}
  end

  def handle_info(:timeout, state) do
    {:noreply, state, :hibernate}
  end
end