Skip to main content

lib/core/workers/worker_pool.ex

defmodule Core.Workers.WorkerPool do
  @moduledoc """
  Supervisor that manages a pool of concurrent workers.

  Pool size defaults to the number of online schedulers (CPU cores).
  Configure via the `:worker_pool_size` application env:

      config :servcore, worker_pool_size: 4

  Or pass at startup:

      {Core.Workers.WorkerPool, name: MyApp.Pool, worker: MyApp.Worker, size: 4, queue: MyApp.Queue}

  ## Options

    * `:name` - name the supervisor is registered under (default: this module).
    * `:worker` - worker module (default: `Core.Workers.Worker`). It must export
      `start_link/1`; if it also exports `child_spec/1`, that spec is used as the
      base for each child.
    * `:size` - number of workers. Must be a non-negative integer; `0` starts the
      supervisor with no workers. Resolution order is the `:size` option, then the
      `:worker_pool_size` application env, then `System.schedulers_online/0`.
    * `:queue` - queue name handed to every worker (default: `Core.Workers.JobQueue`).

  Each worker is started with `[id: index, queue: queue, pool: pool_name]`, where
  `index` runs from `1` to the pool size, and is supervised `:one_for_one` under
  the child id `{worker_module, index}`.
  """
  use Supervisor
  require Logger

  @doc """
  Starts the pool supervisor linked to the calling process.

  See the module documentation for the accepted `opts`.
  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    Supervisor.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts) do
    size = resolve_size!(opts)

    worker_module = Keyword.get(opts, :worker, Core.Workers.Worker)
    queue_name = Keyword.get(opts, :queue, Core.Workers.JobQueue)
    pool_name = Keyword.get(opts, :name, __MODULE__)

    # function_exported?/3 does not load modules, so force the load first.
    Code.ensure_loaded!(worker_module)

    if not function_exported?(worker_module, :start_link, 1) do
      raise ArgumentError,
            "#{inspect(worker_module)} must implement start_link/1 accepting [id: integer()]"
    end

    Logger.info(
      "WorkerPool #{inspect(pool_name)} starting #{size} workers (module: #{worker_module})"
    )

    # The explicit `//1` step keeps the range ascending: a bare `1..0` would count
    # down and yield two workers (ids 1 and 0) for an intended empty pool.
    children =
      for index <- 1..size//1 do
        worker_child_spec(worker_module, index, id: index, queue: queue_name, pool: pool_name)
      end

    Supervisor.init(children, strategy: :one_for_one)
  end

  # Resolves the pool size (option, then app env, then scheduler count) and raises
  # ArgumentError unless it is a non-negative integer.
  defp resolve_size!(opts) do
    size =
      Keyword.get(opts, :size) ||
        Application.get_env(:servcore, :worker_pool_size) ||
        System.schedulers_online()

    if is_integer(size) and size >= 0 do
      size
    else
      raise ArgumentError,
            "worker pool size must be a non-negative integer, got: #{inspect(size)}"
    end
  end

  # Builds the child spec for one worker. Uses the worker's own child_spec/1 when it
  # exists; otherwise falls back to a plain spec around start_link/1, the only
  # function init/1 requires of the worker module.
  defp worker_child_spec(worker_module, index, worker_opts) do
    child_id = {worker_module, index}

    if function_exported?(worker_module, :child_spec, 1) do
      Supervisor.child_spec({worker_module, worker_opts}, id: child_id)
    else
      %{id: child_id, start: {worker_module, :start_link, [worker_opts]}}
    end
  end
end