Skip to main content

lib/phoenix_kit_db.ex

defmodule PhoenixKitDb do
  @moduledoc """
  Database explorer module for PhoenixKit.

  Provides metadata, stats, and paginated previews for Postgres tables
  so the admin UI can browse data without exposing full SQL access.
  Live updates ride on Postgres `LISTEN/NOTIFY` via the
  `PhoenixKitDb.Listener` GenServer.

  ## Live Updates

  When a table is being viewed, changes to that table trigger automatic
  refreshes. This requires:

    1. The `Listener` GenServer running (started via the host's
       `PhoenixKit.Supervisor` from this module's `children/0` callback).
    2. A notification trigger on the table being viewed — installed
       lazily by `ensure_trigger/2` on first view.
  """

  use PhoenixKit.Module

  alias PhoenixKit.Dashboard.Tab
  alias PhoenixKit.RepoHelper
  alias PhoenixKit.Settings

  require Logger

  @enabled_key "db_enabled"
  @default_table_page 1
  @default_table_page_size 20
  @default_row_page 1
  @default_row_page_size 50
  @notify_channel "phoenix_kit_db_changes"
  @notify_function_name "phoenix_kit_notify_table_change"
  @trigger_prefix "phoenix_kit_db_change_"

  @textual_types ~w(text character varying character citext json jsonb uuid inet)

  @typedoc "An identifier accepted by the schema/table-name validator."
  @type identifier_string :: String.t()

  # ===========================================================================
  # Required callbacks
  # ===========================================================================

  @impl PhoenixKit.Module
  @spec module_key() :: String.t()
  def module_key, do: "db"

  @impl PhoenixKit.Module
  @spec module_name() :: String.t()
  def module_name, do: "DB"

  @impl PhoenixKit.Module
  @doc """
  Whether the DB module is enabled.

  Reads from the DB-backed settings table. Defensive against three
  failure modes that can hit before/around DB availability:

    - `rescue _`: DB not running, table missing, schema mismatch, etc.
    - `catch :exit, _`: connection pool checkout `EXIT` (e.g. when a
      test sandbox owner has just stopped — test-environment artifact,
      but harmless to handle in production code too).

  All branches return `false` so callers don't need to special-case
  startup ordering.
  """
  @spec enabled?() :: boolean()
  def enabled? do
    Settings.get_boolean_setting(@enabled_key, false)
  rescue
    _ -> false
  catch
    :exit, _ -> false
  end

  @impl PhoenixKit.Module
  @spec enable_system() :: term()
  def enable_system do
    result = Settings.update_boolean_setting_with_module(@enabled_key, true, module_key())
    log_module_toggle(:enabled)
    result
  end

  @impl PhoenixKit.Module
  @spec disable_system() :: term()
  def disable_system do
    result = Settings.update_boolean_setting_with_module(@enabled_key, false, module_key())
    log_module_toggle(:disabled)
    result
  end

  # ===========================================================================
  # Optional callbacks
  # ===========================================================================

  @impl PhoenixKit.Module
  @spec version() :: String.t()
  def version, do: "0.1.0"

  @impl PhoenixKit.Module
  @spec css_sources() :: [atom()]
  def css_sources, do: [:phoenix_kit_db]

  @impl PhoenixKit.Module
  @spec permission_metadata() :: %{
          key: String.t(),
          label: String.t(),
          icon: String.t(),
          description: String.t()
        }
  def permission_metadata do
    %{
      key: module_key(),
      label: "DB",
      icon: "hero-server-stack",
      description: "Database explorer and schema inspection"
    }
  end

  @impl PhoenixKit.Module
  @spec get_config() :: map()
  def get_config do
    stats = database_stats()

    %{
      enabled: enabled?(),
      table_count: stats.table_count,
      approx_rows: stats.approx_rows,
      total_size_bytes: stats.total_size_bytes,
      database_size_bytes: stats.database_size_bytes
    }
  end

  @impl PhoenixKit.Module
  @spec admin_tabs() :: [Tab.t()]
  def admin_tabs do
    [
      # Parent tab — match: :prefix keeps subtabs highlighted on any /db/* page.
      %Tab{
        id: :admin_db,
        label: "DB",
        icon: "hero-table-cells",
        path: "db",
        priority: 570,
        level: :admin,
        permission: module_key(),
        match: :prefix,
        group: :admin_modules,
        subtab_display: :when_active,
        highlight_with_subtabs: false,
        live_view: {PhoenixKitDb.Web.IndexLive, :index}
      },
      # Subtab — Overview at the same path as parent.
      %Tab{
        id: :admin_db_overview,
        label: "Overview",
        icon: "hero-table-cells",
        path: "db",
        priority: 571,
        level: :admin,
        permission: module_key(),
        match: :exact,
        parent: :admin_db,
        live_view: {PhoenixKitDb.Web.IndexLive, :index}
      },
      # Subtab — Activity feed (visible).
      %Tab{
        id: :admin_db_activity,
        label: "Activity",
        icon: "hero-signal",
        path: "db/activity",
        priority: 572,
        level: :admin,
        permission: module_key(),
        parent: :admin_db,
        live_view: {PhoenixKitDb.Web.ActivityLive, :activity}
      },
      # Hidden — Table detail page, reached by clicking a row in the index.
      %Tab{
        id: :admin_db_show,
        label: "Table",
        icon: "hero-table-cells",
        path: "db/:schema/:table",
        priority: 573,
        level: :admin,
        permission: module_key(),
        parent: :admin_db,
        visible: false,
        live_view: {PhoenixKitDb.Web.ShowLive, :show}
      }
    ]
  end

  @impl PhoenixKit.Module
  @spec children() :: [module()]
  def children, do: [PhoenixKitDb.Listener]

  # ============================================================================
  # Stats / table listing
  # ============================================================================

  @doc "Aggregated Postgres stats for all user tables."
  @spec database_stats() :: %{
          table_count: non_neg_integer(),
          approx_rows: non_neg_integer(),
          total_size_bytes: non_neg_integer(),
          database_size_bytes: non_neg_integer()
        }
  def database_stats do
    sql = """
    SELECT
      COUNT(*) AS table_count,
      COALESCE(SUM(n_live_tup), 0) AS approx_rows,
      COALESCE(SUM(pg_total_relation_size(relid)), 0) AS total_size_bytes,
      pg_database_size(current_database()) AS database_size_bytes
    FROM pg_stat_user_tables
    """

    case RepoHelper.query(sql) do
      {:ok, %{rows: [[table_count, approx_rows, total_size_bytes, db_size]]}} ->
        %{
          table_count: table_count,
          approx_rows: approx_rows,
          total_size_bytes: total_size_bytes,
          database_size_bytes: db_size
        }

      _ ->
        %{
          table_count: 0,
          approx_rows: 0,
          total_size_bytes: 0,
          database_size_bytes: 0
        }
    end
  end

  @doc "Lists tables + stats with pagination and search."
  @spec list_tables(map()) :: map()
  def list_tables(opts \\ %{}) do
    page = normalize_page(Map.get(opts, :page, @default_table_page))
    per_page = normalize_page_size(Map.get(opts, :per_page, @default_table_page_size))
    search = Map.get(opts, :search, "") |> to_string()
    offset = (page - 1) * per_page

    {where_sql, where_params} = table_search_clause(search)

    count_sql = "SELECT COUNT(*) FROM pg_stat_user_tables #{where_sql}"

    total_entries =
      case RepoHelper.query(count_sql, where_params) do
        {:ok, %{rows: [[count]]}} -> count
        _ -> 0
      end

    list_sql = """
    SELECT schemaname, relname, n_live_tup, pg_total_relation_size(relid)
    FROM pg_stat_user_tables
    #{where_sql}
    ORDER BY schemaname ASC, relname ASC
    LIMIT $#{length(where_params) + 1}
    OFFSET $#{length(where_params) + 2}
    """

    params = where_params ++ [per_page, offset]

    entries =
      case RepoHelper.query(list_sql, params) do
        {:ok, %{rows: rows}} -> Enum.map(rows, &row_to_table_entry/1)
        _ -> []
      end

    total_pages = max(div_with_ceiling(total_entries, per_page), 1)

    %{
      entries: entries,
      page: page,
      per_page: per_page,
      total_entries: total_entries,
      total_pages: total_pages
    }
  end

  defp row_to_table_entry([schema, name, approx_rows, size_bytes]) do
    %{schema: schema, name: name, approx_rows: approx_rows, size_bytes: size_bytes}
  end

  @doc """
  Fetches a single row by ID from a table.

  Returns `{:ok, row_map}` or `{:error, :not_found | :invalid_id |
  :invalid_identifier | term()}`. `:invalid_identifier` covers the case
  where `schema` or `table` contains characters that can't be safely
  quoted into SQL — this surfaces graceful "table not found" UX
  instead of a 500.
  """
  @spec fetch_row(identifier_string() | nil, identifier_string(), term()) ::
          {:ok, map()} | {:error, :not_found | :invalid_id | :invalid_identifier | term()}
  def fetch_row(schema, table, row_id) when is_binary(table) do
    schema = schema || "public"

    with {:ok, qualified} <- safe_qualified_table(schema, table),
         id when not is_nil(id) <- parse_row_id(row_id) do
      pk_col = RepoHelper.get_pk_column(qualified)

      case safe_quote_ident(pk_col) do
        {:ok, quoted_pk} ->
          fetch_row_by_pk(qualified, quoted_pk, id)

        {:error, _} = err ->
          err
      end
    else
      nil -> {:error, :invalid_id}
      {:error, _} = err -> err
    end
  end

  defp fetch_row_by_pk(qualified, quoted_pk, id) do
    sql = "SELECT * FROM #{qualified} WHERE #{quoted_pk} = $1 LIMIT 1"

    case RepoHelper.query(sql, [id]) do
      {:ok, %{columns: columns, rows: [row]}} ->
        {:ok, columns |> Enum.zip(row) |> Map.new()}

      {:ok, %{rows: []}} ->
        {:error, :not_found}

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp parse_row_id(id) when is_integer(id), do: id

  defp parse_row_id(id) when is_binary(id) do
    case Integer.parse(id) do
      {int_id, ""} -> int_id
      _ -> if match?({:ok, _}, Ecto.UUID.cast(id)), do: id, else: nil
    end
  end

  defp parse_row_id(_), do: nil

  @doc """
  Returns table metadata and a row preview window.

  When `schema` or `table` is malformed, returns the empty preview
  shape rather than crashing — the LV uses this signal to render a
  graceful "table not found" message.
  """
  @spec table_preview(identifier_string() | nil, identifier_string(), map()) :: map()
  def table_preview(schema, table, opts \\ %{}) when is_binary(table) do
    schema = schema || "public"
    page = normalize_page(Map.get(opts, :page, @default_row_page))
    per_page = normalize_page_size(Map.get(opts, :per_page, @default_row_page_size), 10, 200)
    search = Map.get(opts, :search, "") |> to_string()

    with {:ok, qualified} <- safe_qualified_table(schema, table),
         true <- table_exists?(schema, table),
         columns when is_list(columns) <- fetch_columns(schema, table) do
      build_preview(schema, table, qualified, columns, page, per_page, search)
    else
      _ -> empty_preview(schema, table, page, per_page)
    end
  end

  defp build_preview(schema, table, qualified, columns, page, per_page, search) do
    {where_clause, search_params} = row_search_clause(search, columns)
    offset = (page - 1) * per_page

    total_rows = count_rows(qualified, where_clause, search_params)
    order_column = pick_order_column(columns)

    select_sql = """
    SELECT * FROM #{qualified}
    #{where_clause}
    ORDER BY #{order_column}
    LIMIT $#{length(search_params) + 1}
    OFFSET $#{length(search_params) + 2}
    """

    params = search_params ++ [per_page, offset]
    rows = fetch_preview_rows(select_sql, params)

    %{
      schema: schema,
      table: table,
      columns: columns,
      rows: rows,
      row_count: total_rows,
      page: page,
      per_page: per_page,
      total_pages: max(div_with_ceiling(total_rows, per_page), 1),
      approx_rows: get_table_stat(schema, table, :approx_rows),
      size_bytes: get_table_stat(schema, table, :size_bytes)
    }
  end

  defp empty_preview(schema, table, page, per_page) do
    %{
      schema: schema,
      table: table,
      columns: [],
      rows: [],
      row_count: 0,
      page: page,
      per_page: per_page,
      total_pages: 1,
      approx_rows: 0,
      size_bytes: 0
    }
  end

  defp count_rows(qualified, where_clause, search_params) do
    sql = "SELECT COUNT(*) FROM #{qualified} #{where_clause}"

    case RepoHelper.query(sql, search_params) do
      {:ok, %{rows: [[count]]}} -> count
      _ -> 0
    end
  end

  defp pick_order_column(columns) do
    col_names = Enum.map(columns, & &1.name)

    cond do
      "uuid" in col_names -> "uuid"
      "id" in col_names -> "id"
      true -> "ctid"
    end
  end

  defp fetch_preview_rows(select_sql, params) do
    case RepoHelper.query(select_sql, params) do
      {:ok, %{columns: cols, rows: rows}} -> materialise_rows(cols, rows)
      _ -> []
    end
  end

  defp materialise_rows(columns, rows) do
    Enum.map(rows, fn row -> columns |> Enum.zip(row) |> Map.new() end)
  end

  defp fetch_columns(schema, table) do
    sql = """
    SELECT column_name, data_type, is_nullable, ordinal_position
    FROM information_schema.columns
    WHERE table_schema = $1 AND table_name = $2
    ORDER BY ordinal_position
    """

    case RepoHelper.query(sql, [schema, table]) do
      {:ok, %{rows: rows}} -> Enum.map(rows, &row_to_column/1)
      _ -> []
    end
  end

  defp row_to_column([name, data_type, nullable, position]) do
    %{
      name: name,
      data_type: data_type,
      nullable: nullable == "YES",
      ordinal_position: position
    }
  end

  defp table_exists?(schema, table) do
    sql = """
    SELECT 1 FROM pg_stat_user_tables
    WHERE schemaname = $1 AND relname = $2
    LIMIT 1
    """

    case RepoHelper.query(sql, [schema, table]) do
      {:ok, %{num_rows: num_rows}} when num_rows > 0 -> true
      _ -> false
    end
  end

  defp get_table_stat(schema, table, field) do
    sql = """
    SELECT n_live_tup, pg_total_relation_size(relid)
    FROM pg_stat_user_tables
    WHERE schemaname = $1 AND relname = $2
    LIMIT 1
    """

    case RepoHelper.query(sql, [schema, table]) do
      {:ok, %{rows: [[approx_rows, size_bytes]]}} ->
        case field do
          :approx_rows -> approx_rows
          :size_bytes -> size_bytes
        end

      _ ->
        0
    end
  end

  # Returns `{:ok, "\"schema\".\"table\""}` when both identifiers are
  # safe to quote, `{:error, :invalid_identifier}` otherwise. Wrapping
  # the previously-raise-on-bad-input `quote_ident/1` in a tuple-shape
  # API lets callers (LV mounts, `fetch_row/3`) render a graceful
  # "table not found" message instead of bubbling `ArgumentError` into
  # a 500 page.
  defp safe_qualified_table(schema, table) do
    with {:ok, s} <- safe_quote_ident(schema),
         {:ok, t} <- safe_quote_ident(table) do
      {:ok, "#{s}.#{t}"}
    end
  end

  defp safe_quote_ident(name) when is_binary(name) do
    if Regex.match?(~r/^[a-zA-Z0-9_]+$/, name) do
      {:ok, ~s("#{name}")}
    else
      {:error, :invalid_identifier}
    end
  end

  defp safe_quote_ident(_), do: {:error, :invalid_identifier}

  # Internal-only — every call site lives in this module and passes
  # validated identifiers (via `safe_quote_ident/1` upstream).
  defp quote_ident!(name) do
    case safe_quote_ident(name) do
      {:ok, quoted} -> quoted
      {:error, _} -> raise ArgumentError, "invalid identifier: #{inspect(name)}"
    end
  end

  defp table_search_clause(""), do: {"", []}

  defp table_search_clause(search) do
    {"WHERE (schemaname ILIKE $1 OR relname ILIKE $1)", ["%" <> search <> "%"]}
  end

  defp row_search_clause("", _columns), do: {"", []}

  defp row_search_clause(search, columns) do
    text_columns =
      Enum.filter(columns, fn column ->
        data_type = String.downcase(column.data_type || "")
        data_type in @textual_types
      end)

    if text_columns == [] do
      {"", []}
    else
      pattern = "%" <> search <> "%"

      clauses =
        text_columns
        |> Enum.with_index(1)
        |> Enum.map(fn {column, idx} ->
          "CAST(#{quote_ident!(column.name)} AS TEXT) ILIKE $#{idx}"
        end)

      {
        "WHERE (" <> Enum.join(clauses, " OR ") <> ")",
        List.duplicate(pattern, length(clauses))
      }
    end
  end

  defp normalize_page(page) when is_integer(page) and page > 0, do: page

  defp normalize_page(page) when is_binary(page) do
    page
    |> Integer.parse()
    |> case do
      {value, _} when value > 0 -> value
      _ -> @default_table_page
    end
  end

  defp normalize_page(_), do: @default_table_page

  defp normalize_page_size(size, min \\ 5, max \\ 100)

  defp normalize_page_size(size, min, max) when is_integer(size) do
    size
    |> max(min)
    |> min(max)
  end

  defp normalize_page_size(size, min, max) when is_binary(size) do
    size
    |> Integer.parse()
    |> case do
      {value, _} -> normalize_page_size(value, min, max)
      _ -> normalize_page_size(@default_table_page_size, min, max)
    end
  end

  defp normalize_page_size(_, min, max),
    do: normalize_page_size(@default_table_page_size, min, max)

  defp div_with_ceiling(0, _per_page), do: 0

  defp div_with_ceiling(total, per_page) when per_page > 0 do
    div(total + per_page - 1, per_page)
  end

  # ============================================================================
  # Live-update triggers
  # ============================================================================

  @doc """
  Ensures the notification function exists and creates a trigger on
  the table.

  Idempotent. A concurrent caller racing to create the same trigger
  surfaces as `:ok` rather than an error — the win condition is "the
  trigger exists", and Postgres' `duplicate_object` reply on the loser
  is folded back into success.

  Returns `:ok` on success, `{:error, :invalid_identifier}` if the
  schema/table contain unsafe characters, or `{:error, reason}` on a
  Postgres error.
  """
  @spec ensure_trigger(identifier_string(), identifier_string()) ::
          :ok | {:error, :invalid_identifier | term()}
  def ensure_trigger(schema, table) do
    with {:ok, _qualified} <- safe_qualified_table(schema, table),
         :ok <- ensure_notify_function() do
      create_table_trigger(schema, table)
    end
  end

  @doc "Removes the notification trigger from a table."
  @spec remove_trigger(identifier_string(), identifier_string()) ::
          :ok | {:error, :invalid_identifier | term()}
  def remove_trigger(schema, table) do
    with {:ok, qualified} <- safe_qualified_table(schema, table) do
      trigger = trigger_name(schema, table)
      sql = "DROP TRIGGER IF EXISTS #{trigger} ON #{qualified}"

      case RepoHelper.query(sql) do
        {:ok, _} -> :ok
        {:error, reason} -> {:error, reason}
      end
    end
  end

  @doc "Whether a table has a notification trigger installed."
  @spec has_trigger?(identifier_string(), identifier_string()) :: boolean()
  def has_trigger?(schema, table) do
    trigger = trigger_name(schema, table)

    sql = """
    SELECT EXISTS (
      SELECT 1 FROM information_schema.triggers
      WHERE trigger_schema = $1
      AND event_object_table = $2
      AND trigger_name = $3
    )
    """

    case RepoHelper.query(sql, [schema, table, trigger]) do
      {:ok, %{rows: [[true]]}} -> true
      _ -> false
    end
  end

  @doc "Lists all tables that have notification triggers installed."
  @spec list_triggered_tables() :: [{String.t(), String.t()}]
  def list_triggered_tables do
    # @trigger_prefix is a hardcoded module attribute — not user input —
    # so direct interpolation into a LIKE pattern is safe by construction.
    sql = """
    SELECT trigger_schema, event_object_table
    FROM information_schema.triggers
    WHERE trigger_name LIKE '#{@trigger_prefix}%'
    GROUP BY trigger_schema, event_object_table
    ORDER BY trigger_schema, event_object_table
    """

    case RepoHelper.query(sql) do
      {:ok, %{rows: rows}} -> Enum.map(rows, fn [schema, table] -> {schema, table} end)
      _ -> []
    end
  end

  @doc "Removes all notification triggers from all tables."
  @spec remove_all_triggers() :: :ok
  def remove_all_triggers do
    Enum.each(list_triggered_tables(), fn {schema, table} ->
      remove_trigger(schema, table)
    end)
  end

  @doc false
  @spec notify_channel() :: String.t()
  def notify_channel, do: @notify_channel

  defp ensure_notify_function do
    sql = """
    CREATE OR REPLACE FUNCTION #{@notify_function_name}()
    RETURNS trigger AS $$
    DECLARE
      row_id TEXT;
    BEGIN
      -- Try uuid first (Category A tables), then id (Category B), then empty
      IF TG_OP = 'DELETE' THEN
        BEGIN
          row_id := OLD.uuid::TEXT;
        EXCEPTION WHEN undefined_column THEN
          BEGIN
            row_id := OLD.id::TEXT;
          EXCEPTION WHEN undefined_column THEN
            row_id := '';
          END;
        END;
      ELSE
        BEGIN
          row_id := NEW.uuid::TEXT;
        EXCEPTION WHEN undefined_column THEN
          BEGIN
            row_id := NEW.id::TEXT;
          EXCEPTION WHEN undefined_column THEN
            row_id := '';
          END;
        END;
      END IF;

      -- Payload format: schema.table:operation:row_id
      PERFORM pg_notify('#{@notify_channel}', TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME || ':' || TG_OP || ':' || COALESCE(row_id, ''));

      -- AFTER triggers ignore return value, but we return appropriately for completeness
      IF TG_OP = 'DELETE' THEN
        RETURN OLD;
      ELSE
        RETURN NEW;
      END IF;
    END;
    $$ LANGUAGE plpgsql;
    """

    case RepoHelper.query(sql) do
      {:ok, _} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end

  defp create_table_trigger(schema, table) do
    if has_trigger?(schema, table) do
      :ok
    else
      do_create_trigger(schema, table)
    end
  end

  defp do_create_trigger(schema, table) do
    trigger = trigger_name(schema, table)
    qualified = qualified_table(schema, table)

    sql = """
    CREATE TRIGGER #{trigger}
    AFTER INSERT OR UPDATE OR DELETE ON #{qualified}
    FOR EACH ROW
    EXECUTE FUNCTION #{@notify_function_name}();
    """

    case RepoHelper.query(sql) do
      {:ok, _} ->
        :ok

      # Concurrent ensure_trigger lost the race — the trigger is now present,
      # which is what we wanted. Return :ok rather than propagate.
      {:error, %Postgrex.Error{postgres: %{code: :duplicate_object}}} ->
        :ok

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp qualified_table(schema, table) do
    "#{quote_ident!(schema)}.#{quote_ident!(table)}"
  end

  defp trigger_name(schema, table) do
    safe_schema = String.replace(schema, ~r/[^a-zA-Z0-9_]/, "_")
    safe_table = String.replace(table, ~r/[^a-zA-Z0-9_]/, "_")
    "#{@trigger_prefix}#{safe_schema}_#{safe_table}"
  end

  # ============================================================================
  # Activity logging
  # ============================================================================

  defp log_module_toggle(state) when state in [:enabled, :disabled] do
    if Code.ensure_loaded?(PhoenixKit.Activity) do
      PhoenixKit.Activity.log(%{
        action: "db.module_#{state}",
        module: module_key(),
        mode: "manual",
        resource_type: "module",
        metadata: %{}
      })
    end
  rescue
    Postgrex.Error ->
      :ok

    DBConnection.OwnershipError ->
      :ok

    e ->
      Logger.warning("[PhoenixKitDb] Activity logging error: #{Exception.message(e)}")
      {:error, e}
  catch
    :exit, _reason -> :ok
  end
end