# This file steals liberally from https://github.com/supabase/realtime,
# which in turn draws on https://github.com/cainophile/cainophile
defmodule WalEx.TransactionFilter do
alias WalEx.Changes.Transaction
require Logger
defmodule Filter do
  @moduledoc """
  Parsed form of a relation filter string.

  A `nil` `:schema` or `:table` is treated as a wildcard when a change is
  compared against the filter. `parse_relation_filter/1` always leaves
  `:condition` as `nil`.
  """
  defstruct schema: nil, table: nil, condition: nil
end
@doc """
Returns `true` when at least one change in the transaction satisfies the filter.

The filter is a map with an `:event` (a change type such as `"INSERT"`, or
`"*"` for any type) and a `:relation` string in the format accepted by
`parse_relation_filter/1`. A relation that cannot be parsed is logged as a
warning and treated as a non-match. Any other argument shape, including a
`:relation` that is not a string, returns `false`.

## Examples

    iex> txn = %Transaction{changes: [
    ...> %WalEx.Changes.NewRecord{
    ...> columns: [
    ...> %WalEx.Postgres.Decoder.Messages.Relation.Column{flags: [:key], name: "id", type: "int8", type_modifier: 4294967295},
    ...> %WalEx.Postgres.Decoder.Messages.Relation.Column{flags: [], name: "details", type: "text", type_modifier: 4294967295},
    ...> %WalEx.Postgres.Decoder.Messages.Relation.Column{flags: [], name: "user_id", type: "int8", type_modifier: 4294967295}
    ...> ],
    ...> commit_timestamp: nil,
    ...> record: %{"details" => "The SCSI system is down, program the haptic microchip so we can back up the SAS circuit!", "id" => "14", "user_id" => "1"},
    ...> schema: "public",
    ...> table: "todos",
    ...> type: "INSERT"
    ...> }
    ...> ]}
    iex> matches?(%{event: "*", relation: "*"}, txn)
    true
    iex> matches?(%{event: "INSERT", relation: "*"}, txn)
    true
    iex> matches?(%{event: "UPDATE", relation: "*"}, txn)
    false
    iex> matches?(%{event: "INSERT", relation: "public"}, txn)
    true
    iex> matches?(%{event: "INSERT", relation: "myschema"}, txn)
    false
    iex> matches?(%{event: "INSERT", relation: "public:todos"}, txn)
    true
    iex> matches?(%{event: "INSERT", relation: "myschema:users"}, txn)
    false
"""
def matches?(%{event: event, relation: relation}, %Transaction{changes: changes})
    when is_binary(relation) do
  case parse_relation_filter(relation) do
    {:ok, filter} ->
      Enum.any?(changes, fn change -> change_matches(event, filter, change) end)

    {:error, msg} ->
      # `Logger.warn/1` is deprecated; `Logger.warning/1` is the supported call.
      Logger.warning("Could not parse relation filter: #{inspect(msg)}")
      false
  end
end

# Malformed filter (including a non-string relation) or malformed transaction:
# never a match.
def matches?(_filter, _txn), do: false
# Decides whether a single change satisfies the requested event and filter.
#
# A change carrying a `:type` is rejected outright when a specific event was
# requested (anything other than "*") and the type differs. Every other change
# is accepted only if both its schema and its table satisfy the filter.
defp change_matches(event, filter, change) do
  case change do
    %{type: type} when event != "*" and type != event ->
      false

    _any ->
      schema_ok = name_matches(filter.schema, change.schema)
      schema_ok and name_matches(filter.table, change.table)
  end
end
@doc """
Parses a relation filter string into a `Filter` struct.

Accepted forms are `"schema:table"`, `"schema"`, and `""` or `"*"`. Parts
that are not given are returned as `nil`. Input containing more than one `:`
separator, or input that is not a string, yields
`{:error, "malformed relation filter"}`.

## Examples

    iex> parse_relation_filter("public:users")
    {:ok, %Filter{schema: "public", table: "users", condition: nil}}
    iex> parse_relation_filter("public")
    {:ok, %Filter{schema: "public", table: nil, condition: nil}}
    iex> parse_relation_filter("")
    {:ok, %Filter{schema: nil, table: nil, condition: nil}}
    iex> parse_relation_filter("*")
    {:ok, %Filter{schema: nil, table: nil, condition: nil}}
    iex> parse_relation_filter("public:users:bad")
    {:error, "malformed relation filter"}
    iex> parse_relation_filter(nil)
    {:error, "malformed relation filter"}
"""
@spec parse_relation_filter(term()) :: {:ok, %Filter{}} | {:error, String.t()}
def parse_relation_filter(relation) when is_binary(relation) do
  # We do a very loose validation here.
  # When the relation filter format is well defined we can do
  # proper parsing and validation.
  case String.split(relation, ":") do
    [wildcard] when wildcard in ["", "*"] ->
      {:ok, %Filter{schema: nil, table: nil, condition: nil}}

    [schema] ->
      {:ok, %Filter{schema: schema, table: nil, condition: nil}}

    [schema, table] ->
      {:ok, %Filter{schema: schema, table: table, condition: nil}}

    _ ->
      {:error, "malformed relation filter"}
  end
