# This file steals liberally from https://github.com/supabase/realtime,
# which in turn draws on https://github.com/cainophile/cainophile
defmodule WalEx.TransactionFilter do
alias WalEx.Changes.{
DeletedRecord,
NewRecord,
UpdatedRecord,
Transaction
}
alias WalEx.Postgres.Decoder.Messages.Relation.Column
require Logger
defmodule Filter do
  @moduledoc """
  A parsed relation filter.

  `schema` and `table` hold the names a change must have; a `nil` in either
  field means "any". `condition` is reserved: `parse_relation_filter/1` in the
  enclosing module always leaves it `nil`.
  """

  defstruct schema: nil, table: nil, condition: nil
end
@doc """
Returns `true` when at least one change in the transaction satisfies the filter.

The filter is a map with two keys:

  * `:event` - the change type to accept (e.g. `"INSERT"`, `"UPDATE"`,
    `"DELETE"`), or `"*"` to accept every type.
  * `:relation` - a string understood by `parse_relation_filter/1`
    (`"*"`, `""`, `"schema"` or `"schema:table"`).

A relation that cannot be parsed, a relation that is not a string, or any
argument pair that is not a filter map plus a `Transaction` never matches:
the function returns `false` instead of raising.

## Examples

    iex> txn = %Transaction{changes: [
    ...>   %WalEx.Changes.NewRecord{
    ...>     columns: [
    ...>       %WalEx.Postgres.Decoder.Messages.Relation.Column{flags: [:key], name: "id", type: "int8", type_modifier: 4294967295},
    ...>       %WalEx.Postgres.Decoder.Messages.Relation.Column{flags: [], name: "details", type: "text", type_modifier: 4294967295},
    ...>       %WalEx.Postgres.Decoder.Messages.Relation.Column{flags: [], name: "user_id", type: "int8", type_modifier: 4294967295}
    ...>     ],
    ...>     commit_timestamp: nil,
    ...>     record: %{"details" => "The SCSI system is down, program the haptic microchip so we can back up the SAS circuit!", "id" => "14", "user_id" => "1"},
    ...>     schema: "public",
    ...>     table: "todos",
    ...>     type: "INSERT"
    ...>   }
    ...> ]}
    iex> matches?(%{event: "*", relation: "*"}, txn)
    true
    iex> matches?(%{event: "INSERT", relation: "*"}, txn)
    true
    iex> matches?(%{event: "UPDATE", relation: "*"}, txn)
    false
    iex> matches?(%{event: "INSERT", relation: "public"}, txn)
    true
    iex> matches?(%{event: "INSERT", relation: "myschema"}, txn)
    false
    iex> matches?(%{event: "INSERT", relation: "public:todos"}, txn)
    true
    iex> matches?(%{event: "INSERT", relation: "myschema:users"}, txn)
    false
    iex> matches?(%{event: "INSERT", relation: :todos}, txn)
    false
"""
# The `is_binary/1` guard keeps non-string relations away from `String.split/2`
# (which would raise) and routes them to the "never matches" clause below.
def matches?(%{event: event, relation: relation}, %Transaction{changes: changes})
    when is_binary(relation) do
  case parse_relation_filter(relation) do
    {:ok, filter} ->
      Enum.any?(changes, fn change -> change_matches(event, filter, change) end)

    {:error, _msg} ->
      false
  end
end

# Malformed filter (including a non-string relation) or transaction: never a match.
def matches?(_filter, _txn), do: false
# Decides whether a single change satisfies the requested event type and the
# parsed relation filter. A change whose `:type` differs from `event` is
# rejected unless `event` is the "*" wildcard; every other change is accepted
# when both its schema and its table agree with the filter.
defp change_matches(event, filter, change) do
  case change do
    %{type: type} when event != type and event != "*" ->
      false

    _other ->
      name_matches?(filter.schema, change.schema) and
        name_matches?(filter.table, change.table)
  end
