# lib/lsn/lsn.ex

defmodule Fly.Postgres.LSN do
  @moduledoc """
  Data structure that represents a PostgreSQL LSN or Log Sequence Number.

  The textual form of an LSN is two hexadecimal numbers separated by a slash
  (for example `"0/1234ABCD"`). They are stored here as the integers `fpart`
  and `offset`, together with a `source` describing where the value came from.

  Two LSN values can be compared using the `replicated?/2` function. An LSN
  associated with the DB modification has a `source` of `:insert`. On a replica
  instance, that can be used to see when the insert has been replicated locally.
  """
  alias __MODULE__

  defstruct fpart: nil, offset: nil, source: nil

  @type t :: %LSN{
          fpart: nil | integer,
          offset: nil | integer,
          source: :not_replicating | :insert | :replay
        }

  @doc """
  Create a new `Fly.Postgres.LSN` struct from a queried WAL value.

  * `lsn` - the textual LSN (`"<hex>/<hex>"`), or `nil` when the replay
    query returned no value.
  * `source` - `:insert` or `:replay`, recording which kind of query produced
    the value.

  A `nil` value is only accepted with a `source` of `:replay`; the result is an
  LSN whose `source` is `:not_replicating` and whose `fpart`/`offset` are `nil`.

  Raises `ArgumentError` when `lsn` is a string that is not exactly two
  hexadecimal numbers separated by a single `/`.

  ## Examples

      iex> Fly.Postgres.LSN.new("0/1A", :insert)
      #LSN<0/1A>
  """
  @spec new(lsn :: nil | String.t(), source :: :insert | :replay) :: no_return() | t()
  def new(nil, :replay) do
    %LSN{fpart: nil, offset: nil, source: :not_replicating}
  end

  def new(lsn, source) when is_binary(lsn) and source in [:insert, :replay] do
    # Every step uses `<-` so that any parse failure (`:error`, or trailing
    # non-hex characters left in the remainder) falls through to `else` and is
    # reported as an ArgumentError instead of an unrelated MatchError.
    with [file_part_str, offset_str] <- String.split(lsn, "/"),
         {fpart, ""} <- Integer.parse(file_part_str, 16),
         {offset, ""} <- Integer.parse(offset_str, 16) do
      %LSN{fpart: fpart, offset: offset, source: source}
    else
      _ -> raise ArgumentError, "invalid lsn format #{inspect(lsn)}"
    end
  end

  @doc """
  Compare two `Fly.Postgres.LSN` structs to determine if the transaction representing a
  data change on the primary has been replayed locally.

  They are compared where the replay/replica value is in argument 1 and the
  insert value is in argument two.

  When the first argument has a `source` of `:not_replicating` (the replay
  query returned no value) the result is always `true`.

  Only the combinations `:replay`/`:insert` and `:not_replicating`/`:insert`
  are supported; any other pairing raises a `FunctionClauseError`.

  ## Examples

      repo |> last_wal_replay() |> replicated?(primary_lsn)
  """
  @spec replicated?(replay_lsn :: t(), insert_lsn :: t()) :: boolean()
  def replicated?(replay_lsn, insert_lsn)
  def replicated?(%LSN{source: :not_replicating}, %LSN{source: :insert}), do: true

  def replicated?(%LSN{fpart: f1, offset: o1, source: :replay}, %LSN{
        fpart: f2,
        offset: o2,
        source: :insert
      }) do
    # F1/O1 is at least as new as F2/O2 if (F1 > F2) or (F1 == F2 and O1 >= O2)
    f1 > f2 or (f1 == f2 and o1 >= o2)
  end

  @doc """
  Convert an LSN struct back into a text value.

  Returns `nil` when the LSN carries no position (both `fpart` and `offset` are
  `nil`); otherwise returns the two parts as upper-case hexadecimal joined by
  `/`.
  """
  @spec to_text(t()) :: nil | String.t()
  def to_text(%LSN{fpart: nil, offset: nil}), do: nil

  def to_text(%LSN{fpart: fpart, offset: offset}) do
    Integer.to_string(fpart, 16) <> "/" <> Integer.to_string(offset, 16)
  end

  @doc """
  After performing a database modification, calling `current_wal_insert/1`
  returns a value that can be used to compare against a WAL value from the
  replica database to determine when the changes have been replayed on the
  replica.

  Runs `pg_current_wal_insert_lsn()` through `repo.query!/1` and returns the
  result as an LSN with a `source` of `:insert`.
  """
  @spec current_wal_insert(module()) :: t()
  def current_wal_insert(repo) do
    %Postgrex.Result{rows: [[lsn]]} =
      repo.query!("select CAST(pg_current_wal_insert_lsn() AS TEXT)")

    new(lsn, :insert)
  end

  @doc """
  When talking to a replica database, this returns a value for what changes have
  been replayed on the replica from the primary.

  Runs `pg_last_wal_replay_lsn()` through `repo.query!/1`. A `NULL` result is
  returned as an LSN with a `source` of `:not_replicating`; any other result
  has a `source` of `:replay`.
  """
  @spec last_wal_replay(module()) :: t()
  def last_wal_replay(repo) do
    %Postgrex.Result{rows: [[lsn]]} = repo.query!("select CAST(pg_last_wal_replay_lsn() AS TEXT)")
    new(lsn, :replay)
  end

  @doc """
  Ask the database for a replay LSN relative to a previously seen one.

  Calls the `watch_for_lsn_change` SQL function with the text form of
  `from_lsn` (or `NULL` when `from_lsn` is `nil` or carries no position) and
  the constant `2` as its second argument.

  Returns `nil` when the function returns `NULL`, otherwise the returned text
  parsed into an LSN with a `source` of `:replay`.

  NOTE(review): `watch_for_lsn_change` is defined outside this module.
  Presumably it waits for the replay LSN to move past `from_lsn` and the `2`
  is a timeout - confirm against its SQL definition.
  """
  @spec last_wal_replay_watch(module(), nil | t()) :: nil | t()
  def last_wal_replay_watch(repo, from_lsn) do
    param_value =
      case from_lsn do
        nil -> nil
        %LSN{} -> to_text(from_lsn)
      end

    %Postgrex.Result{rows: [[result]]} =
      repo.query!("SELECT watch_for_lsn_change($1, 2);", [param_value])

    case result do
      nil -> nil
      lsn_text -> new(lsn_text, :replay)
    end
  end
end

defimpl Inspect, for: Fly.Postgres.LSN do
  import Inspect.Algebra

  # Renders an LSN as `#LSN<FPART/OFFSET>` using its text form, or `#LSN<nil>`
  # when the LSN has no text form.
  def inspect(lsn, _opts) do
    # `to_text/1` returns nil for an LSN without a position (for example one
    # with a source of `:not_replicating`). nil is not a valid algebra
    # document, so passing it to `concat/1` would raise and make such structs
    # impossible to inspect or log.
    text = Fly.Postgres.LSN.to_text(lsn) || "nil"
    concat(["#LSN<", text, ">"])
  end
end