lib/opq.ex

defmodule OPQ do
  @moduledoc """
  A simple, in-memory queue with worker pooling and rate limiting in Elixir.

  Starting a queue (`start_link/1` or `init/1`) starts a feeder process, and,
  unless the `:interval` option is `0`, a rate limiter between the feeder and
  the worker supervisor. The value returned by `start_link/1` is the feeder,
  which is the handle every other function in this module expects.
  """

  alias OPQ.{Options, Feeder, RateLimiter, WorkerSupervisor}
  alias OPQ.OptionsHandler, as: Opt

  @typedoc "Reference to a queue's feeder stage, as accepted by `GenStage.cast/2`."
  @type feeder :: GenStage.stage()

  @doc """
  Builds a child specification so a queue can be placed under a supervisor.

  The child id is the `:name` option when given; otherwise it falls back to
  `OPQ` so that an unnamed queue does not end up with a `nil` id.
  """
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts \\ []) do
    %{
      id: opts[:name] || __MODULE__,
      start: {OPQ, :start_link, [opts]}
    }
  end

  @doc """
  Starts a queue. Equivalent to `init/1`.
  """
  @spec start_link(keyword()) :: {:ok, feeder()}
  def start_link(opts \\ []), do: init(opts)

  @doc """
  Fills in default options with `OPQ.Options.assign_defaults/1`, then starts
  the feeder and its consumers.

  Returns `{:ok, feeder}`. Raises `MatchError` if the feeder (or the rate
  limiter, when one is used) does not start with `{:ok, _}`.
  """
  @spec init(keyword()) :: {:ok, feeder()}
  def init(opts \\ []) do
    opts
    |> Options.assign_defaults()
    |> start_links()
  end

  @doc """
  Asynchronously sends `event` to the feeder as `{:enqueue, event}`.

  This is a cast: it returns immediately without waiting for the feeder to
  process the event.
  """
  @spec enqueue(feeder(), term()) :: :ok
  def enqueue(feeder, event) do
    GenStage.cast(feeder, {:enqueue, event})
  end

  @doc """
  Enqueues a `{mod, fun, args}` tuple as the event.

  Only matches when `mod` and `fun` are atoms and `args` is a list.
  """
  @spec enqueue(feeder(), module(), atom(), list()) :: :ok
  def enqueue(feeder, mod, fun, args)
      when is_atom(mod) and
             is_atom(fun) and
             is_list(args) do
    enqueue(feeder, {mod, fun, args})
  end

  @doc """
  Asks the feeder to stop and discards the options stored for it.

  Side effect: enables `:trap_exit` on the *calling* process and leaves it
  enabled afterwards. Returns whatever `OPQ.OptionsHandler.stop/1` returns.
  """
  @spec stop(feeder()) :: term()
  def stop(feeder) do
    # NOTE(review): presumably set so that a caller linked to the feeder (via
    # start_link) survives the feeder's exit — confirm. It is intentionally not
    # restored here because the :stop cast below is handled asynchronously.
    Process.flag(:trap_exit, true)
    GenStage.cast(feeder, :stop)
    Opt.stop(feeder)
  end

  @doc "Asynchronously sends `:pause` to the feeder."
  @spec pause(feeder()) :: :ok
  def pause(feeder), do: GenStage.cast(feeder, :pause)

  @doc "Asynchronously sends `:resume` to the feeder."
  @spec resume(feeder()) :: :ok
  def resume(feeder), do: GenStage.cast(feeder, :resume)

  @doc """
  Synchronously calls the feeder with `:info` and returns its reply.

  The call timeout is looked up with `OPQ.OptionsHandler.timeout/1`.
  """
  @spec info(feeder()) :: term()
  def info(feeder), do: GenStage.call(feeder, :info, Opt.timeout(feeder))

  @doc """
  Synchronously calls the feeder with `:queue` and returns its reply.

  The call timeout is looked up with `OPQ.OptionsHandler.timeout/1`.
  """
  @spec queue(feeder()) :: term()
  def queue(feeder), do: GenStage.call(feeder, :queue, Opt.timeout(feeder))

  # Starts the feeder, stores the options for later lookups, then starts the
  # consumer side of the pipeline. Returns `{:ok, feeder}`.
  defp start_links(opts) do
    {:ok, feeder} = Feeder.start_link(opts[:name])

    # Options are keyed by the configured name when there is one, otherwise by
    # the feeder returned above.
    # NOTE(review): stop/1, info/1 and queue/1 look options up under whatever
    # reference the caller passes — verify against OPQ.OptionsHandler that a
    # pid works for a queue that was started with a :name.
    Opt.save_opts(opts[:name] || feeder, opts)

    # From here on :name refers to the started feeder, not the configured name.
    # NOTE(review): the result of start_consumers/2 is discarded, so a failure
    # returned by WorkerSupervisor.start_link/1 is not reported to the caller.
    opts
    |> Keyword.merge(name: feeder)
    |> start_consumers(interval: opts[:interval])

    {:ok, feeder}
  end

  # No rate limiting (interval is exactly 0): workers consume from the feeder
  # directly (opts[:name] was replaced with the feeder by start_links/1).
  defp start_consumers(opts, interval: 0) do
    opts
    |> Keyword.merge(producer_consumer: opts[:name])
    |> WorkerSupervisor.start_link()
  end

  # Any other interval: a rate limiter is started and workers consume from it.
  defp start_consumers(opts, _) do
    {:ok, rate_limiter} = RateLimiter.start_link(opts)

    opts
    |> Keyword.merge(producer_consumer: rate_limiter)
    |> WorkerSupervisor.start_link()
  end
end