end
@doc """
Turns a relation filter string into a `Filter` struct.

The string is split on `":"`. An empty string or `"*"` yields a filter with
no schema and no table, a single segment is taken as the schema, and two
segments are taken as schema and table. Anything with more segments is
rejected with `{:error, "malformed relation filter"}`.

## Examples

    iex> parse_relation_filter("public:users")
    {:ok, %Filter{schema: "public", table: "users", condition: nil}}
    iex> parse_relation_filter("public")
    {:ok, %Filter{schema: "public", table: nil, condition: nil}}
    iex> parse_relation_filter("")
    {:ok, %Filter{schema: nil, table: nil, condition: nil}}
    iex> parse_relation_filter("public:users:bad")
    {:error, "malformed relation filter"}
"""
def parse_relation_filter(relation) do
  # Validation is intentionally loose: only the number of ":"-separated
  # segments is checked. Stricter parsing can follow once the relation
  # filter format is well defined.
  segments = String.split(relation, ":")

  case segments do
    [wildcard] when wildcard in ["", "*"] ->
      {:ok, %Filter{}}

    [schema] ->
      {:ok, %Filter{schema: schema}}

    [schema, table] ->
      {:ok, %Filter{schema: schema, table: table}}

    _other ->
      {:error, "malformed relation filter"}
  end
end
# A `nil` filter name acts as a wildcard; any other value must be equal to the
# name carried by the change.
defp name_matches?(filter_name, change_name) do
  is_nil(filter_name) or filter_name == change_name
end
@doc """
Returns `true` when the transaction contains an `"INSERT"` change for `relation`.

`relation` is an atom or a string; see `update_event?/2` and `delete_event?/2`
for the sibling predicates.
"""
def insert_event?(relation, txn) do
  relation("INSERT", relation, txn)
end

@doc """
Returns `true` when the transaction contains an `"UPDATE"` change for `relation`.
"""
def update_event?(relation, txn) do
  relation("UPDATE", relation, txn)
end

@doc """
Returns `true` when the transaction contains a `"DELETE"` change for `relation`.
"""
def delete_event?(relation, txn) do
  relation("DELETE", relation, txn)
end
# Builds the relation string for `matches?/2` and delegates to it.
#
# An atom is always placed in the "public" schema. A string is used verbatim
# when it already contains a ":" separator and is otherwise prefixed with
# "public:" as well.
defp relation(event, relation, txn) when is_atom(relation) do
  # Interpolation goes through `to_string/1`, exactly like the explicit call.
  qualified = "public:#{relation}"
  matches?(%{event: event, relation: qualified}, txn)
end

defp relation(event, relation, txn) when is_binary(relation) do
  qualified =
    case String.contains?(relation, ":") do
      true -> relation
      false -> "public:" <> relation
    end

  matches?(%{event: event, relation: qualified}, txn)
end
@doc """
Returns the changes of the transaction that `subscribes?/2` accepts for `app_name`.
"""
def filter_subscribed(%Transaction{changes: changes}, app_name) do
  Enum.filter(changes, fn change -> subscribes?(change, app_name) end)
end
@doc """
Returns the subscribed changes of the transaction that belong to `table`.

When `type` is `nil` every such change is returned; otherwise the result is
narrowed further to the changes for which `is_type?/2` holds for `type`.
"""
def filter_changes(%Transaction{changes: changes}, table, type, app_name) do
  for_table = subscribes_and_has_table(changes, table, app_name)

  if is_nil(type) do
    for_table
  else
    Enum.filter(for_table, fn change -> is_type?(change, type) end)
  end
end
# Keeps only the changes that `subscribes_to_table?/3` accepts for the given
# table and application.
defp subscribes_and_has_table(changes, table, app_name) do
  Enum.filter(changes, fn change ->
    subscribes_to_table?(change, table, app_name)
  end)
end
@doc """
Returns `true` when `change` belongs to `table` and that table passes
`subscribes?/2` for `app_name`.

The subscription lookup only happens when the table check succeeds.
"""
def subscribes_to_table?(change, table, app_name) do
  if has_table?(change, table) do
    subscribes?(change, app_name)
  else
    false
  end
end
@doc """
Returns `true` when the change's table is listed in the subscriptions of `app_name`.

The subscriptions are read with `WalEx.Config.get_configs(app_name, :subscriptions)`
(presumably a list of table-name atoms - confirm against the configuration
module). A subscription matches when it is an atom whose name equals the
table name.

The comparison is done on strings, so no atom is created from the table name.
A `table` that is already an atom is accepted as well.
"""
def subscribes?(%{table: table}, app_name) do
  subscriptions = WalEx.Config.get_configs(app_name, :subscriptions)

  # Normalise to a string instead of calling `String.to_atom/1`: atoms are
  # never garbage collected, and an atom `table` would make that call raise.
  table_name = if is_atom(table), do: Atom.to_string(table), else: table

  Enum.any?(subscriptions, fn subscription ->
    is_atom(subscription) and Atom.to_string(subscription) == table_name
  end)
