lib/on_flow/channel.ex

defmodule OnFlow.Channel do
  @moduledoc """
  Manages the GRPC channel state. Using this in production is not recommended.

  Config options are:

    * `:host` - the host to connect to
    * `:connect_on_start` - `true` to start the channel on application start
    * `:secure` - `true` to use HTTP/2.
    * `:metadata` - metadata headers to send on connect.

  Example:

      config :on_flow,
        host: "grpc.example.com:443",
        connect_on_start: true,
        secure: true,
        metadata: [api_key: "123123"]
  """

  use GenServer

  @doc false
  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @doc """
  Connects to the GRPC server. Blocks the process until a connection is
  established.
  """
  def connect(host \\ host(), opts \\ default_opts()) do
    GenServer.call(__MODULE__, {:connect, host, opts})
  end

  @doc """
  Connects to the GRPC server asynchronously.
  """
  def connect_async(host \\ host(), opts \\ default_opts()) do
    GenServer.cast(__MODULE__, {:connect_async, host, opts})
  end

  @doc """
  Returns the current GRPC channel.
  """
  def get_channel do
    GenServer.call(__MODULE__, :get_channel)
  end

  @doc """
  Sets the current GRPC channel. OnFlow will pull the channel from this
  GenServer to make requests, so use this to manage the channel manually if
  needed.
  """
  def put_channel(%GRPC.Channel{} = channel) do
    GenServer.call(__MODULE__, {:put_channel, channel})
  end

  @impl true
  def init(arg) do
    schedule_ping()

    connect? = connect_on_start?()

    channel =
      case arg do
        %GRPC.Channel{} -> arg
        _ -> if connect?, do: new_channel(host(), default_opts()), else: nil
      end

    state = %{channel: channel, connected?: connect?}

    {:ok, state}
  end

  @impl true
  def handle_call({:connect, host, opts}, _from, state) do
    channel = new_channel(host, opts)
    {:reply, channel, %{state | channel: channel}}
  end

  def handle_call(:get_channel, _from, %{channel: channel} = state) do
    {:reply, channel, state}
  end

  @impl true
  def handle_cast({:connect_async, host, opts}, state) do
    {:noreply, %{state | channel: new_channel(host, opts)}}
  end

  @impl true
  def handle_info(:ping, %{channel: nil} = state) do
    {:noreply, state}
  end

  def handle_info(:ping, %{channel: channel} = state) do
    request = OnFlow.Access.PingRequest.new()

    channel
    |> OnFlow.Access.AccessAPI.Stub.ping(request)
    |> handle_pong()

    schedule_ping()

    {:noreply, state}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end

  defp new_channel(host, opts) do
    {:ok, channel} = GRPC.Stub.connect(host, opts)
    channel
  end

  defp default_opts do
    opts = []

    opts =
      case metadata() do
        nil -> opts
        metadata -> opts ++ [metadata: metadata]
      end

    opts =
      case secure?() do
        true -> opts ++ [cred: GRPC.Credential.new([])]
        _ -> opts
      end

    opts
  end

  defp schedule_ping do
    Process.send_after(self(), :ping, 60_000)
  end

  defp handle_pong({:ok, %OnFlow.Access.PingResponse{}}), do: :ok
  defp handle_pong(_error), do: connect_async()

  defp host, do: Application.get_env(:on_flow, :host)
  defp connect_on_start?, do: Application.get_env(:on_flow, :connect_on_start, true)
  defp metadata, do: Application.get_env(:on_flow, :metadata)
  defp secure?, do: Application.get_env(:on_flow, :secure)
end