Skip to main content

lib/squidie/runtime/run_projection.ex

defmodule Squidie.Runtime.RunProjection do
  @moduledoc false

  alias Squidie.Runtime.DispatchProtocol.Entry

  @doc """
  Builds the stored run-summary payload for run index projections.
  """
  @spec summary(String.t(), String.t(), String.t(), DateTime.t()) :: map()
  def summary(run_id, workflow, queue, %DateTime{} = indexed_at) do
    Map.new(run_id: run_id, workflow: workflow, queue: queue, indexed_at: indexed_at)
  end

  @doc """
  Returns run summaries in stable indexed-at order.
  """
  @spec sorted_runs(map()) :: [map()]
  def sorted_runs(runs) when is_map(runs) do
    runs
    |> Map.values()
    |> Enum.sort_by(fn %{run_id: run_id, indexed_at: indexed_at} ->
      {DateTime.to_unix(indexed_at, :microsecond), run_id}
    end)
  end

  @doc """
  Returns the projected run ids in stable indexed-at order.
  """
  @spec run_ids(map()) :: [String.t()]
  def run_ids(runs) when is_map(runs) do
    runs
    |> sorted_runs()
    |> Enum.map(& &1.run_id)
  end

  @doc """
  Returns anomalies in chronological replay order.
  """
  @spec anomalies([map()]) :: [map()]
  def anomalies(anomalies) when is_list(anomalies), do: Enum.reverse(anomalies)

  @doc """
  Builds a projection anomaly from a journal entry and reason.
  """
  @spec anomaly(Entry.t(), atom()) :: map()
  def anomaly(%Entry{} = entry, reason) do
    data = entry_data(entry)

    %{reason: reason, entry_type: entry.type}
    |> maybe_put(:run_id, data)
    |> maybe_put(:workflow, data)
    |> maybe_put(:queue, data)
  end

  defp entry_data(%Entry{data: data}) when is_map(data), do: data
  defp entry_data(%Entry{}), do: %{}

  defp maybe_put(anomaly, key, data) do
    case Map.get(data, key) do
      value when is_binary(value) -> Map.put(anomaly, key, value)
      _value -> anomaly
    end
  end
end