end
@doc """
Returns `true` when the change's `:table` names the same table as `table_name`.

`table_name` is expected to be an atom. The change's table may be an atom
(compared directly) or a string (compared against the atom's name, so no atom
is created from the string). Anything else - including a map without a
`:table` key - yields `false`.
"""
def has_table?(%{table: table}, table_name) when is_atom(table), do: table == table_name

# Compare as strings rather than converting `table` with `String.to_atom/1`,
# which would add an entry to the atom table for every distinct name seen.
def has_table?(%{table: table}, table_name) when is_binary(table) and is_atom(table_name),
  do: Atom.to_string(table_name) == table

def has_table?(_txn, _table_name), do: false
@doc """
Returns `true` when the change struct corresponds to the requested type atom.

The accepted pairs are a `NewRecord` of type `"INSERT"` with `:insert`, an
`UpdatedRecord` of type `"UPDATE"` with `:update`, and a `DeletedRecord` of
type `"DELETE"` with `:delete`. Every other combination is `false`.
"""
def is_type?(change, type) do
  case {change, type} do
    {%NewRecord{type: "INSERT"}, :insert} -> true
    {%UpdatedRecord{type: "UPDATE"}, :update} -> true
    {%DeletedRecord{type: "DELETE"}, :delete} -> true
    _other -> false
  end
end
@doc """
Keeps the events for which `unwatched_fields?/2` returns `true` given
`unwatched_changes`.
"""
def filter_unwatched_fields(events, unwatched_changes) do
  Enum.filter(events, fn event -> unwatched_fields?(event, unwatched_changes) end)
end
@doc """
Tells whether an event should be kept with respect to the unwatched fields.

Returns `true` when the event's `:changes` is `nil`, when the event has no
`:changes` key at all, or when at least one changed key is not listed in
`unwatched_changes`. Returns `false` only when every changed key is unwatched
(which includes an empty set of changes).
"""
def unwatched_fields?(event, unwatched_changes) do
  case event do
    %{changes: nil} ->
      true

    %{changes: changes} ->
      Enum.any?(changes, fn {key, _value} -> key not in unwatched_changes end)

    _without_changes ->
      true
  end
end
@doc """
Keeps the events for which `watched_record?/2` returns `true` given
`unwatched_records`.
"""
def filter_unwatched_records(events, unwatched_records) do
  Enum.filter(events, fn event -> watched_record?(event, unwatched_records) end)
end
@doc """
Returns `true` when the event's record is not covered by `unwatched_records`.

The record inspected is `:old_record` when `:new_record` is `nil` and the old
record is a map, otherwise `:new_record` when it is a map. An event that
offers neither is reported as not watched (`false`).
"""
def watched_record?(event, unwatched_records) do
  case event do
    %{new_record: nil, old_record: %{} = old_record} ->
      not contains_unwatched_records?(old_record, unwatched_records)

    %{new_record: %{} = new_record} ->
      not contains_unwatched_records?(new_record, unwatched_records)

    _other ->
      false
  end
end
@doc """
Returns `true` when `record` carries every key/value pair of `unwatched_records`.

A pair matches when the key is present in `record` and its value compares
equal (`==`) to the unwatched value.

An empty `unwatched_records` map describes nothing to ignore, so it never
matches and the result is `false`.
"""
def contains_unwatched_records?(%{} = record, %{} = unwatched_records) do
  # `Enum.all?/2` is vacuously true for an empty map; without the size check
  # an empty configuration would flag every record as unwatched.
  map_size(unwatched_records) > 0 and
    Enum.all?(unwatched_records, fn {key, value} ->
      Map.has_key?(record, key) and Map.get(record, key) == value
    end)
end
@doc """
Builds a map describing which fields differ between two records.

Only the keys of `old_record` are examined. Each key whose value differs in
`new_record` (a missing key reads as `nil`) maps to
`%{old_value: old, new_value: new}`; keys with equal values are left out.
"""
def map_changes(old_record, new_record) do
  old_record
  |> Map.to_list()
  |> Enum.map(fn {field, old_value} ->
    {field, old_value, Map.get(new_record, field)}
  end)
  |> Enum.reject(fn {_field, old_value, new_value} -> old_value == new_value end)
  |> Map.new(fn {field, old_value, new_value} ->
    {field, %{old_value: old_value, new_value: new_value}}
  end)
end
@doc """
Converts a list of `Column` structs into a map from column name to column type.

Each name is turned into an atom with `String.to_atom/1`, so the resulting
map has atom keys. When two columns share a name the later one wins.
"""
def map_columns(columns) do
  Map.new(columns, fn %Column{name: name, type: type} ->
    {String.to_atom(name), type}
  end)
end
end