defmodule Fly.Postgres.LSN.Tracker do
@moduledoc """
Tracks the current PostgreSQL LSN or Log Sequence Number. This also tracks
requests to be notified when replication happens and the requested `:insert`
LSN was applied locally.
## GenServers and process responsibility
The GenServer process doesn't have any special behaviors other than creating
and owning the ETS tables that track the information.
The module contains functions for writing data to, and reading data from the
ETS tables.
The `Fly.Postgres.LSN.Reader` GenServer is responsible for interacting with
the database. When replication events are seen, it uses the Tracker functions
to write the cache and executes the notification code. However, the Reader
server is the process executing that code. It is designed this way on purpose
so that any crashes or failures happen to the Reader and caches with
notification requests and current replication values are not lost.
## LSN values
Tracking the LSN value is used to determine which portions of the database log
have been replicated locally. This lets us determine if a specific transaction
chunk has been replicated to know that some expected data is present.
The client process doesn't interact directly with the Tracker GenServer. The
client can `request_notification` or `request_and_await_notification` and the
requesting processes are notified when the data replication has been seen.
"""
use GenServer
require Logger
import Fly.Postgres, only: [verbose_log: 2]
alias Fly.Postgres.LSN
@lsn_table :ets_cache
@request_table :ets_requests
###
### CLIENT
###
@doc """
Start the Tracker that receives work requests.
"""
def start_link(opts \\ []) do
if !Keyword.has_key?(opts, :repo) do
raise ArgumentError, ":repo must be given when starting the LSN Tracker"
end
base_name = Keyword.fetch!(opts, :base_name)
name = get_name(base_name)
GenServer.start_link(__MODULE__, Keyword.put(opts, :name, name), name: name)
end
@doc """
Get the `Ecto.Repo` used by the tracker.
## Options
- `:tracker` - The tracker name to get the latest LSN replay value for. Uses
the default tracker name. Needs to be provided when multiple trackers are
used.
"""
@spec get_repo(opts :: keyword()) :: nil | module()
def get_repo(opts \\ []) do
table_name = get_ets_table_name(@lsn_table, opts)
case :ets.lookup(table_name, :repo) do
[{:repo, repo}] ->
repo
[] ->
nil
end
end
@doc """
Get the latest cached LSN replay value. On a first run, no value is in the
cache and a `nil` is returned.
## Options
- `:tracker` - The tracker name to get the latest LSN replay value for. Uses
the default tracker name. Required when using multiple trackers.
"""
@spec get_last_replay(opts :: keyword()) :: nil | Fly.Postgres.LSN.t()
def get_last_replay(opts \\ []) do
# Option for testing: `:override_table_name` - The ETS table name to read the values from.
table_name = get_ets_table_name(@lsn_table, opts)
case :ets.lookup(table_name, :last_log_replay) do
[{:last_log_replay, %Fly.Postgres.LSN{} = stored}] ->
stored
[] ->
nil
end
end
@doc """
Return if the LSN value was replicated. Compares against the cached value.
"""
@spec replicated?(Fly.Postgres.LSN.t(), opts :: keyword()) :: boolean()
def replicated?(%Fly.Postgres.LSN{source: :insert} = lsn, opts \\ []) do
case get_last_replay(opts) do
%Fly.Postgres.LSN{} = stored ->
Fly.Postgres.LSN.replicated?(stored, lsn)
nil ->
false
end
end
@doc """
Request notification for when the database replication includes the LSN the
process cares about. This enables a process to block and await their data to be
replicated and be notified as soon as it's detected.
Adds an entry to ETS table that tracks notification requests.
"""
@spec request_notification(Fly.Postgres.LSN.t(), opts :: keyword()) :: :ok
def request_notification(%Fly.Postgres.LSN{source: :insert} = lsn, opts \\ []) do
table_name = get_request_tracking_table(opts)
# This uses the pid of the requesting process
:ets.insert(table_name, {self(), lsn})
:ok
end
@doc """
Blocking function that waits for a `request_notification/2` response message
to be received. The timeout defaults to 5s after which time it stops waiting
and returns an `{:error, :timeout}` response.
## Options
- `:replication_timeout` - Timeout duration to wait for replication to
complete. Value is in milliseconds.
"""
@spec await_notification(Fly.Postgres.LSN.t(), opts :: keyword()) ::
:ready | {:error, :timeout}
def await_notification(%Fly.Postgres.LSN{source: :insert} = lsn, opts \\ []) do
timeout = Keyword.get(opts, :replication_timeout, 5_000)
pid = self()
receive do
{:lsn_replicated, {^pid, ^lsn}} -> :ready
after
timeout ->
{:error, :timeout}
end
end
@doc """
Request to be notified when the desired level of data replication has
completed and wait for it to complete. Optionally it may timeout if it takes
too long.
## Options
- `:tracker` - The name of the tracker to wait on for replication tracking.
- `:replication_timeout` - Timeout duration to wait for replication to
complete. Value is in milliseconds.
"""
@spec request_and_await_notification(
:wal_lookup_failure | Fly.Postgres.LSN.t(),
opts :: keyword()
) ::
:ready | {:error, :timeout}
def request_and_await_notification(error_or_lsn, opts \\ [])
def request_and_await_notification(:wal_lookup_failure, _opts) do
# Nothing to wait for. There was an error querying for the Postgres WAL. Can
# happen when user-code DB transactions fail.
verbose_log(:info, fn ->
"Received :wal_lookup_failure. Nothing to wait for."
end)
:ready
end
def request_and_await_notification(%Fly.Postgres.LSN{source: :insert} = lsn, opts) do
# Don't register notification request or wait when on the primary
if Fly.RPC.is_primary?() do
:ready
else
# First check if the LSN value is already in the ETS cache. If so, return
# immediately. Otherwise request to be notified and wait.
if replicated?(lsn, opts) do
:ready
else
verbose_log(:info, fn ->
"LSN REQ notification for #{inspect(lsn)} to #{inspect(self())}"
end)
request_notification(lsn, opts)
result = await_notification(lsn, opts)
verbose_log(:info, fn ->
case result do
:ready ->
"LSN RECV tracking notification for #{inspect(lsn)} to #{inspect(self())}"
{:error, :timeout} ->
"LSN TIMEOUT waiting on #{inspect(lsn)} to #{inspect(self())}"
end
end)
result
end
end
end
###
### SERVER CALLBACKS
###
def init(opts) do
repo = Keyword.fetch!(opts, :repo)
# base name of the tracker process.
base_name = Keyword.fetch!(opts, :base_name)
# Start with the table names to use for this tracker according to the name of the process.
cache_table_name = get_ets_table_name(@lsn_table, base_name: base_name)
requests_table_name = get_ets_table_name(@request_table, base_name: base_name)
# setup ETS table for caching most recently read DB LSN value
tab_lsn_cache = :ets.new(cache_table_name, [:named_table, :public, read_concurrency: true])
# insert special entry for which repo this tracker is using
:ets.insert(cache_table_name, {:repo, repo})
# setup ETS table for processes requesting notification when new matching LSN value is seen
tab_requests = :ets.new(requests_table_name, [:named_table, :public, read_concurrency: true])
# Initial state.
{
:ok,
%{
base_name: Keyword.get(opts, :base_name),
name: Keyword.get(opts, :name),
lsn_table: tab_lsn_cache,
requests_table: tab_requests,
repo: repo
}
}
end
@doc """
Write the latest LSN value to the cache. Don't record a `nil` LSN value.
"""
@spec write_lsn_to_cache(nil | LSN.t(), lsn_table :: atom()) :: :ok
def write_lsn_to_cache(lsn, lsn_table)
def write_lsn_to_cache(nil, lsn_table) do
# delete any cached value. Useful for testing.
:ets.delete(lsn_table, :last_log_replay)
:ok
end
def write_lsn_to_cache(%LSN{} = lsn, lsn_table) do
:ets.insert(lsn_table, {:last_log_replay, lsn})
:ok
end
# Process the list of notification requests in the ETS table. If the tracked
# insert LSN has been replicated so it is now local, notify the pid and remove
# the entry.
@doc false
# Private function
def process_request_entries(base_name) do
req_table = get_request_tracking_table(base_name: base_name)
lsn_table = get_lsn_cache_table(base_name: base_name)
case fetch_request_entries(req_table) do
[] ->
# Nothing to do. No outstanding requests being tracked
:ok
requests ->
# We have requests to process. Query for the latest replication LSN
last_replay = get_last_replay(override_table_name: lsn_table)
# Cycle and notify if replicated
Enum.each(requests, fn {pid, lsn_insert} = entry ->
# If the tracked LSN was already replicated, notify the pid and remove the
# entry
if Fly.Postgres.LSN.replicated?(last_replay, lsn_insert) do
# notify the requesting pid that the LSN was replicated
send(pid, {:lsn_replicated, entry})
# delete the request from the ETS table
:ets.delete(req_table, pid)
end
end)
end
end
# Return the current list of LSN notification subscriptions.
# Reads from the ETS table.
@doc false
# Private function
def fetch_request_entries(requests_table) do
:ets.match_object(requests_table, {:"$1", :"$2"})
end
@doc """
Get the LSN cached ETS table name for the specified tracker.
"""
@spec get_lsn_cache_table(opts :: keyword()) :: atom()
def get_lsn_cache_table(opts \\ []) do
get_ets_table_name(@lsn_table, opts)
end
@doc """
Get the notification request tracking ETS table name for the specified tracker.
"""
@spec get_request_tracking_table(opts :: keyword()) :: atom()
def get_request_tracking_table(opts \\ []) do
get_ets_table_name(@request_table, opts)
end
@doc """
Get the ETS table name. It is derived from the table prefix name and the base
name of the tracker (as there can be multiple).
"""
# Atom interpolation is OK here because the values are provided by dev
# and used to create the name of an ETS table.
#
# sobelow_skip ["DOS.BinToAtom"]
@spec get_ets_table_name(atom(), opts :: keyword()) :: atom()
def get_ets_table_name(base_table_name, opts \\ []) do
base_name = Keyword.get(opts, :base_name) || Core.LSN
Keyword.get_lazy(opts, :override_table_name, fn ->
# NOTE: This intentionally creates an atom. The input values come from
# developer code, not user input.
:"#{base_table_name}_#{get_name(base_name)}"
end)
end
@doc """
Get the name of the tracker instance that is derived from the base tracking name.
"""
# Atom interpolation is OK here because it is provided by dev.
#
# sobelow_skip ["DOS.BinToAtom"]
@spec get_name(atom()) :: atom()
def get_name(base_name) when is_atom(base_name) do
:"#{base_name}_tracker"
end
end