lib/postgrex/simple_connection.ex

defmodule Postgrex.SimpleConnection do
  @moduledoc ~S"""
  A generic connection suitable for simple queries and pubsub functionality.

  On its own, a SimpleConnection server only maintains a connection. To execute
  queries, process results, or relay notices you must implement a callback module
  with the SimpleConnection behaviour.

  ## Example

  The SimpleConnection behaviour abstracts common client/server interactions,
  along with optional mechanisms for running queries or relaying notifications.

  Let's start with a minimal callback module that executes a query and relays
  the result back to the caller.

      defmodule MyConnection do
        @behaviour Postgrex.SimpleConnection

        @impl true
        def init(_args) do
          {:ok, %{from: nil}}
        end

        @impl true
        def handle_call({:query, query}, from, state) do
          {:query, query, %{state | from: from}}
        end

        @impl true
        def handle_result(results, state) when is_list(results) do
          SimpleConnection.reply(state.from, results)

          {:noreply, state}
        end

        @impl true
        def handle_result(%Postgrex.Error{} = error, state) do
          SimpleConnection.reply(state.from, error)

          {:noreply, state}
        end
      end

      # Start the connection
      {:ok, pid} = SimpleConnection.start_link(MyConnection, [], database: "demo")

      # Execute a literal query
      SimpleConnection.call(pid, {:query, "SELECT 1"})
      # => %Postgrex.Result{rows: [["1"]]}

  We start a connection by passing the callback module, callback options, and
  server options to `SimpleConnection.start_link/3`. The `init/1` function
  receives any callback options and returns the callback state.

  Queries are sent through `SimpleConnection.call/2`, executed on the server,
  and the result is handed off to `handle_result/2`. At that point the callback
  can process the result before replying back to the caller with
  `SimpleConnection.reply/2`.

  ## Building a PubSub Connection

  With the `notify/3` callback you can also build a pubsub server on top of
  `LISTEN/NOTIFY`. Here's a naive pubsub implementation:

      defmodule MyPubSub do
        @behaviour Postgrex.SimpleConnection

        defstruct [:from, listeners: %{}]

        @impl true
        def init(args) do
          {:ok, struct!(__MODULE__, args)}
        end

        @impl true
        def notify(channel, payload, state) do
          for pid <- state.listeners[channel] do
            send(pid, {:notice, channel, payload})
          end
        end

        @impl true
        def handle_call({:listen, channel}, {pid, _} = from, state) do
          listeners = Map.update(state.listeners, channel, [pid], &[pid | &1])

          {:query, ~s(LISTEN "#{channel}"), %{state | from: from, listeners: listeners}}
        end

        def handle_call({:query, query}, from, state) do
          {:query, query, %{state | from: from}}
        end

        @impl true
        def handle_result(_results, state) do
          SimpleConnection.reply(state.from, :ok)

          {:noreply, %{state | from: nil}}
        end
      end

      # Start the connection
      {:ok, pid} = SimpleConnection.start_link(MyPubSub, [], database: "demo")

      # Start listening to the "demo" channel
      SimpleConnection.call(pid, {:listen, "demo"})
      # => %Postgrex.Result{command: :listen}

      # Notify all listeners
      SimpleConnection.call(pid, {:query, ~s(NOTIFY "demo", 'hello')})
      # => %Postgrex.Result{command: :notify}

      # Check the inbox to see the notice message
      flush()
      # => {:notice, "demo", "hello"}

  See `Postgrex.Notifications` for a more complex implementation that can
  unlisten, handle process exits, and persist across reconnection.

  ## Name registration

  A `Postgrex.SimpleConnection` is bound to the same name registration rules as a
  `GenServer`. Read more about them in the `GenServer` docs.
  """

  @behaviour :gen_statem

  require Logger

  alias Postgrex.Protocol

  @doc false
  defstruct idle_interval: 5000,
            protocol: nil,
            auto_reconnect: false,
            reconnect_backoff: 500,
            state: nil

  ## PUBLIC API ##

  @type query :: iodata
  @type state :: term

  @typedoc since: "0.17.0"
  @type from :: {pid, term}

  @doc """
  Callback for process initialization.

  This is called once and before the Postgrex connection is established.
  """
  @callback init(term) :: {:ok, state}

  @doc """
  Callback for processing or relaying pubsub notifications.
  """
  @callback notify(binary, binary, state) :: :ok

  @doc """
  Invoked after connecting or reconnecting.

  This may be called multiple times if `:auto_reconnect` is true.
  """
  @callback handle_connect(state) :: {:noreply, state} | {:query, query, state}

  @doc """
  Invoked after disconnection.

  This is invoked regardless of the `:auto_reconnect` option.
  """
  @callback handle_disconnect(state) :: {:noreply, state}

  @doc """
  Callback for `SimpleConnection.call/3`.

  Replies must be sent with `SimpleConnection.reply/2`.
  """
  @callback handle_call(term, from, state) ::
              {:noreply, state} | {:query, query, state}

  @doc """
  Callback for `Kernel.send/2`.
  """
  @callback handle_info(term, state) :: {:noreply, state} | {:query, query, state}

  @doc """
  Callback for processing or relaying queries executed via `{:query, query, state}`.

  Either a list of successful query results or an error will be passed to this callback.
  A list is passed because the simple query protocol allows multiple commands to be
  issued in a single query.
  """
  @callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state) ::
              {:noreply, state}

  @optional_callbacks handle_call: 3,
                      handle_connect: 1,
                      handle_disconnect: 1,
                      handle_info: 2,
                      handle_result: 2

  @doc """
  Replies to the given client.

  Wrapper for `:gen_statem.reply/2`.
  """
  def reply({caller_pid, from} = _from, reply) when is_pid(caller_pid) do
    :gen_statem.reply(from, reply)
  end

  @doc """
  Calls the given server.

  Wrapper for `:gen_statem.call/3`.
  """
  def call(server, message, timeout \\ 5000) do
    with {__MODULE__, reason} <- :gen_statem.call(server, {message, self()}, timeout) do
      exit({reason, {__MODULE__, :call, [server, message, timeout]}})
    end
  end

  @doc false
  def child_spec(opts) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, opts}}
  end

  @doc """
  Start the connection process and connect to Postgres.

  The options that this function accepts are the same as those accepted by
  `Postgrex.start_link/1`, as well as the extra options `:sync_connect`,
  `:auto_reconnect`, `:reconnect_backoff`, and `:configure`.

  ## Options

    * `:auto_reconnect` - automatically attempt to reconnect to the database
      in event of a disconnection. See the
      [note about async connect and auto-reconnects](#module-async-connect-and-auto-reconnects)
      above. Defaults to `false`, which means the process terminates.

    * `:configure` - A function to run before every connect attempt to dynamically
      configure the options as a `{module, function, args}`, where the current
      options will prepended to `args`. Defaults to `nil`.

    * `:idle_interval` - while also accepted on `Postgrex.start_link/1`, it has
      a default of `5000ms` in `Postgrex.SimpleConnection` (instead of 1000ms).

    * `:reconnect_backoff` - time (in ms) between reconnection attempts when
      `auto_reconnect` is enabled. Defaults to `500`.

    * `:sync_connect` - controls if the connection should be established on boot
      or asynchronously right after boot. Defaults to `true`.
  """
  @spec start_link(module, term, Keyword.t()) :: {:ok, pid} | {:error, Postgrex.Error.t() | term}
  def start_link(module, args, opts) do
    {name, opts} = Keyword.pop(opts, :name)

    opts = Keyword.put_new(opts, :sync_connect, true)
    connection_opts = Postgrex.Utils.default_opts(opts)
    start_args = {module, args, connection_opts}

    case name do
      nil ->
        :gen_statem.start_link(__MODULE__, start_args, [])

      atom when is_atom(atom) ->
        :gen_statem.start_link({:local, atom}, __MODULE__, start_args, [])

      {:global, _term} = tuple ->
        :gen_statem.start_link(tuple, __MODULE__, start_args, [])

      {:via, via_module, _term} = tuple when is_atom(via_module) ->
        :gen_statem.start_link(tuple, __MODULE__, start_args, [])

      other ->
        raise ArgumentError, """
        expected :name option to be one of the following:

          * nil
          * atom
          * {:global, term}
          * {:via, module, term}

        Got: #{inspect(other)}
        """
    end
  end

  ## CALLBACKS ##

  @state :no_state

  @doc false
  @impl :gen_statem
  def callback_mode, do: :handle_event_function

  @doc false
  @impl :gen_statem
  def init({mod, args, opts}) do
    case mod.init(args) do
      {:ok, mod_state} ->
        idle_timeout = opts[:idle_timeout]

        if idle_timeout do
          Logger.warning(
            ":idle_timeout in Postgrex.SimpleConnection is deprecated, " <>
              "please use :idle_interval instead"
          )
        end

        {idle_interval, opts} = Keyword.pop(opts, :idle_interval, idle_timeout || 5000)
        {auto_reconnect, opts} = Keyword.pop(opts, :auto_reconnect, false)
        {reconnect_backoff, opts} = Keyword.pop(opts, :reconnect_backoff, 500)

        state = %__MODULE__{
          idle_interval: idle_interval,
          auto_reconnect: auto_reconnect,
          reconnect_backoff: reconnect_backoff,
          state: {mod, mod_state}
        }

        put_opts(mod, opts)

        if opts[:sync_connect] do
          case handle_event(:internal, {:connect, :init}, @state, state) do
            {:keep_state, state} -> {:ok, @state, state}
            {:keep_state, state, actions} -> {:ok, @state, state, actions}
            {:stop, _reason, _state} = stop -> stop
          end
        else
          {:ok, @state, state, {:next_event, :internal, {:connect, :init}}}
        end
    end
  end

  @doc false
  @impl :gen_statem
  def handle_event(type, content, statem_state, state)

  def handle_event(:internal, {:connect, _}, @state, %{state: {mod, mod_state}} = state) do
    opts =
      case Keyword.get(opts(mod), :configure) do
        {module, fun, args} -> apply(module, fun, [opts(mod) | args])
        fun when is_function(fun, 1) -> fun.(opts(mod))
        nil -> opts(mod)
      end

    case Protocol.connect(opts) do
      {:ok, protocol} ->
        state = %{state | protocol: protocol}

        with {:keep_state, state, _} <- maybe_handle(mod, :handle_connect, [mod_state], state) do
          {:keep_state, state}
        end

      {:error, reason} ->
        if state.auto_reconnect do
          {:keep_state, state, {{:timeout, :backoff}, state.reconnect_backoff, nil}}
        else
          {:stop, reason, state}
        end
    end
  end

  def handle_event({:timeout, :backoff}, nil, @state, state) do
    {:keep_state, state, {:next_event, :internal, {:connect, :reconnect}}}
  end

  def handle_event({:call, from}, {msg, caller_pid}, @state, %{state: {mod, mod_state}} = state) do
    # We have to do a hack here to carry the actual caller PID over to the handle_call/3
    # callback, because gen_statem uses a proxy process to do calls with timeout != :infinity.
    # This results in the caller PID not being the same as the PID in the "from" tuple,
    # so things like Postgrex.Notifications cannot use that "from"'s PID to register
    # notification handlers. This approach is paired with reconstructing the proper
    # "from" tuple in the reply/2 function in this module.
    callback_from = {caller_pid, from}
    handle(mod, :handle_call, [msg, callback_from, mod_state], from, state)
  end

  def handle_event(:timeout, nil, @state, %{protocol: protocol} = state) do
    case Protocol.ping(protocol) do
      {:ok, protocol} ->
        {:keep_state, %{state | protocol: protocol}, {:timeout, state.idle_interval, nil}}

      {error, reason, protocol} ->
        reconnect_or_stop(error, reason, protocol, state)
    end
  end

  def handle_event(:info, msg, @state, %{protocol: protocol, state: {mod, mod_state}} = state) do
    opts = [notify: &mod.notify(&1, &2, mod_state)]

    case Protocol.handle_info(msg, opts, protocol) do
      {:ok, protocol} ->
        {:keep_state, %{state | protocol: protocol}, {:timeout, state.idle_interval, nil}}

      {:unknown, protocol} ->
        maybe_handle(mod, :handle_info, [msg, mod_state], %{state | protocol: protocol})

      {error, reason, protocol} ->
        reconnect_or_stop(error, reason, protocol, state)
    end
  end

  def handle_event(:info, msg, @state, %{state: {mod, mod_state}} = state) do
    maybe_handle(mod, :handle_info, [msg, mod_state], state)
  end

  @doc false
  @impl :gen_statem
  def format_status(_opts, [_pdict, @state, state]) do
    state
  end

  ## Helpers

  defp maybe_handle(mod, fun, args, state) do
    if function_exported?(mod, fun, length(args)) do
      handle(mod, fun, args, nil, state)
    else
      {:keep_state, state, {:timeout, state.idle_interval, nil}}
    end
  end

  defp handle(mod, fun, args, from, state) do
    case apply(mod, fun, args) do
      {:noreply, mod_state} ->
        {:keep_state, %{state | state: {mod, mod_state}}, {:timeout, state.idle_interval, nil}}

      {:query, query, mod_state} ->
        opts = [notify: &mod.notify(&1, &2, mod_state)]

        state = %{state | state: {mod, mod_state}}

        with {:ok, results, protocol} <- Protocol.handle_simple(query, opts, state.protocol),
             {:ok, protocol} <- Protocol.checkin(protocol) do
          state = %{state | protocol: protocol}

          handle(mod, :handle_result, [results, mod_state], from, state)
        else
          {:error, %Postgrex.Error{} = error, protocol} ->
            handle(mod, :handle_result, [error, mod_state], from, %{state | protocol: protocol})

          {:disconnect, reason, protocol} ->
            reconnect_or_stop(:disconnect, reason, protocol, state)
        end
    end
  end

  defp reconnect_or_stop(error, reason, protocol, %{state: {mod, mod_state}} = state)
       when error in [:error, :disconnect] do
    {:keep_state, state, _actions} = maybe_handle(mod, :handle_disconnect, [mod_state], state)

    if state.auto_reconnect do
      {:keep_state, state, {:next_event, :internal, {:connect, :reconnect}}}
    else
      {:stop, reason, %{state | protocol: protocol}}
    end
  end

  defp opts(mod), do: Process.get(mod)

  defp put_opts(mod, opts), do: Process.put(mod, opts)
end