lib/jackalope/transient_work_list.ex

defmodule Jackalope.TransientWorkList do
  @moduledoc """
  A simple work list that does not survive across reboots.
  """

  alias Jackalope.WorkList.Expiration

  require Logger
  @default_max_size 100

  defstruct items: [],
            pending: %{},
            max_size: @default_max_size,
            expiration_fn: nil

  @type t() :: %__MODULE__{items: list(), max_size: non_neg_integer()}

  @doc "Create a new work list"
  @spec new(function(), function(), non_neg_integer(), Keyword.t()) :: t()
  def new(expiration_fn, _update_expiration_fn, max_size \\ @default_max_size, _opts \\ [])
      when max_size > 0 do
    %__MODULE__{max_size: max_size, expiration_fn: expiration_fn}
  end

  @doc false
  @spec prepend(t(), list()) :: t()
  def prepend(work_list, items) when is_list(items) do
    updated_items =
      (items ++ work_list.items)
      |> bound_work_items(work_list)

    %__MODULE__{work_list | items: updated_items}
  end

  @doc false
  @spec bound_pending_items(any, atom | t()) :: any
  def bound_pending_items(pending, work_list) do
    if Enum.count(pending) > work_list.max_size do
      # Trim expired pending requests
      kept_pairs =
        Enum.reduce(
          pending,
          [],
          fn {ref, item}, acc ->
            if Expiration.unexpired?(item, work_list.expiration_fn) do
              [{ref, item} | acc]
            else
              acc
            end
          end
        )

      # If still over maximum, remove the oldest pending request (expiration is smallest)
      if Enum.count(kept_pairs) > work_list.max_size do
        [{ref, item} | newer_pairs] =
          Enum.sort(kept_pairs, fn {_, item1}, {_, item2} ->
            work_list.expiration_fn.(item1) < work_list.expiration_fn.(item2)
          end)

        Logger.info(
          "[Jackalope] Maximum number of unexpired pending requests reached. Dropping #{inspect(item)}:#{inspect(ref)}."
        )

        newer_pairs
      else
        kept_pairs
      end
      |> Enum.into(%{})
    else
      pending
    end
  end

  @doc false
  @spec bound_work_items([any()], t()) :: [any()]
  def bound_work_items(items, work_list) do
    current_size = length(items)

    if current_size <= work_list.max_size do
      items
    else
      Logger.warn(
        "[Jackalope] The worklist exceeds #{work_list.max_size} (#{current_size}). Looking to shrink it."
      )

      {active_items, deleted_count} = remove_expired_work_items(items, work_list.expiration_fn)

      active_size = current_size - deleted_count

      if active_size <= work_list.max_size do
        active_items
      else
        {dropped, cropped_list} = List.pop_at(active_items, active_size - 1)

        Logger.warn("[Jackalope] Dropped #{inspect(dropped)}  from oversized work list")

        cropped_list
      end
    end
  end

  defp remove_expired_work_items(items, expiration_fn) do
    {list, count} =
      Enum.reduce(
        items,
        {[], 0},
        fn item, {active_list, deleted_count} ->
          if Expiration.unexpired?(item, expiration_fn),
            do: {[item | active_list], deleted_count},
            else: {active_list, deleted_count + 1}
        end
      )

    {Enum.reverse(list), count}
  end
end

defimpl Jackalope.WorkList, for: Jackalope.TransientWorkList do
  alias Jackalope.TransientWorkList
  require Logger

  @impl Jackalope.WorkList
  def push(work_list, item) do
    updated_items =
      [item | work_list.items]
      |> TransientWorkList.bound_work_items(work_list)

    %TransientWorkList{work_list | items: updated_items}
  end

  @impl Jackalope.WorkList
  def peek(work_list) do
    List.first(work_list.items)
  end

  @impl Jackalope.WorkList
  def pop(work_list) do
    %TransientWorkList{work_list | items: tl(work_list.items)}
  end

  @impl Jackalope.WorkList
  def pending(work_list, ref) do
    item = hd(work_list.items)

    updated_pending =
      Map.put(work_list.pending, ref, item) |> TransientWorkList.bound_pending_items(work_list)

    %TransientWorkList{
      work_list
      | items: tl(work_list.items),
        pending: updated_pending
    }
  end

  @impl Jackalope.WorkList
  def reset_pending(work_list) do
    pending_items = Map.values(work_list.pending)

    TransientWorkList.prepend(%TransientWorkList{work_list | pending: %{}}, pending_items)
  end

  @impl Jackalope.WorkList
  def done(work_list, ref) do
    {item, pending} = Map.pop(work_list.pending, ref)
    # item can be nil
    {%TransientWorkList{work_list | pending: pending}, item}
  end

  @impl Jackalope.WorkList
  def count(work_list) do
    length(work_list.items)
  end

  @impl Jackalope.WorkList
  def count_pending(work_list) do
    Enum.count(work_list.pending)
  end

  @impl Jackalope.WorkList
  def empty?(work_list) do
    work_list.items == []
  end

  @impl Jackalope.WorkList
  def remove_all(work_list) do
    %TransientWorkList{work_list | items: [], pending: %{}}
  end
end