defmodule Lantern do
@moduledoc """
An embeddable Postgres viewer/editor.
Lantern is a self-contained data layer plus a `Phoenix.LiveComponent`
(`Lantern.Explorer`) that lets you browse and edit any Postgres
database from a connection you supply. It is connection-agnostic: hand it a
URL string, keyword options, or a struct exposing host/port/username/password
(see `Lantern.Source`) and it opens short-lived connections on demand.
This module is the public data API. It performs introspection
(`list_schemas/1`, `list_tables/2`, `columns/3`, `primary_keys/3`), reads (`query/3`), and safe,
primary-key-scoped writes (`insert/3`, `update/4`, `delete/3`). Writes send
values as text parameters cast to each column's type, so no value is ever
interpolated into SQL.
"""
alias Lantern.Connection
alias Lantern.Coercion
alias Lantern.SQL
@default_limit 100
@type source :: Lantern.Source.t() | String.t() | keyword() | map()
@type column :: %{
name: String.t(),
type: String.t(),
udt: String.t(),
nullable: boolean(),
enum_values: [String.t()] | nil,
fk: %{schema: String.t(), table: String.t(), column: String.t()} | nil
}
@type table_stat :: %{
name: String.t(),
total_bytes: non_neg_integer(),
table_bytes: non_neg_integer(),
index_bytes: non_neg_integer(),
total_size: String.t(),
table_size: String.t(),
index_size: String.t()
}
@type table_info :: %{
schema: String.t(),
name: String.t(),
stats: table_stat() | nil,
estimated_rows: non_neg_integer(),
row_level_security?: boolean(),
columns: [column()],
primary_keys: [String.t()],
constraints: [%{name: String.t(), type: String.t(), definition: String.t()}],
indexes: [%{name: String.t(), definition: String.t()}]
}
# ---------------------------------------------------------------------------
# Introspection
# ---------------------------------------------------------------------------
@doc "Lists user-visible schemas that contain base tables."
@spec list_schemas(source()) :: {:ok, [String.t()]} | {:error, String.t()}
def list_schemas(source) do
sql = """
SELECT DISTINCT table_schema
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema NOT IN ('pg_catalog', 'information_schema')
AND table_schema NOT LIKE 'pg_toast%'
ORDER BY table_schema
"""
Connection.run(source, fn conn ->
with {:ok, %{rows: rows}} <- run_sql(conn, sql, []) do
{:ok, Enum.map(rows, &hd/1)}
end
end)
end
@doc "Lists views in a schema. Defaults to `public`."
@spec list_views(source(), keyword()) :: {:ok, [String.t()]} | {:error, String.t()}
def list_views(source, opts \\ []) do
schema = schema_opt(opts)
sql = """
SELECT table_name
FROM information_schema.views
WHERE table_schema = $1
ORDER BY table_name
"""
Connection.run(source, fn conn ->
with {:ok, %{rows: rows}} <- run_sql(conn, sql, [schema]) do
{:ok, Enum.map(rows, &hd/1)}
end
end)
end
@doc "Lists enum types in a schema. Defaults to `public`."
@spec list_enums(source(), keyword()) ::
{:ok, [%{name: String.t(), values: [String.t()]}]} | {:error, String.t()}
def list_enums(source, opts \\ []) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
enums = fetch_enums(conn, schema)
{:ok, Enum.map(enums, fn {name, values} -> %{name: name, values: values} end)}
end)
end
@doc "Lists base tables in a schema. Defaults to `public`."
@spec list_tables(source(), keyword()) :: {:ok, [String.t()]} | {:error, String.t()}
def list_tables(source, opts \\ []) do
schema = schema_opt(opts)
sql = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1 AND table_type = 'BASE TABLE'
ORDER BY table_name
"""
Connection.run(source, fn conn ->
with {:ok, %{rows: rows}} <- run_sql(conn, sql, [schema]) do
{:ok, Enum.map(rows, &hd/1)}
end
end)
end
@doc "Lists base tables with total/table/index size metadata."
@spec table_stats(source(), keyword()) :: {:ok, [table_stat()]} | {:error, String.t()}
def table_stats(source, opts \\ []) do
schema = schema_opt(opts)
sql = """
SELECT
c.relname,
pg_total_relation_size(c.oid) AS total_bytes,
pg_relation_size(c.oid) AS table_bytes,
pg_indexes_size(c.oid) AS index_bytes,
pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
pg_size_pretty(pg_indexes_size(c.oid)) AS index_size
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relkind IN ('r', 'p')
ORDER BY c.relname
"""
Connection.run(source, fn conn ->
with {:ok, %{rows: rows}} <- run_sql(conn, sql, [schema]) do
{:ok,
Enum.map(rows, fn [
name,
total_bytes,
table_bytes,
index_bytes,
total_size,
table_size,
index_size
] ->
%{
name: name,
total_bytes: total_bytes,
table_bytes: table_bytes,
index_bytes: index_bytes,
total_size: total_size,
table_size: table_size,
index_size: index_size
}
end)}
end
end)
end
@doc "Returns detailed metadata for one table."
@spec table_info(source(), String.t(), keyword()) :: {:ok, table_info()} | {:error, String.t()}
def table_info(source, table, opts \\ []) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
{:ok, pks} <- fetch_primary_keys(conn, schema, table),
{:ok, stats} <- fetch_table_stat(conn, schema, table),
{:ok, rel} <- fetch_table_relation(conn, schema, table),
{:ok, constraints} <- fetch_constraints(conn, schema, table),
{:ok, indexes} <- fetch_indexes(conn, schema, table) do
{:ok,
%{
schema: schema,
name: table,
stats: stats,
estimated_rows: rel.estimated_rows,
row_level_security?: rel.row_level_security?,
columns: cols,
primary_keys: pks,
constraints: constraints,
indexes: indexes
}}
end
end)
end
@doc "Returns column metadata (name, type, udt, nullability) for a table."
@spec columns(source(), String.t(), keyword()) :: {:ok, [column()]} | {:error, String.t()}
def columns(source, table, opts \\ []) do
Connection.run(source, fn conn -> fetch_columns(conn, schema_opt(opts), table) end)
end
@doc "Returns the ordered primary-key column names for a table (may be empty)."
@spec primary_keys(source(), String.t(), keyword()) ::
{:ok, [String.t()]} | {:error, String.t()}
def primary_keys(source, table, opts \\ []) do
Connection.run(source, fn conn -> fetch_primary_keys(conn, schema_opt(opts), table) end)
end
# ---------------------------------------------------------------------------
# Read
# ---------------------------------------------------------------------------
@doc """
Reads rows from `table` with optional filtering, sorting, and pagination.
## Options
* `:schema` — Postgres schema/namespace. Default `"public"`
* `:where_clause` — raw SQL fragment without `WHERE` (operator's own DB)
* `:filters` — safe filter descriptors (`%{column:, op:, value:}`) combined with AND
* `:sort_by` — column name (validated against the live column list)
* `:sort_dir` — `:asc` (default) or `:desc`
* `:limit` — default #{@default_limit}
* `:offset` — default 0
* `:count` — `:exact` (default) or `false` to skip `COUNT(*)`
Returns `{:ok, %{columns: [name], rows: [[value]], count: integer | nil, count_kind: atom}}`.
"""
@spec query(source(), String.t(), keyword()) :: {:ok, map()} | {:error, String.t()}
def query(source, table, opts \\ []) do
schema = schema_opt(opts)
where_clause = Keyword.get(opts, :where_clause)
filters = Keyword.get(opts, :filters, [])
sort_by = Keyword.get(opts, :sort_by)
sort_dir = Keyword.get(opts, :sort_dir, :asc)
limit = Keyword.get(opts, :limit, @default_limit)
offset = Keyword.get(opts, :offset, 0)
count_mode = Keyword.get(opts, :count, :exact)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
names = Enum.map(cols, & &1.name),
:ok <- ensure_sort_valid(sort_by, names),
{:ok, filter_clause, filter_params} <- build_filter_clause(filters, cols),
{:ok, effective_where, params} <-
combine_where(where_clause, filter_clause, filter_params) do
{select_sql, _} =
SQL.select(schema, table, effective_where, sort_by, sort_dir, limit, offset)
with {:ok, %{columns: result_cols, rows: rows}} <- run_sql(conn, select_sql, params),
{:ok, count, count_kind} <-
query_count(conn, schema, table, effective_where, params, count_mode) do
{:ok, %{columns: result_cols, rows: rows, count: count, count_kind: count_kind}}
end
end
end)
end
@doc """
Runs an operator-supplied SQL statement and returns its columns and rows.
This is intentionally a low-level escape hatch for trusted database workspaces.
Embedders should expose it only to operators who are allowed to run arbitrary SQL
against the configured connection.
"""
@spec run_query(source(), String.t(), keyword()) :: {:ok, map()} | {:error, String.t()}
def run_query(source, sql, opts \\ []) when is_binary(sql) do
params = Keyword.get(opts, :params, [])
Connection.run(source, fn conn ->
with {:ok, %{columns: columns, rows: rows}} <- run_sql(conn, sql, params) do
{:ok, %{columns: columns || [], rows: rows || []}}
end
end)
end
# ---------------------------------------------------------------------------
# Write
# ---------------------------------------------------------------------------
@doc """
Inserts a row. `values` is a `%{column => value}` map (value `nil` → NULL).
Returns `{:ok, inserted_row_map}` keyed by column name.
"""
@spec insert(source(), String.t(), map(), keyword()) :: {:ok, map()} | {:error, term()}
def insert(source, table, values, opts \\ []) when is_map(values) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
{:ok, fields} <- build_fields(values, cols),
{:ok, {sql, params}} <- SQL.insert(schema, table, fields),
{:ok, result} <- run_sql(conn, sql, params) do
{:ok, single_row(result)}
end
end)
end
@doc """
Updates one row identified by its primary key.
`changes` is `%{column => value}`; `key` is `%{pk_column => value}`. Requires
the table to have a primary key and `key` to cover it exactly.
"""
@spec update(source(), String.t(), map(), map(), keyword()) :: {:ok, map()} | {:error, term()}
def update(source, table, changes, key, opts \\ []) when is_map(changes) and is_map(key) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
{:ok, pks} <- fetch_primary_keys(conn, schema, table),
:ok <- ensure_key_matches(key, pks),
{:ok, set_fields} <- build_fields(changes, cols),
{:ok, pk_fields} <- build_fields(key, cols),
{:ok, {sql, params}} <- SQL.update(schema, table, set_fields, pk_fields),
{:ok, result} <- run_sql(conn, sql, params) do
{:ok, single_row(result)}
end
end)
end
@doc """
Applies update/delete changes in one database transaction.
Changes are maps with either `%{action: :update, changes: map, key: map}` or
`%{action: :delete, keys: [map]}`. Returns `{:ok, count}` for applied change
groups, or rolls the transaction back and returns the first error.
"""
@spec apply_changes(source(), String.t(), [map()], keyword()) ::
{:ok, non_neg_integer()} | {:error, term()}
def apply_changes(source, table, changes, opts \\ []) when is_list(changes) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
{:ok, pks} <- fetch_primary_keys(conn, schema, table) do
case Postgrex.transaction(conn, fn tx ->
Enum.reduce_while(changes, {:ok, 0}, fn change, {:ok, count} ->
case apply_change(tx, schema, table, change, cols, pks) do
:ok -> {:cont, {:ok, count + 1}}
{:error, reason} -> Postgrex.rollback(tx, reason)
end
end)
end) do
{:ok, {:ok, count}} -> {:ok, count}
{:error, reason} -> {:error, reason}
end
end
end)
end
@doc """
Deletes one or more rows. `keys` is a list of `%{pk_column => value}` maps.
Returns `{:ok, deleted_count}`.
"""
@spec delete(source(), String.t(), [map()], keyword()) ::
{:ok, non_neg_integer()} | {:error, term()}
def delete(source, table, keys, opts \\ [])
def delete(_source, _table, [], _opts), do: {:ok, 0}
def delete(source, table, keys, opts) when is_list(keys) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
{:ok, pks} <- fetch_primary_keys(conn, schema, table),
:ok <- ensure_keys_match(keys, pks),
{:ok, rows} <- build_key_rows(keys, cols),
{:ok, {sql, params}} <- SQL.delete(schema, table, rows),
{:ok, %{num_rows: n}} <- run_sql(conn, sql, params) do
{:ok, n}
end
end)
end
# ---------------------------------------------------------------------------
# Schema changes (DDL)
# ---------------------------------------------------------------------------
@doc """
Creates a table. `columns` is a list of column specs (see
`t:Lantern.SQL.column/0`); at least one is required.
Returns `:ok` or `{:error, message}` — both validation failures (blank name,
no columns, unsupported type) and Postgres errors come back as a message
string ready to show the operator.
"""
@spec create_table(source(), String.t(), [SQL.column()], keyword()) ::
:ok | {:error, String.t()}
def create_table(source, table, columns, opts \\ []) when is_list(columns) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
{:ok, statement} <- build_ddl(SQL.create_table(schema, table, columns)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Drops a table. Returns `:ok` or `{:error, message}`."
@spec drop_table(source(), String.t(), keyword()) :: :ok | {:error, String.t()}
def drop_table(source, table, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
{:ok, statement} <- build_ddl(SQL.drop_table(schema, table)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Adds a column. `column` is a column spec. Returns `:ok` or `{:error, message}`."
@spec add_column(source(), String.t(), SQL.column(), keyword()) :: :ok | {:error, String.t()}
def add_column(source, table, column, opts \\ []) when is_map(column) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
{:ok, statement} <- build_ddl(SQL.add_column(schema, table, column)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Drops a column. Returns `:ok` or `{:error, message}`."
@spec drop_column(source(), String.t(), String.t(), keyword()) :: :ok | {:error, String.t()}
def drop_column(source, table, column, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("Column name", column),
{:ok, statement} <- build_ddl(SQL.drop_column(schema, table, column)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Changes a column's type. Returns `:ok` or `{:error, message}`."
@spec alter_column_type(source(), String.t(), String.t(), String.t(), keyword()) ::
:ok | {:error, String.t()}
def alter_column_type(source, table, column, type, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("Column name", column),
{:ok, statement} <- build_ddl(SQL.alter_column_type(schema, table, column, type)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Sets whether a column accepts NULL. Returns `:ok` or `{:error, message}`."
@spec set_column_nullable(source(), String.t(), String.t(), boolean(), keyword()) ::
:ok | {:error, String.t()}
def set_column_nullable(source, table, column, nullable, opts \\ [])
when is_boolean(nullable) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("Column name", column),
{:ok, statement} <- build_ddl(SQL.set_column_nullable(schema, table, column, nullable)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Renames a column. Returns `:ok` or `{:error, message}`."
@spec rename_column(source(), String.t(), String.t(), String.t(), keyword()) ::
:ok | {:error, String.t()}
def rename_column(source, table, from, to, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("Column name", from),
:ok <- validate_name("New column name", to),
{:ok, statement} <- build_ddl(SQL.rename_column(schema, table, from, to)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Drops a table constraint. Returns `:ok` or `{:error, message}`."
@spec drop_constraint(source(), String.t(), String.t(), keyword()) :: :ok | {:error, String.t()}
def drop_constraint(source, table, constraint_name, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("Constraint name", constraint_name),
{:ok, statement} <- build_ddl(SQL.drop_constraint(schema, table, constraint_name)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Creates an index on a table. Returns `:ok` or `{:error, message}`."
@spec create_index(source(), String.t(), String.t(), [String.t()], keyword()) ::
:ok | {:error, String.t()}
def create_index(source, table, index_name, columns, opts \\ []) when is_list(columns) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("Index name", index_name),
:ok <- validate_column_names(columns),
{:ok, statement} <- build_ddl(SQL.create_index(schema, table, index_name, columns)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Drops an index. Returns `:ok` or `{:error, message}`."
@spec drop_index(source(), String.t(), keyword()) :: :ok | {:error, String.t()}
def drop_index(source, index_name, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Index name", index_name),
{:ok, statement} <- build_ddl(SQL.drop_index(schema, index_name)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
@doc "Renames a table. Returns `:ok` or `{:error, message}`."
@spec rename_table(source(), String.t(), String.t(), keyword()) :: :ok | {:error, String.t()}
def rename_table(source, table, new_name, opts \\ []) do
schema = schema_opt(opts)
with :ok <- validate_name("Schema name", schema),
:ok <- validate_name("Table name", table),
:ok <- validate_name("New table name", new_name),
{:ok, statement} <- build_ddl(SQL.rename_table(schema, table, new_name)) do
Connection.run(source, fn conn -> exec_ddl(conn, statement) end)
end
end
# ---------------------------------------------------------------------------
# Private — introspection queries
# ---------------------------------------------------------------------------
defp apply_change(
conn,
schema,
table,
%{action: :update, changes: changes, key: key},
cols,
pks
) do
with :ok <- ensure_key_matches(key, pks),
{:ok, set_fields} <- build_fields(changes, cols),
{:ok, pk_fields} <- build_fields(key, cols),
{:ok, {sql, params}} <- SQL.update(schema, table, set_fields, pk_fields),
{:ok, _result} <- run_sql(conn, sql, params) do
:ok
end
end
defp apply_change(conn, schema, table, %{action: :delete, keys: keys}, cols, pks) do
with :ok <- ensure_keys_match(keys, pks),
{:ok, rows} <- build_key_rows(keys, cols),
{:ok, {sql, params}} <- SQL.delete(schema, table, rows),
{:ok, _result} <- run_sql(conn, sql, params) do
:ok
end
end
defp apply_change(_conn, _schema, _table, _change, _cols, _pks), do: {:error, :invalid_change}
defp fetch_table_stat(conn, schema, table) do
sql = """
SELECT
c.relname,
pg_total_relation_size(c.oid) AS total_bytes,
pg_relation_size(c.oid) AS table_bytes,
pg_indexes_size(c.oid) AS index_bytes,
pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
pg_size_pretty(pg_relation_size(c.oid)) AS table_size,
pg_size_pretty(pg_indexes_size(c.oid)) AS index_size
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind IN ('r', 'p')
"""
case run_sql(conn, sql, [schema, table]) do
{:ok,
%{
rows: [[name, total_bytes, table_bytes, index_bytes, total_size, table_size, index_size]]
}} ->
{:ok,
%{
name: name,
total_bytes: total_bytes,
table_bytes: table_bytes,
index_bytes: index_bytes,
total_size: total_size,
table_size: table_size,
index_size: index_size
}}
{:ok, %{rows: []}} ->
{:ok, nil}
other ->
other
end
end
defp fetch_table_relation(conn, schema, table) do
sql = """
SELECT GREATEST(c.reltuples::bigint, 0), c.relrowsecurity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind IN ('r', 'p')
"""
case run_sql(conn, sql, [schema, table]) do
{:ok, %{rows: [[estimated_rows, row_level_security?]]}} ->
{:ok, %{estimated_rows: estimated_rows, row_level_security?: row_level_security?}}
{:ok, %{rows: []}} ->
{:ok, %{estimated_rows: 0, row_level_security?: false}}
other ->
other
end
end
defp fetch_constraints(conn, schema, table) do
sql = """
SELECT con.conname,
CASE con.contype
WHEN 'p' THEN 'PRIMARY KEY'
WHEN 'f' THEN 'FOREIGN KEY'
WHEN 'u' THEN 'UNIQUE'
WHEN 'c' THEN 'CHECK'
WHEN 'x' THEN 'EXCLUDE'
ELSE con.contype::text
END,
pg_get_constraintdef(con.oid, true)
FROM pg_constraint con
JOIN pg_class c ON c.oid = con.conrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2
ORDER BY con.contype DESC, con.conname
"""
case run_sql(conn, sql, [schema, table]) do
{:ok, %{rows: rows}} ->
{:ok,
Enum.map(rows, fn [name, type, definition] ->
%{name: name, type: type, definition: definition}
end)}
other ->
other
end
end
defp fetch_indexes(conn, schema, table) do
sql = """
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = $1 AND tablename = $2
ORDER BY indexname
"""
case run_sql(conn, sql, [schema, table]) do
{:ok, %{rows: rows}} ->
{:ok, Enum.map(rows, fn [name, definition] -> %{name: name, definition: definition} end)}
other ->
other
end
end
defp fetch_columns(conn, schema, table) do
sql = """
SELECT column_name, data_type, udt_name, is_nullable
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
ORDER BY ordinal_position
"""
case run_sql(conn, sql, [schema, table]) do
{:ok, %{rows: []}} ->
{:error, "Table not found: #{table}"}
{:ok, %{rows: rows}} ->
enums = fetch_enums(conn, schema)
fks = fetch_foreign_keys(conn, schema, table)
cols =
Enum.map(rows, fn [name, type, udt, nullable] ->
%{
name: name,
type: type,
udt: udt,
nullable: nullable == "YES",
enum_values: Map.get(enums, udt),
fk: Map.get(fks, name)
}
end)
{:ok, cols}
other ->
other
end
end
# Maps each foreign-key column to the table/column it references, so the UI
# can offer a lookup dropdown instead of a raw id field. Composite foreign
# keys are deliberately skipped: the information_schema join below produces
# a cross product across all (local, referenced) pairs in the constraint, so
# we can't reliably pair them by ordinal position. Treating composite-FK
# columns as plain text is safer than offering a dropdown that might write
# the wrong value into the wrong column.
defp fetch_foreign_keys(conn, schema, table) do
sql = """
SELECT tc.constraint_name, kcu.column_name, ccu.table_schema, ccu.table_name, ccu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage ccu
ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = $1 AND tc.table_name = $2
"""
case run_sql(conn, sql, [schema, table]) do
{:ok, %{rows: rows}} ->
rows
|> Enum.group_by(fn [constraint, _, _, _, _] -> constraint end)
|> Enum.flat_map(fn
# Single-column FK → 1 cross-product row → safe to expose.
{_constraint, [[_, col, fschema, ftable, fcol]]} ->
[{col, %{schema: fschema, table: ftable, column: fcol}}]
# Composite FK (or anything that produced > 1 row) → skip.
_ ->
[]
end)
|> Map.new()
_ ->
%{}
end
end
@doc """
Returns up to `limit` `{value, label}` options from a referenced table, for
rendering a foreign-key lookup. Labels prefer a human-readable column
(name/title/email/…) and fall back to the referenced key itself.
"""
@spec reference_options(source(), String.t(), String.t(), pos_integer(), keyword()) ::
{:ok, [{String.t(), String.t()}]} | {:error, term()}
def reference_options(source, ftable, fcolumn, limit \\ 100, opts \\ []) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
do_reference_options(conn, schema, ftable, fcolumn, limit)
end)
end
defp do_reference_options(conn, schema, ftable, fcolumn, limit) do
# Clamp the caller-supplied limit so a bad value can't end up in the SQL
# string. (Identifiers are already quoted via SQL.quote_ident.)
limit = if is_integer(limit) and limit > 0, do: min(limit, 1000), else: 100
with {:ok, cols} <- fetch_columns(conn, schema, ftable) do
names = Enum.map(cols, & &1.name)
label = Enum.find(~w(name title label display_name email username slug), &(&1 in names))
label_col = label || fcolumn
sql =
"SELECT #{SQL.quote_ident(fcolumn)}, #{SQL.quote_ident(label_col)} " <>
"FROM #{SQL.quote_table(schema, ftable)} ORDER BY 2 LIMIT #{limit}"
fcol_type = Enum.find_value(cols, &if(&1.name == fcolumn, do: &1.type))
case run_sql(conn, sql, []) do
{:ok, %{rows: rows}} ->
{:ok, Enum.map(rows, &reference_row(&1, label != nil, fcol_type))}
other ->
other
end
end
end
@doc """
Loads everything `Lantern.Explorer` needs to display a table — columns
(typed), primary keys, and pre-fetched FK lookup options — using a single
Postgres connection instead of N+1.
"""
@spec schema(source(), String.t(), keyword()) ::
{:ok, %{columns: [column()], primary_keys: [String.t()], fk_options: map()}}
| {:error, String.t()}
def schema(source, table, opts \\ []) do
schema = schema_opt(opts)
Connection.run(source, fn conn ->
with {:ok, cols} <- fetch_columns(conn, schema, table),
{:ok, pks} <- fetch_primary_keys(conn, schema, table) do
fk_options =
for %{name: name, fk: fk} <- cols, fk != nil, into: %{} do
case do_reference_options(
conn,
Map.get(fk, :schema, schema),
fk.table,
fk.column,
100
) do
{:ok, opts} -> {name, opts}
_ -> {name, nil}
end
end
{:ok, %{columns: cols, primary_keys: pks, fk_options: fk_options}}
end
end)
end
defp reference_row([value, label], labeled?, fcol_type) do
value_str = Coercion.edit_value(value, fcol_type)
text = if labeled?, do: "#{Coercion.edit_value(label)} — #{value_str}", else: value_str
{value_str, text}
end
# Maps each enum type name to its ordered labels, so enum columns can render
# as dropdowns instead of free text.
defp fetch_enums(conn, schema) do
# Scope to the selected schema so two enums with the same name in
# different schemas don't merge their labels.
sql = """
SELECT t.typname, e.enumlabel
FROM pg_type t
JOIN pg_enum e ON e.enumtypid = t.oid
JOIN pg_namespace n ON n.oid = t.typnamespace
WHERE n.nspname = $1
ORDER BY t.typname, e.enumsortorder
"""
case run_sql(conn, sql, [schema]) do
{:ok, %{rows: rows}} ->
# Build with prepends + reverse, not list ++, to keep this O(n).
rows
|> Enum.reduce(%{}, fn [name, label], acc ->
Map.update(acc, name, [label], &[label | &1])
end)
|> Map.new(fn {name, labels} -> {name, Enum.reverse(labels)} end)
_ ->
%{}
end
end
defp fetch_primary_keys(conn, schema, table) do
sql = """
SELECT a.attname
FROM pg_index i
JOIN pg_class c ON c.oid = i.indrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE c.relname = $1 AND n.nspname = $2 AND i.indisprimary
ORDER BY array_position(i.indkey, a.attnum)
"""
case run_sql(conn, sql, [table, schema]) do
{:ok, %{rows: rows}} -> {:ok, Enum.map(rows, &hd/1)}
other -> other
end
end
# ---------------------------------------------------------------------------
# Private — field building / validation
# ---------------------------------------------------------------------------
defp build_fields(values, cols) do
types = Map.new(cols, fn c -> {c.name, c} end)
Enum.reduce_while(values, {:ok, []}, fn {column, value}, {:ok, acc} ->
name = to_string(column)
case Map.fetch(types, name) do
{:ok, col} ->
field = %{column: name, value: value, cast: Coercion.cast_expr(col.type, col.udt)}
{:cont, {:ok, acc ++ [field]}}
:error ->
{:halt, {:error, "Unknown column: #{name}"}}
end
end)
end
defp build_key_rows(keys, cols) do
Enum.reduce_while(keys, {:ok, []}, fn key, {:ok, acc} ->
case build_fields(key, cols) do
{:ok, fields} -> {:cont, {:ok, acc ++ [fields]}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp build_filter_clause([], _cols), do: {:ok, nil, []}
defp build_filter_clause(filters, cols) when is_list(filters) do
names = MapSet.new(Enum.map(cols, & &1.name))
filters
|> Enum.reject(&empty_filter?/1)
|> Enum.reduce_while({[], []}, fn filter, {parts, params} ->
column = filter |> Map.get(:column, Map.get(filter, "column")) |> to_string()
op = filter |> Map.get(:op, Map.get(filter, "op")) |> normalize_op()
value = Map.get(filter, :value, Map.get(filter, "value"))
cond do
not MapSet.member?(names, column) ->
{:halt, {:error, "Invalid filter column: #{column}"}}
op in [:is_null, :is_not_null] ->
sql_op = if op == :is_null, do: "IS NULL", else: "IS NOT NULL"
{:cont, {parts ++ ["#{SQL.quote_ident(column)} #{sql_op}"], params}}
op in [:eq, :neq, :gt, :lt, :gte, :lte, :contains] ->
idx = length(params) + 1
{sql_op, param} = filter_op(op, value)
{:cont, {parts ++ ["#{SQL.quote_ident(column)} #{sql_op} $#{idx}"], params ++ [param]}}
true ->
{:halt, {:error, "Invalid filter operator: #{inspect(op)}"}}
end
end)
|> case do
{:error, reason} -> {:error, reason}
{[], []} -> {:ok, nil, []}
{parts, params} -> {:ok, Enum.join(parts, " AND "), params}
end
end
defp build_filter_clause(_filters, _cols), do: {:error, "Invalid filters"}
defp empty_filter?(filter) do
value = Map.get(filter, :value, Map.get(filter, "value"))
op = filter |> Map.get(:op, Map.get(filter, "op")) |> normalize_op()
op not in [:is_null, :is_not_null] and (is_nil(value) or value == "")
end
defp normalize_op(op)
when op in [:eq, :neq, :gt, :lt, :gte, :lte, :contains, :is_null, :is_not_null],
do: op
defp normalize_op("eq"), do: :eq
defp normalize_op("neq"), do: :neq
defp normalize_op("gt"), do: :gt
defp normalize_op("lt"), do: :lt
defp normalize_op("gte"), do: :gte
defp normalize_op("lte"), do: :lte
defp normalize_op("contains"), do: :contains
defp normalize_op("is_null"), do: :is_null
defp normalize_op("is_not_null"), do: :is_not_null
defp normalize_op(_), do: :invalid
defp filter_op(:eq, value), do: {"=", value}
defp filter_op(:neq, value), do: {"<>", value}
defp filter_op(:gt, value), do: {">", value}
defp filter_op(:lt, value), do: {"<", value}
defp filter_op(:gte, value), do: {">=", value}
defp filter_op(:lte, value), do: {"<=", value}
defp filter_op(:contains, value), do: {"ILIKE", "%#{value}%"}
defp combine_where(nil, nil, _params), do: {:ok, nil, []}
defp combine_where("", nil, _params), do: {:ok, nil, []}
defp combine_where(raw, nil, _params), do: {:ok, raw, []}
defp combine_where(nil, safe, params), do: {:ok, safe, params}
defp combine_where("", safe, params), do: {:ok, safe, params}
defp combine_where(_raw, _safe, _params),
do: {:error, "Use either raw SQL filter or safe filters, not both"}
defp query_count(_conn, _schema, _table, _where_clause, _params, false), do: {:ok, nil, :none}
defp query_count(conn, schema, table, where_clause, params, _mode) do
{count_sql, _} = SQL.count(schema, table, where_clause)
with {:ok, %{rows: [[count]]}} <- run_sql(conn, count_sql, params) do
{:ok, count, :exact}
end
end
defp schema_opt(opts), do: Keyword.get(opts, :schema, "public") |> to_string()
defp ensure_sort_valid(nil, _names), do: :ok
defp ensure_sort_valid(col, names) do
if col in names, do: :ok, else: {:error, "Invalid sort column: #{col}"}
end
defp ensure_key_matches(_key, []), do: {:error, :no_primary_key}
defp ensure_key_matches(key, pks) do
provided = key |> Map.keys() |> Enum.map(&to_string/1) |> Enum.sort()
if provided == Enum.sort(pks), do: :ok, else: {:error, :key_mismatch}
end
defp ensure_keys_match(keys, pks) do
Enum.reduce_while(keys, :ok, fn key, :ok ->
case ensure_key_matches(key, pks) do
:ok -> {:cont, :ok}
error -> {:halt, error}
end
end)
end
# ---------------------------------------------------------------------------
# Private — execution helpers
# ---------------------------------------------------------------------------
defp run_sql(conn, sql, params) do
case Postgrex.query(conn, sql, params) do
{:ok, result} -> {:ok, result}
{:error, error} -> {:error, format_error(error)}
end
end
# Translates a SQL builder result into an executable statement or a
# human-readable validation message, before any connection is opened.
defp build_ddl({:ok, statement}), do: {:ok, statement}
defp build_ddl({:error, reason}), do: {:error, ddl_error(reason)}
defp exec_ddl(conn, {sql, params}) do
with {:ok, _result} <- run_sql(conn, sql, params), do: :ok
end
defp validate_column_names([]), do: {:error, "Add at least one index column"}
defp validate_column_names(columns) do
Enum.reduce_while(columns, :ok, fn column, :ok ->
case validate_name("Column name", column) do
:ok -> {:cont, :ok}
error -> {:halt, error}
end
end)
end
defp validate_name(label, name) do
if is_binary(name) and String.trim(name) != "" do
:ok
else
{:error, "#{label} can't be blank"}
end
end
defp ddl_error(:no_columns), do: "Add at least one column"
defp ddl_error(:missing_name), do: "Every column needs a name"
defp ddl_error({:invalid_type, type}), do: "Unsupported column type: #{type}"
defp ddl_error(other), do: format_error(other)
defp single_row(%{columns: cols, rows: [row | _]}), do: Enum.zip(cols, row) |> Map.new()
defp single_row(%{columns: cols, rows: []}), do: cols |> Enum.map(&{&1, nil}) |> Map.new()
# All DB/query errors humanize through one place so nothing reaches the UI as
# a raw struct (connection failures, Postgres errors, hints). See Lantern.Errors.
defp format_error(reason), do: Lantern.Errors.humanize(reason)
end