lib/walex/replication_server.ex

# This file steals liberally from https://github.com/chasers/postgrex_replication_demo/blob/main/lib/replication.ex
# which in turn draws on https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#module-logical-replication

defmodule WalEx.ReplicationServer do
  use Postgrex.ReplicationConnection

  alias WalEx.Postgres.Decoder
  alias WalEx.ReplicationPublisher

  def start_link(opts) do
    app_name = Keyword.get(opts, :app_name)

    opts = set_pgx_replication_conn_opts(app_name)

    Postgrex.ReplicationConnection.start_link(__MODULE__, [app_name: app_name], opts)
  end

  defp set_pgx_replication_conn_opts(app_name) do
    database_configs_keys = [:hostname, :username, :password, :port, :database, :ssl, :ssl_opts]

    extra_opts = [auto_reconnect: true]
    database_configs = WalEx.Configs.get_configs(app_name, database_configs_keys)
    replications_name = [name: WalEx.Registry.set_name(:set_gen_server, __MODULE__, app_name)]

    extra_opts ++ database_configs ++ replications_name
  end

  @impl true
  def init(opts) do
    app_name = Keyword.get(opts, :app_name)

    if is_nil(Process.whereis(ReplicationPublisher)) do
      {:ok, _pid} = ReplicationPublisher.start_link([])
    end

    {:ok, %{step: :disconnected, app_name: app_name}}
  end

  @impl true
  def handle_connect(state) do
    temp_slot = "walex_temp_slot_" <> Integer.to_string(:rand.uniform(9_999))

    query = "CREATE_REPLICATION_SLOT #{temp_slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT;"

    {:query, query, %{state | step: :create_slot}}
  end

  @impl true
  def handle_result([%Postgrex.Result{rows: rows} | _results], %{step: :create_slot} = state) do
    slot_name = rows |> hd |> hd

    publication =
      state.app_name
      |> WalEx.Configs.get_configs([:publication])
      |> Keyword.get(:publication)

    query =
      "START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{publication}')"

    {:stream, query, [], %{state | step: :streaming}}
  end

  @impl true
  # https://www.postgresql.org/docs/14/protocol-replication.html
  def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
    rest
    |> Decoder.decode_message()
    |> ReplicationPublisher.process_message(state.app_name)

    {:noreply, state}
  end

  def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
    messages =
      case reply do
        1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
        0 -> []
      end

    {:noreply, messages, state}
  end

  @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
  defp current_time, do: System.os_time(:microsecond) - @epoch
end