defmodule Mix.Tasks.Dripdrop.CheckSchema do
@moduledoc """
Verifies that the configured database has the current DripDrop schema version.
"""
@shortdoc "Verifies the installed dripdrop schema version"
use Mix.Task
alias DripDrop.MixHelpers
@required_tables ~w(adapter_pools adapter_pool_members adapter_sequence_budgets)
@required_columns %{
"channel_adapters" =>
~w(health_state health_score resting_until last_send_at daily_cap ramp_started_at ramp_increment ramp_floor min_gap_seconds),
"enrollments" => ~w(adapter_id effective_mode),
"sequence_versions" => ~w(mode),
"steps" => ~w(adapter_override_id),
"step_executions" => ~w(out_message_id),
"message_events" => ~w(in_reply_to references_list)
}
@required_indexes ~w(
adapter_pools_tenant_name_idx
adapter_pools_global_name_idx
adapter_pool_members_pool_adapter_idx
adapter_sequence_budgets_adapter_sequence_idx
adapter_pool_members_active_pool_idx
enrollments_tenant_adapter_idx
enrollments_tenant_effective_mode_idx
step_executions_out_message_id_idx
message_events_in_reply_to_idx
)
@required_constraints ~w(
sequence_versions_mode_chk
channel_adapters_health_state_chk
channel_adapters_health_score_range_chk
channel_adapters_daily_cap_positive_chk
channel_adapters_ramp_increment_positive_chk
channel_adapters_ramp_floor_nonnegative_chk
channel_adapters_min_gap_seconds_nonnegative_chk
enrollments_effective_mode_chk
enrollments_outbound_pin_chk
adapter_pools_on_pin_unavailable_chk
adapter_pool_members_class_chk
adapter_pool_members_weight_positive_chk
adapter_sequence_budgets_max_share_pct_range_chk
)
@impl Mix.Task
def run(args) do
{opts, _args, _invalid} =
OptionParser.parse(args, switches: [repo: :string, prefix: :string])
Mix.Task.run("app.start")
repo = MixHelpers.resolve_repo(opts[:repo])
prefix = Keyword.get(opts, :prefix, "dripdrop")
{:ok, _started} = repo.start_link(pool_size: 2)
installed = installed_version(repo, prefix)
current = DripDrop.Migration.current_version()
if installed == current do
verify_required_objects!(repo, prefix)
Mix.shell().info("dripdrop schema is current at version #{current}")
else
Mix.raise("dripdrop schema version mismatch: installed=#{installed}, expected=#{current}")
end
end
defp installed_version(repo, prefix) do
query = """
SELECT obj_description(c.oid)
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = 'dripdrop_version' AND c.relkind = 'v'
"""
case repo.query(query, [prefix]) do
{:ok, %{rows: [[comment]]}} when is_binary(comment) ->
case Regex.run(~r/version=(\d+)/, comment) do
[_, version] -> String.to_integer(version)
_match -> 0
end
_other ->
0
end
end
defp verify_required_objects!(repo, prefix) do
missing =
[]
|> missing_tables(repo, prefix)
|> missing_columns(repo, prefix)
|> missing_indexes(repo, prefix)
|> missing_constraints(repo, prefix)
case missing do
[] ->
:ok
missing ->
missing_text = Enum.join(Enum.reverse(missing), ", ")
Mix.raise("dripdrop schema is missing required objects: #{missing_text}")
end
end
defp missing_tables(missing, repo, prefix) do
Enum.reduce(@required_tables, missing, fn table, acc ->
if exists?(
repo,
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = $1 AND table_name = $2
""",
[prefix, table]
) do
acc
else
["table #{table}" | acc]
end
end)
end
defp missing_columns(missing, repo, prefix) do
@required_columns
|> Enum.flat_map(fn {table, columns} -> Enum.map(columns, &{table, &1}) end)
|> Enum.reduce(missing, &missing_column(&1, &2, repo, prefix))
end
defp missing_column({table, column}, missing, repo, prefix) do
if exists?(
repo,
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2 AND column_name = $3
""",
[prefix, table, column]
) do
missing
else
["column #{table}.#{column}" | missing]
end
end
defp missing_indexes(missing, repo, prefix) do
Enum.reduce(@required_indexes, missing, fn index, acc ->
if exists?(
repo,
"""
SELECT 1
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind = 'i'
""",
[prefix, index]
) do
acc
else
["index #{index}" | acc]
end
end)
end
defp missing_constraints(missing, repo, prefix) do
Enum.reduce(@required_constraints, missing, fn constraint, acc ->
if exists?(
repo,
"""
SELECT 1
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE n.nspname = $1 AND c.conname = $2
""",
[prefix, constraint]
) do
acc
else
["constraint #{constraint}" | acc]
end
end)
end
defp exists?(repo, query, params) do
case repo.query(query, params) do
{:ok, %{num_rows: count}} when count > 0 -> true
_other -> false
end
end
end