lib/bylaw/db/adapters/postgres/checks/scoped_foreign_keys.ex

defmodule Bylaw.Db.Adapters.Postgres.Checks.ScopedForeignKeys do
  @moduledoc """
  Validates that scoped Postgres foreign keys include configured scope columns.

  ## Examples

  Before, both tables are tenant-scoped, but the foreign key only references
  `conversations(id)`:

  ```sql
  CREATE TABLE conversations (
    tenant_id uuid NOT NULL,
    id uuid NOT NULL,
    PRIMARY KEY (tenant_id, id),
    UNIQUE (id)
  );

  CREATE TABLE messages (
    tenant_id uuid NOT NULL,
    conversation_id uuid NOT NULL,
    FOREIGN KEY (conversation_id) REFERENCES conversations(id)
  );
  ```

  A message can point at a conversation with the same `id` in another tenant if
  application code passes the wrong identifier.

  After, include the scope columns in the foreign key:

  ```sql
  CREATE TABLE messages (
    tenant_id uuid NOT NULL,
    conversation_id uuid NOT NULL,
    FOREIGN KEY (tenant_id, conversation_id)
      REFERENCES conversations(tenant_id, id)
  );
  ```

  Postgres now enforces that the child and parent rows belong to the same
  tenant, instead of relying on every query and write path to remember it.

  ## Notes

  The check only applies when the child table and referenced table both have
  every configured `scope_columns` column. Shared lookup tables that
  intentionally have no tenant column are not flagged unless they match a
  different rule.

  ## Options

  A foreign key is checked when both the child table and referenced table have
  every configured `:scope_columns` column. The foreign key must include those
  columns on both sides so a child row cannot point at a parent row from another
  scope:

  ```elixir
  {Bylaw.Db.Adapters.Postgres.Checks.ScopedForeignKeys,
   rules: [
     [
       scope_columns: ["tenant_id", "workspace_id"],
       except: [[referenced_table: "global_settings"]]
     ]
   ]}
  ```

  ## Usage

  Add this module to the checks passed to
  `Bylaw.Db.Adapters.Postgres.validate/2`. See the
  [README usage section](readme.html#usage) for the full ExUnit setup.
  """

  @behaviour Bylaw.Db.Check

  alias Bylaw.Db.Adapters.Postgres
  alias Bylaw.Db.Adapters.Postgres.Result
  alias Bylaw.Db.Adapters.Postgres.RuleOptions
  alias Bylaw.Db.Check
  alias Bylaw.Db.Issue
  alias Bylaw.Db.Target

  @query """
  WITH foreign_keys AS (
    SELECT
      namespace.nspname AS schema_name,
      table_class.relname AS table_name,
      constraint_record.conname AS constraint_name,
      referenced_namespace.nspname AS referenced_schema_name,
      referenced_class.relname AS referenced_table_name,
      constraint_record.conrelid AS table_oid,
      constraint_record.confrelid AS referenced_table_oid,
      ARRAY(
        SELECT attribute.attname::text
        FROM unnest(constraint_record.conkey) WITH ORDINALITY AS key(attnum, position)
        JOIN pg_catalog.pg_attribute AS attribute
          ON attribute.attrelid = constraint_record.conrelid
         AND attribute.attnum = key.attnum
        ORDER BY key.position
      ) AS column_names,
      ARRAY(
        SELECT attribute.attname::text
        FROM unnest(constraint_record.confkey) WITH ORDINALITY AS key(attnum, position)
        JOIN pg_catalog.pg_attribute AS attribute
          ON attribute.attrelid = constraint_record.confrelid
         AND attribute.attnum = key.attnum
        ORDER BY key.position
      ) AS referenced_column_names
    FROM pg_catalog.pg_constraint AS constraint_record
    JOIN pg_catalog.pg_class AS table_class
      ON table_class.oid = constraint_record.conrelid
    JOIN pg_catalog.pg_namespace AS namespace
      ON namespace.oid = table_class.relnamespace
    JOIN pg_catalog.pg_class AS referenced_class
      ON referenced_class.oid = constraint_record.confrelid
    JOIN pg_catalog.pg_namespace AS referenced_namespace
      ON referenced_namespace.oid = referenced_class.relnamespace
    WHERE constraint_record.contype = 'f'
      AND namespace.nspname <> 'information_schema'
      AND namespace.nspname NOT LIKE 'pg\\_%' ESCAPE '\\'
      AND ($1::text[] IS NULL OR namespace.nspname = ANY($1))
      AND ($2::text[] IS NULL OR table_class.relname = ANY($2))
  ),
  scoped_foreign_keys AS (
    SELECT foreign_keys.*
    FROM foreign_keys
    WHERE NOT EXISTS (
      SELECT 1
      FROM unnest($3::text[]) AS scope_column(column_name)
      WHERE NOT EXISTS (
        SELECT 1
        FROM pg_catalog.pg_attribute AS attribute
        WHERE attribute.attrelid = foreign_keys.table_oid
          AND attribute.attname = scope_column.column_name
          AND attribute.attnum > 0
          AND NOT attribute.attisdropped
      )
    )
      AND NOT EXISTS (
        SELECT 1
        FROM unnest($3::text[]) AS scope_column(column_name)
        WHERE NOT EXISTS (
          SELECT 1
          FROM pg_catalog.pg_attribute AS attribute
          WHERE attribute.attrelid = foreign_keys.referenced_table_oid
            AND attribute.attname = scope_column.column_name
            AND attribute.attnum > 0
            AND NOT attribute.attisdropped
        )
      )
  )
  SELECT
    schema_name,
    table_name,
    constraint_name,
    column_names,
    referenced_schema_name,
    referenced_table_name,
    referenced_column_names
  FROM scoped_foreign_keys
  WHERE NOT (
    column_names @> $3::text[]
    AND referenced_column_names @> $3::text[]
  )
  ORDER BY schema_name, table_name, constraint_name

  """

  @type matcher_value :: String.t() | Regex.t()
  @type matcher_values :: matcher_value() | list(matcher_value())
  @type matcher ::
          list(
            {:schema, matcher_values()}
            | {:table, matcher_values()}
            | {:constraint, matcher_values()}
            | {:referenced_table, matcher_values()}
          )
  @type rule ::
          list(
            {:only, matcher() | list(matcher())}
            | {:except, matcher() | list(matcher())}
            | {:scope_columns, list(String.t())}
          )
  @type check_opt ::
          {:validate, boolean()}
          | {:rules, list(rule())}

  @type check_opts :: list(check_opt())

  @row_keys %{
    "column_names" => :column_names,
    "constraint_name" => :constraint_name,
    "referenced_column_names" => :referenced_column_names,
    "referenced_schema_name" => :referenced_schema_name,
    "referenced_table_name" => :referenced_table_name,
    "schema_name" => :schema_name,
    "table_name" => :table_name
  }

  @doc """
  Implements the `Bylaw.Db.Check` validation callback.
  """
  @impl Bylaw.Db.Check
  @spec validate(target :: Target.t(), opts :: check_opts()) :: Check.result()
  def validate(%Target{adapter: Postgres} = target, opts) when is_list(opts) do
    opts = check_opts!(opts)

    if RuleOptions.enabled?(opts) do
      validate_scoped_foreign_keys(target, opts)
    else
      :ok
    end
  end

  def validate(%Target{adapter: Postgres}, opts) do
    raise ArgumentError,
          "expected scoped_foreign_keys opts to be a keyword list, got: #{inspect(opts)}"
  end

  def validate(%Target{} = target, _opts) do
    raise ArgumentError, "expected a Postgres target, got: #{inspect(target)}"
  end

  def validate(target, _opts) do
    raise ArgumentError, "expected a database target, got: #{inspect(target)}"
  end

  defp validate_scoped_foreign_keys(target, opts) do
    opts
    |> normalize_rules!()
    |> Enum.flat_map(&rule_issues(target, &1))
    |> Result.to_check_result()
  end

  defp rule_issues(target, rule) do
    {schemas, tables} = query_filters(rule)

    case Postgres.query(target, @query, [schemas, tables, rule.scope_columns], []) do
      {:ok, result} ->
        result
        |> Result.rows()
        |> Enum.filter(fn row -> RuleOptions.in_rule_scope?(row, rule, &matcher_value/2) end)
        |> Enum.map(&issue(target, &1, rule.scope_columns))

      {:error, reason} ->
        [query_error_issue(target, rule, reason)]
    end
  end

  defp check_opts!(opts) do
    RuleOptions.keyword_list!(opts, :scoped_foreign_keys)

    RuleOptions.validate_allowed_keys!(
      opts,
      [:validate, :rules, :scope_columns],
      :scoped_foreign_keys
    )

    RuleOptions.validate_boolean_option!(opts, :validate, :scoped_foreign_keys)

    if RuleOptions.enabled?(opts) do
      RuleOptions.reject_top_level_keys_with_rules!(opts, [:scope_columns], :scoped_foreign_keys)
      normalize_rules!(opts)
    end

    opts
  end

  defp normalize_rules!(opts) do
    cond do
      Keyword.has_key?(opts, :rules) ->
        opts
        |> Keyword.fetch!(:rules)
        |> RuleOptions.rules!(
          :scoped_foreign_keys,
          allowed_matcher_keys(),
          [:scope_columns],
          &rule_payload!/1
        )

      Keyword.has_key?(opts, :scope_columns) ->
        [
          %{
            scope_columns: scope_columns!(Keyword.fetch!(opts, :scope_columns)),
            only: [],
            except: []
          }
        ]

      true ->
        raise ArgumentError, "expected scoped_foreign_keys to include :scope_columns"
    end
  end

  defp rule_payload!(rule) do
    if not Keyword.has_key?(rule, :scope_columns) do
      raise ArgumentError, "expected scoped_foreign_keys rule to include :scope_columns"
    end

    %{scope_columns: scope_columns!(Keyword.fetch!(rule, :scope_columns))}
  end

  defp scope_columns!(values) when is_list(values) do
    if Enum.empty?(values) or Enum.any?(values, &(not non_empty_string?(&1))) do
      raise_scope_columns_error!()
    end

    values
  end

  defp scope_columns!(_values), do: raise_scope_columns_error!()

  defp non_empty_string?(value), do: is_binary(value) and byte_size(value) > 0

  defp raise_scope_columns_error! do
    raise ArgumentError,
          "expected scoped_foreign_keys :scope_columns to be a non-empty list of strings"
  end

  defp matcher_value(row, :schema), do: Result.value(row, "schema_name", @row_keys)
  defp matcher_value(row, :table), do: Result.value(row, "table_name", @row_keys)
  defp matcher_value(row, :constraint), do: Result.value(row, "constraint_name", @row_keys)

  defp matcher_value(row, :referenced_table),
    do: Result.value(row, "referenced_table_name", @row_keys)

  defp allowed_matcher_keys, do: [:schema, :table, :constraint, :referenced_table]

  defp query_filters(%{only: [[schema: schemas, table: tables]]}),
    do: {List.wrap(schemas), List.wrap(tables)}

  defp query_filters(%{only: [[table: tables, schema: schemas]]}),
    do: {List.wrap(schemas), List.wrap(tables)}

  defp query_filters(%{only: [[schema: schemas]]}), do: {List.wrap(schemas), nil}
  defp query_filters(%{only: [[table: tables]]}), do: {nil, List.wrap(tables)}
  defp query_filters(_rule), do: {nil, nil}

  @spec issue(target :: Target.t(), row :: Result.row(), scope_columns :: list(String.t())) ::
          Issue.t()
  defp issue(target, row, scope_columns) do
    schema_name = Result.value(row, "schema_name", @row_keys)
    table_name = Result.value(row, "table_name", @row_keys)
    constraint_name = Result.value(row, "constraint_name", @row_keys)
    column_names = Result.value(row, "column_names", @row_keys)
    referenced_schema_name = Result.value(row, "referenced_schema_name", @row_keys)
    referenced_table_name = Result.value(row, "referenced_table_name", @row_keys)
    referenced_column_names = Result.value(row, "referenced_column_names", @row_keys)

    %Issue{
      check: __MODULE__,
      target: target,
      message:
        "expected foreign key #{constraint_name} on #{schema_name}.#{table_name} " <>
          "to include required scope columns #{Enum.join(scope_columns, ", ")}",
      meta: %{
        repo: target.repo,
        dynamic_repo: target.dynamic_repo,
        schema: schema_name,
        table: table_name,
        constraint: constraint_name,
        columns: column_names,
        referenced_schema: referenced_schema_name,
        referenced_table: referenced_table_name,
        referenced_columns: referenced_column_names,
        scope_columns: scope_columns
      }
    }
  end

  @spec query_error_issue(
          target :: Target.t(),
          rule :: map(),
          reason :: term()
        ) :: Issue.t()
  defp query_error_issue(target, rule, reason) do
    %Issue{
      check: __MODULE__,
      target: target,
      message: "could not inspect Postgres scoped foreign keys",
      meta: %{
        repo: target.repo,
        dynamic_repo: target.dynamic_repo,
        rule: rule,
        reason: reason
      }
    }
  end
end