end

# Non-string input cannot be split; report it through the same error tuple
# instead of raising from `String.split/2`.
def parse_relation_filter(_relation), do: {:error, "malformed relation filter"}
# Compares one filter component against the corresponding name on a change.
# A `nil` filter component accepts any name; otherwise the two must be equal.
defp name_matches(filter_name, change_name) do
  is_nil(filter_name) or filter_name == change_name
end
@doc """
Returns `true` when the transaction contains an `"INSERT"` change for `relation`.

`relation` may be an atom or a string. A value without a `:` separator is
looked up in the `"public"` schema.
"""
def insert_event?(relation, txn) do
  relation("INSERT", relation, txn)
end

@doc """
Returns `true` when the transaction contains an `"UPDATE"` change for `relation`.

`relation` may be an atom or a string. A value without a `:` separator is
looked up in the `"public"` schema.
"""
def update_event?(relation, txn) do
  relation("UPDATE", relation, txn)
end

@doc """
Returns `true` when the transaction contains a `"DELETE"` change for `relation`.

`relation` may be an atom or a string. A value without a `:` separator is
looked up in the `"public"` schema.
"""
def delete_event?(relation, txn) do
  relation("DELETE", relation, txn)
end
# Builds the relation string for `matches?/2` and runs the match.
#
# Atoms are always placed in the "public" schema. Strings that already contain
# a ":" are passed through unchanged; other strings are also prefixed with
# "public:".
defp relation(event, relation, txn) when is_atom(relation) or is_binary(relation) do
  qualified =
    cond do
      is_atom(relation) -> "public:" <> to_string(relation)
      String.contains?(relation, ":") -> relation
      true -> "public:" <> relation
    end

  matches?(%{event: event, relation: qualified}, txn)
end
@doc """
Returns the changes in the transaction that belong to `table_name`.

When the second argument is not a `Transaction` struct the result is `false`
rather than a list.
"""
def table(table_name, %Transaction{changes: changes}) do
  Enum.filter(changes, &has_table?(&1, table_name))
end

def table(_table, _txn) do
  false
end
# Returns true when the change's table name equals the atom `table_name`.
#
# The comparison is done on strings so that no atom is created from the
# change's table name. Atoms are never garbage collected, and
# `String.to_atom/1` raises on non-binary input. A `table_name` that is not an
# atom never matches.
defp has_table?(change, table_name) do
  is_atom(table_name) and Atom.to_string(table_name) == change.table
end
@doc """
Checks whether the transaction contains a change for the given table(s) that
`app_name` also subscribes to.

`tables` may be a single table name (atom or string) or a list of names. For a
list, every entry must be satisfied, so an empty list returns `true`. Returns
`false` when the second argument is not a `Transaction` struct or the table
name has an unsupported type.
"""
def has_tables?(tables, %Transaction{changes: _changes} = txn, app_name) when is_list(tables) do
  # `Enum.all?/2` stops at the first table that is not satisfied instead of
  # evaluating every entry up front.
  Enum.all?(tables, fn table -> has_tables?(table, txn, app_name) end)
end

def has_tables?(table_name, %Transaction{changes: changes}, app_name)
    when is_atom(table_name) do
  Enum.any?(changes, fn change ->
    has_table?(change, table_name) && subscribes?(change, app_name)
  end)
end

def has_tables?(table_name, %Transaction{changes: changes}, app_name)
    when is_binary(table_name) do
  # Compare the names as strings so that no atom is created from the
  # caller-supplied `table_name`.
  Enum.any?(changes, fn change ->
    change.table == table_name && subscribes?(change, app_name)
  end)
end

def has_tables?(_tables, _txn, _app_name), do: false
# Returns true when the change's table is listed in the app's `:subscriptions`
# configuration.
#
# Subscriptions are matched as atoms, as before, but the comparison is made on
# the string form so that no atom is created from the change's table name.
# NOTE(review): presumably `:subscriptions` is a list of atoms; confirm against
# `WalEx.Configs`.
defp subscribes?(change, app_name) do
  app_name
  |> WalEx.Configs.get_configs([:subscriptions])
  |> Keyword.get(:subscriptions)
  |> Enum.any?(fn subscription ->
    is_atom(subscription) and Atom.to_string(subscription) == change.table
  end)
end
@doc """
Returns the entries that differ between `old_record` and `record`.

The two values are compared with `MapDiff.diff/2`. When the result carries a
`:value`, only the entries flagged as `:primitive_change` or `:map_change` are
kept; otherwise an empty map is returned.
"""
def changes(old_record, record) do
  with %{value: diff} <- MapDiff.diff(old_record, record) do
    filter_changes(diff)
  else
    _no_value -> %{}
  end
end
# Keeps only the diff entries whose value is a map with a `:changed` flag of
# `:primitive_change` or `:map_change`, and returns them as a map.
defp filter_changes(changes) do
  changes
  |> Enum.filter(fn
    {_key, %{changed: kind}} -> kind in [:primitive_change, :map_change]
    {_key, _other} -> false
  end)
  |> Map.new()
end
end