lib/event_store_dashboard/components/events_table.ex

defmodule EventStoreDashboard.Components.EventsTable do
  @moduledoc false

  use Phoenix.Component

  import Phoenix.LiveDashboard.PageBuilder

  alias EventStoreDashboard.Components.{EventLink, EventModal, Pagination, TableParams}
  alias EventStoreDashboard.{Params, Repo}
  alias EventStoreDashboard.Repo.Context
  alias Phoenix.LiveDashboard.PageBuilder
  alias Phoenix.LiveView.Socket

  @page_param :events_page

  attr(:ctx, Context, required: true)
  attr(:page, PageBuilder, required: true)
  attr(:socket, Socket, required: true)
  attr(:page_number, :integer, required: true)
  attr(:event_number, :integer, default: nil)
  attr(:stream_uuid, :string, required: true)
  attr(:correlation_id, :string, default: nil)
  attr(:causation_id, :string, default: nil)

  def render(assigns) do
    params = assigns.page.params
    sort_dir = TableParams.parse_sort_dir(params, :asc)
    limit = TableParams.parse_limit(params)
    filter = active_filter(assigns)
    type_filter = event_type_pattern(TableParams.parse_search(params))

    result = fetch_result(filter, assigns, sort_dir, type_filter, limit)

    assigns =
      assign(assigns,
        page_param: @page_param,
        result: result,
        filter: filter
      )

    ~H"""
    <.filter_card
      :if={@filter}
      filter={@filter}
      ctx={@ctx}
      socket={@socket}
      page={@page}
    />
    <.live_table
      id="event_store_events_table"
      page={@page}
      title={events_title(@filter, @stream_uuid)}
      default_sort_by={:event_number}
      rows_name="events"
      row_attrs={&row_attrs(&1, @filter, @stream_uuid)}
      row_fetcher={&fetch_rows(&1, &2, @result)}
    >
      <:col :if={is_nil(@filter)} field={:event_number} header="Position" sortable={:asc} />
      <:col :if={is_nil(@filter)} field={:stream_version} header="Stream Version" :let={row}>
        <EventLink.render
          socket={@socket}
          page={@page}
          ctx={@ctx}
          stream_uuid={row[:stream_uuid]}
          event_number={row[:stream_version]}
          stop_propagation
        />
      </:col>
      <:col :if={not is_nil(@filter)} field={:event_number} header="Stream Version" sortable={:asc} :let={row}>
        <EventLink.render
          socket={@socket}
          page={@page}
          ctx={@ctx}
          stream_uuid={row[:stream_uuid]}
          event_number={row[:stream_version]}
          stop_propagation
        />
      </:col>
      <:col field={:event_type} header="Type" />
      <:col :if={not match?({:stream, _}, @filter)} field={:stream_uuid} header="Stream" :let={row}>
        <.link
          patch={stream_modal_path(@socket, @page, @ctx, row[:stream_uuid])}
          onclick="event.stopPropagation()"
          class="es-modal-trigger"
        >
          {row[:stream_uuid]}
        </.link>
      </:col>
      <:col field={:created_at} header="Created at" />
      <:col field={:event_id} header="Event ID" />
    </.live_table>
    <Pagination.render
      id="events-pagination"
      param={@page_param}
      page_number={@page_number}
      total_pages={@result.total_pages}
      socket={@socket}
      page={@page}
    />
    <.live_modal
      :if={@event_number}
      id="event-modal"
      title="Event"
      return_to={modal_return_to(@socket, @page)}
    >
      <.live_component
        module={EventModal}
        id="event-modal-content"
        page={@page}
        ctx={@ctx}
        stream_uuid={@stream_uuid}
        event_number={@event_number}
        return_to={modal_return_to(@socket, @page)}
      />
    </.live_modal>
    """
  end

  defp fetch_result({kind, value}, assigns, _sort_dir, type_filter, limit)
       when kind in [:correlation, :causation] do
    paginate_by_id(
      assigns.ctx,
      assigns.page.node,
      id_column(kind),
      value,
      type_filter,
      assigns.page_number,
      limit
    )
  end

  defp fetch_result(_filter, assigns, sort_dir, type_filter, limit) do
    read_stream(
      assigns.ctx,
      assigns.page.node,
      assigns.stream_uuid,
      sort_dir,
      type_filter,
      assigns.page_number,
      limit
    )
  end

  defp id_column(:correlation), do: :correlation_id
  defp id_column(:causation), do: :causation_id

  defp active_filter(%{correlation_id: id}) when is_binary(id), do: {:correlation, id}
  defp active_filter(%{causation_id: id}) when is_binary(id), do: {:causation, id}
  defp active_filter(%{stream_uuid: "$all"}), do: nil
  defp active_filter(%{stream_uuid: uuid}) when is_binary(uuid), do: {:stream, uuid}
  defp active_filter(_), do: nil

  defp events_title({_, _}, _), do: "Events"
  defp events_title(nil, _), do: "All stream events"

  attr(:filter, :any, required: true)
  attr(:ctx, Context, required: true)
  attr(:socket, Socket, required: true)
  attr(:page, PageBuilder, required: true)

  defp filter_card(assigns) do
    ~H"""
    <div class="card mb-4">
      <div class="card-body">
        <div class="d-flex justify-content-between align-items-center">
          <div>
            <strong>{filter_label(@filter)}:</strong>
            <%= case @filter do %>
              <% {:stream, uuid} -> %>
                <.link
                  patch={stream_modal_path(@socket, @page, @ctx, uuid)}
                  class="ml-2 es-modal-trigger"
                >
                  <code>{uuid}</code>
                </.link>
              <% {_, value} -> %>
                <code class="ml-2">{value}</code>
            <% end %>
          </div>
          <.link
            patch={clear_filter_path(@socket, @page, @filter)}
            class="btn btn-sm btn-outline-secondary"
          >
            Clear filter
          </.link>
        </div>
      </div>
    </div>
    """
  end

  defp filter_label({:correlation, _}), do: "Correlation ID"
  defp filter_label({:causation, _}), do: "Causation ID"
  defp filter_label({:stream, _}), do: "Stream"

  defp event_type_pattern(nil), do: nil
  defp event_type_pattern(""), do: nil
  defp event_type_pattern(value), do: "%" <> value <> "%"

  defp stream_modal_path(socket, page, %Context{} = ctx, stream_uuid) do
    Params.to_live_dashboard_path(socket, page, %Params{
      eventstore: ctx.event_store,
      stream_modal: stream_uuid
    })
  end

  defp modal_return_to(socket, page) do
    live_dashboard_path(socket, page, event: nil)
  end

  defp clear_filter_path(socket, page, {:stream, _}) do
    live_dashboard_path(socket, page,
      stream: nil,
      event: nil,
      events_page: nil
    )
  end

  defp clear_filter_path(socket, page, {kind, _}) when kind in [:correlation, :causation] do
    live_dashboard_path(socket, page,
      correlation_id: nil,
      causation_id: nil,
      event: nil,
      events_page: nil
    )
  end

  defp read_stream(%Context{} = ctx, node, stream_uuid, sort_dir, type_filter, page_number, limit) do
    offset = (page_number - 1) * limit

    case Repo.fetch_stream(node, ctx, stream_uuid) do
      {:ok, stream} ->
        total_entries = count_stream_events(node, ctx, stream, type_filter)
        total_pages = total_pages(total_entries, limit)
        entries = fetch_stream_entries(node, ctx, stream.stream_id, sort_dir, type_filter, limit, offset)

        %{entries: entries, total_entries: total_entries, total_pages: total_pages}

      _ ->
        %{entries: [], total_entries: 0, total_pages: 0}
    end
  end

  defp count_stream_events(_node, _ctx, stream, nil), do: stream.stream_version

  defp count_stream_events(node, %Context{} = ctx, stream, pattern) do
    case Repo.count_events_in_stream(node, ctx, stream.stream_id, pattern) do
      {:ok, n} -> n
      _ -> 0
    end
  end

  defp fetch_stream_entries(node, %Context{} = ctx, stream_id, sort_dir, type_filter, limit, offset) do
    case Repo.query_events_in_stream(node, ctx, stream_id, sort_dir, type_filter, limit, offset) do
      {:ok, rows} -> Enum.map(rows, &Repo.row_to_event(&1, ctx))
      _ -> []
    end
  end

  # Filter events by correlation_id or causation_id by querying the events table
  # directly. There's no default index on these columns — fine for diagnostic
  # use, slow on huge tables.
  defp paginate_by_id(%Context{} = ctx, node, column, value, type_filter, page_number, limit) do
    offset = (page_number - 1) * limit

    with {:ok, total} <-
           Repo.count_events_by_id(node, ctx, column, value, type_filter),
         {:ok, rows} <-
           query_events_by_id(node, ctx, column, value, type_filter, limit, offset) do
      total_pages = total_pages(total, limit)

      %{
        entries: Enum.map(rows, &Repo.row_to_event(&1, ctx)),
        total_entries: total,
        total_pages: total_pages
      }
    else
      _ -> %{entries: [], total_entries: 0, total_pages: 0}
    end
  end

  defp query_events_by_id(node, %Context{} = ctx, column, value, type_filter, limit, offset) do
    with {:ok, uuid} <- Repo.string_to_uuid(value) do
      {type_clause, params} = Repo.event_type_clause(type_filter, [uuid, limit, offset], 4)

      sql = """
      SELECT
        se.stream_version,
        e.event_id,
        s.stream_uuid,
        se.original_stream_version,
        e.event_type,
        e.correlation_id,
        e.causation_id,
        e.data,
        e.metadata,
        e.created_at
      FROM #{ctx.schema}.events e
      LEFT JOIN #{ctx.schema}.stream_events se
        ON se.event_id = e.event_id
        AND se.stream_id = se.original_stream_id
      LEFT JOIN #{ctx.schema}.streams s ON s.stream_id = se.original_stream_id
      WHERE e.#{column} = $1#{type_clause}
      ORDER BY e.created_at ASC
      LIMIT $2 OFFSET $3;
      """

      Repo.query(node, ctx.conn, sql, params)
    end
  end

  defp total_pages(0, _limit), do: 0
  defp total_pages(total, limit), do: div(total - 1, limit) + 1

  defp fetch_rows(_params, _node, result), do: {result.entries, result.total_entries}

  defp row_attrs(row, nil, stream_uuid) do
    [
      {"phx-click", "show_event"},
      {"phx-value-stream", stream_uuid},
      {"phx-value-event", row[:event_number]},
      {"phx-page-loading", true}
    ]
  end

  defp row_attrs(row, _filter, _stream_uuid) do
    [
      {"phx-click", "show_event"},
      {"phx-value-stream", row[:stream_uuid]},
      {"phx-value-event", row[:stream_version]},
      {"phx-page-loading", true}
    ]
  end
end