lib/event_store_dashboard/components/subscriptions_table.ex

defmodule EventStoreDashboard.Components.SubscriptionsTable do
  @moduledoc false

  use Phoenix.LiveComponent

  import Phoenix.LiveDashboard.PageBuilder

  alias EventStoreDashboard.Components.{EventLink, Pagination, SubscriptionModal, TableParams}
  alias EventStoreDashboard.{Params, Repo, Subscription}
  alias EventStoreDashboard.Repo.Context

  @page_param :subscriptions_page
  @sort_columns ~w(subscription_id stream_uuid subscription_name last_seen created_at)

  @impl true
  def mount(socket) do
    {:ok, assign(socket, :lag_snapshot, %{})}
  end

  @impl true
  def update(assigns, socket) do
    result =
      paginate_subscriptions(
        assigns.ctx,
        assigns.page.node,
        assigns.page_number,
        assigns.page.params
      )

    prev_snapshot = socket.assigns.lag_snapshot

    entries = Enum.map(result.entries, &apply_lag_trend(&1, prev_snapshot))
    next_snapshot = Map.new(entries, &subscription_lag/1)

    {:ok,
     socket
     |> assign(assigns)
     |> assign(
       page_param: @page_param,
       result: %{result | entries: entries},
       lag_snapshot: next_snapshot
     )}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div>
      <.live_table
        id="event_store_subscriptions_table"
        page={@page}
        title="Subscriptions"
        default_sort_by={:subscription_id}
        rows_name="subscriptions"
        row_attrs={&row_attrs/1}
        row_fetcher={&fetch_rows(&1, &2, @result)}
      >
        <:col field={:subscription_id} header="ID" sortable={:asc} />
        <:col field={:stream_uuid} header="Stream" sortable={:asc} :let={row}>
          <.link
            patch={stream_modal_path(@socket, @page, @ctx, row[:stream_uuid])}
            onclick="event.stopPropagation()"
            class="es-modal-trigger"
          >
            {row[:stream_uuid]}
          </.link>
        </:col>
        <:col field={:subscription_name} header="Name" sortable={:asc} />
        <:col field={:lag} header="Lag" :let={row}>
          <.lag_badge lag={row[:lag]} trend={row[:trend]} />
        </:col>
        <:col field={:last_seen} header="Last seen" sortable={:asc} :let={row}>
          <EventLink.render
            socket={@socket}
            page={@page}
            ctx={@ctx}
            stream_uuid={row[:stream_uuid]}
            event_number={row[:last_seen]}
            stop_propagation
          />
        </:col>
        <:col field={:created_at} header="Created at" sortable={:asc} />
      </.live_table>
      <Pagination.render
        id="subscriptions-pagination"
        param={@page_param}
        page_number={@page_number}
        total_pages={@result.total_pages}
        socket={@socket}
        page={@page}
      />
      <.live_modal
        :if={@subscription_id}
        id="subscription-modal"
        title="Subscription"
        return_to={modal_return_to(@socket, @page)}
      >
        <SubscriptionModal.render
          page={@page}
          ctx={@ctx}
          subscription_id={@subscription_id}
          socket={@socket}
        />
      </.live_modal>
    </div>
    """
  end

  attr(:lag, :integer, required: true)
  attr(:trend, :atom, required: true)

  defp lag_badge(assigns) do
    ~H"""
    <span class={"badge #{lag_class(@lag)}"}>
      {@lag}<.trend_arrow trend={@trend} />
    </span>
    """
  end

  attr(:trend, :atom, required: true)

  defp trend_arrow(%{trend: :catching_up} = assigns),
    do: ~H|<span title="Catching up" class="ml-1">&#x2193;</span>|

  defp trend_arrow(%{trend: :falling_behind} = assigns),
    do: ~H|<span title="Falling behind" class="ml-1">&#x2191;</span>|

  defp trend_arrow(assigns), do: ~H||

  defp lag_trend(nil, _current), do: :unknown
  defp lag_trend(prev, current) when current < prev, do: :catching_up
  defp lag_trend(prev, current) when current > prev, do: :falling_behind
  defp lag_trend(_prev, _current), do: :stable

  defp lag_class(0), do: "badge-success"
  defp lag_class(lag) when lag < 100, do: "badge-secondary"
  defp lag_class(lag) when lag < 10_000, do: "badge-warning"
  defp lag_class(_), do: "badge-danger"

  defp paginate_subscriptions(nil, _node, _page_number, _url_params),
    do: %{entries: [], total_entries: 0, total_pages: 0}

  defp paginate_subscriptions(%Context{} = ctx, node, page_number, url_params) do
    sort_by = TableParams.parse_sort_by(url_params, @sort_columns, :subscription_id)
    sort_dir = TableParams.parse_sort_dir(url_params, :asc)
    limit = TableParams.parse_limit(url_params)

    search_term = url_params |> TableParams.parse_search() |> like_pattern()
    offset = (page_number - 1) * limit

    with {:ok, total_entries} <- count_subscriptions(node, ctx, search_term),
         {:ok, rows} <-
           query_subscriptions(
             node,
             ctx,
             sort_by,
             sort_dir,
             search_term,
             limit,
             offset
           ) do
      total_pages = if total_entries == 0, do: 0, else: div(total_entries - 1, limit) + 1

      %{
        entries: Enum.map(rows, &row_to_subscription/1),
        total_entries: total_entries,
        total_pages: total_pages
      }
    else
      _ -> %{entries: [], total_entries: 0, total_pages: 0}
    end
  end

  defp count_subscriptions(node, %Context{} = ctx, search_term) do
    {where, params} = search_clause(search_term, [], 1)
    sql = "SELECT COUNT(*) FROM #{ctx.schema}.subscriptions s#{where};"

    case Repo.query(node, ctx.conn, sql, params) do
      {:ok, [[count]]} -> {:ok, count}
      _ -> :error
    end
  end

  defp query_subscriptions(node, %Context{} = ctx, sort_by, sort_dir, search_term, limit, offset) do
    {where, params} = search_clause(search_term, [limit, offset], 3)

    sql = """
    SELECT
      s.subscription_id,
      s.stream_uuid,
      s.subscription_name,
      s.last_seen,
      s.created_at,
      st.stream_version
    FROM #{ctx.schema}.subscriptions s
    LEFT JOIN #{ctx.schema}.streams st ON st.stream_uuid = s.stream_uuid#{where}
    ORDER BY s.#{sort_by} #{sort_dir_sql(sort_dir)}
    LIMIT $1 OFFSET $2;
    """

    Repo.query(node, ctx.conn, sql, params)
  end

  defp row_to_subscription([
         subscription_id,
         stream_uuid,
         subscription_name,
         last_seen,
         created_at,
         stream_version
       ]) do
    version = stream_version || 0
    seen = last_seen || 0

    %Subscription{
      subscription_id: subscription_id,
      stream_uuid: stream_uuid,
      subscription_name: subscription_name,
      last_seen: last_seen,
      stream_version: version,
      lag: max(version - seen, 0),
      created_at: created_at
    }
  end

  defp apply_lag_trend(%Subscription{} = entry, prev_snapshot) do
    %{entry | trend: lag_trend(prev_snapshot[entry.subscription_id], entry.lag)}
  end

  defp subscription_lag(%Subscription{} = entry), do: {entry.subscription_id, entry.lag}

  defp fetch_rows(_params, _node, result), do: {result.entries, result.total_entries}

  defp like_pattern(nil), do: nil
  defp like_pattern(term), do: "%" <> term <> "%"

  defp search_clause(nil, base_params, _index), do: {"", base_params}

  defp search_clause(term, base_params, index),
    do: {" WHERE s.subscription_name ILIKE $#{index} OR s.stream_uuid ILIKE $#{index}", base_params ++ [term]}

  defp sort_dir_sql(:asc), do: "ASC"
  defp sort_dir_sql(:desc), do: "DESC"

  defp row_attrs(row) do
    [
      {"phx-click", "show_subscription"},
      {"phx-value-subscription_id", row[:subscription_id]},
      {"phx-page-loading", true}
    ]
  end

  defp modal_return_to(socket, page) do
    live_dashboard_path(socket, page, subscription_id: nil)
  end

  defp stream_modal_path(socket, page, %Context{} = ctx, stream_uuid) do
    Params.to_live_dashboard_path(socket, page, %Params{
      eventstore: ctx.event_store,
      stream_modal: stream_uuid,
      subscription_id: nil
    })
  end
end