lib/faktory_worker/sandbox.ex

defmodule FaktoryWorker.Sandbox do
  @moduledoc """
  When started with sandbox mode enabled, `FaktoryWorker` won't connect
  to a running Faktory instance or perform queued jobs, but will instead
  record all jobs enqueued locally.

  In general, this mode should only be used during testing. See
  [Sandbox Testing](sandbox-testing.html) to read more.
  """

  use GenServer

  alias FaktoryWorker.Job
  alias FaktoryWorker.JobSupervisor

  @typedoc "A concise description of a single call to a job module."
  @type job :: %{worker: module(), args: list(), opts: Keyword.t()}

  @doc false
  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Returns true if sandbox mode is enabled."
  @spec active? :: boolean()
  def active?, do: !!GenServer.whereis(__MODULE__)

  @doc false
  @spec job_supervisor :: module()
  def job_supervisor, do: GenServer.call(__MODULE__, :job_supervisor)

  @doc false
  @spec enqueue_job(module(), list(), Keyword.t()) :: :ok
  def enqueue_job(worker_mod, args, opts) do
    GenServer.call(__MODULE__, {:enqueue, worker_mod, args, opts})
  end

  @doc false
  @spec find_jobs(module(), args: list(), opts: Keyword.t()) :: [job()]
  def find_jobs(worker_mod, filters) do
    GenServer.call(__MODULE__, {:find_jobs, worker_mod, filters})
  end

  @doc """
  Returns a list of all locally enqueued jobs across all job modules, in
  the order that they were enqueued. If you only want to retrieve jobs
  enqueued for a specific job module, use `all_jobs/1` instead.
  """
  @spec all_jobs :: [job()]
  def all_jobs, do: GenServer.call(__MODULE__, :all_jobs)

  @doc """
  Returns a list of all locally enqueued jobs for the given job module,
  in the order that they were enqueued. If you want to retrieve a list of
  all jobs across all job modules, use `all_jobs/0` instead.
  """
  @spec all_jobs(module()) :: [job()]
  def all_jobs(worker_mod), do: GenServer.call(__MODULE__, {:all_jobs, worker_mod})

  @doc "Clear the queue history for all job modules."
  @spec reset :: :ok
  def reset, do: GenServer.call(__MODULE__, :reset)

  @doc "Clear the queue history for the given job module."
  @spec reset(module()) :: :ok
  def reset(worker_mod), do: GenServer.call(__MODULE__, {:reset, worker_mod})

  @doc false
  @spec encode_args(term() | list(term())) :: list()
  def encode_args(args) when is_list(args) do
    with args <- Job.normalize_job_args(args),
         {:ok, encoded} <- Jason.encode(args),
         {:ok, decoded} <- Jason.decode(encoded) do
      decoded
    else
      {:error, reason} ->
        raise "unable to encode arguments: #{reason} (attempted to encode #{inspect(args)})"
    end
  end

  def encode_args(arg), do: encode_args([arg])

  # ---

  @doc false
  @impl true
  def init(opts) do
    {:ok, %{jobs: %{}, opts: opts}}
  end

  @doc false
  @impl true
  def handle_call(:job_supervisor, _from, state) do
    {:reply, JobSupervisor.format_supervisor_name(state.opts[:name]), state}
  end

  def handle_call({:enqueue, worker_mod, args, opts}, _from, state) do
    job = %{
      worker: worker_mod,
      args: encode_args(args),
      opts: opts
    }

    jobs = Map.update(state.jobs, worker_mod, [job], &Enum.concat(&1, [job]))
    {:reply, :ok, %{state | jobs: jobs}}
  end

  def handle_call({:find_jobs, worker_mod, filters}, _from, state) do
    jobs =
      Enum.filter(state.jobs[worker_mod] || [], fn %{args: args, opts: opts} ->
        match? =
          if filters[:args] do
            args == encode_args(filters[:args])
          else
            true
          end

        match? =
          if filters[:opts] do
            Enum.reduce(filters[:opts], match?, fn {key, val}, acc ->
              acc and opts[key] == val
            end)
          else
            match?
          end

        match?
      end)

    {:reply, jobs, state}
  end

  def handle_call(:all_jobs, _from, state) do
    jobs =
      state.jobs
      |> Map.values()
      |> List.flatten()

    {:reply, jobs, state}
  end

  def handle_call({:all_jobs, worker_mod}, _from, state) do
    {:reply, state.jobs[worker_mod] || [], state}
  end

  def handle_call(:reset, _from, state) do
    {:reply, :ok, %{state | jobs: %{}}}
  end

  def handle_call({:reset, worker_mod}, _from, state) do
    jobs = Map.update(state.jobs, worker_mod, %{}, fn _ -> %{} end)
    {:reply, :ok, %{state | jobs: jobs}}
  end
end