lib/litestream.ex

defmodule Litestream do
  @moduledoc """
  This GenServer module allows you to run [Litestream](https://litestream.io/) via a port in the background
  so that you can easily back up your SQLite database to an object store.
  """

  use GenServer,
    restart: :transient,
    shutdown: 15_000

  require Logger

  alias Litestream.Downloader

  @call_timeout 10_000

  # +--------------------------------------------------------------------+
  # |                 GenServer Public API Functions                     |
  # +--------------------------------------------------------------------+

  @doc """
  Starts the `Litestream` GenServer and links it to the calling process.

  After the GenServer has started, it downloads the Litestream binary (unless `:bin_path` is
  provided) and launches it to begin database replication. `opts` is a Keyword list with the
  following options:

  * `:repo` - The Ecto Repo that manages the SQLite database. REQUIRED
  * `:replica_url` - The URL to which the SQLite database should be backed up. REQUIRED
  * `:access_key_id` - The access key ID to the provided `:replica_url`. REQUIRED
  * `:secret_access_key` - The secret access key to the provided `:replica_url`. REQUIRED
  * `:name` - The name of the GenServer process. By default it is `Litestream`. OPTIONAL
  * `:bin_path` - If you already have access to the Litestream binary, provide the path via this
                  option so that you can skip the download step. OPTIONAL
  * `:version` - The Litestream version to download when `:bin_path` is not provided. Defaults
                 to the value returned by `Litestream.Downloader.latest_version/0`. OPTIONAL

  Raises a `KeyError` if any of the required options is missing.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    state = %{
      repo: Keyword.fetch!(opts, :repo),
      replica_url: Keyword.fetch!(opts, :replica_url),
      access_key_id: Keyword.fetch!(opts, :access_key_id),
      secret_access_key: Keyword.fetch!(opts, :secret_access_key),
      bin_path: Keyword.get(opts, :bin_path, :download),
      # Resolved lazily so the default is only computed when no `:version` was supplied.
      version: Keyword.get_lazy(opts, :version, &Downloader.latest_version/0)
    }

    GenServer.start_link(__MODULE__, state, name: Keyword.get(opts, :name, __MODULE__))
  end

  @doc """
  Reports whether the Litestream process managed by the GenServer `name` is alive.

  Returns `:running` when the process is up and `:down` otherwise. `name` defaults to
  `Litestream`.
  """
  def status(name \\ __MODULE__), do: GenServer.call(name, :status, @call_timeout)

  @doc """
  Cleanly stops the Litestream process while leaving the GenServer `name` itself running.

  `name` defaults to `Litestream`. Stopping when Litestream is not running is a no-op.
  """
  def stop_litestream(name \\ __MODULE__),
    do: GenServer.call(name, :stop_litestream, @call_timeout)

  @doc """
  Starts the Litestream process through the GenServer `name` when it is not already running.

  `name` defaults to `Litestream`. Calling this while Litestream is already running is
  effectively a no-op.
  """
  def start_litestream(name \\ __MODULE__),
    do: GenServer.call(name, :start_litestream, @call_timeout)

  # +------------------------------------------------------------------+
  # |                 GenServer Callback Functions                     |
  # +------------------------------------------------------------------+

  # Builds the initial state from the Repo configuration and decides whether the Litestream
  # binary still has to be downloaded or can be launched straight away.
  @impl true
  def init(state) do
    config = state.repo.config()
    app = Keyword.fetch!(config, :otp_app)
    db_path = Keyword.fetch!(config, :database)

    # Exits are trapped so that the Litestream replication process can be shut down cleanly.
    Process.flag(:trap_exit, true)

    initial_state =
      state
      |> Map.merge(%{otp_app: app, database: db_path})
      |> clear_pids()

    case state.bin_path do
      :download ->
        {:ok, initial_state, {:continue, :download_litestream}}

      path ->
        # A user-supplied binary must exist before we attempt to launch it.
        if not File.exists?(path) do
          raise "The path to the Litestream binary does not exist: #{inspect(path)}"
        end

        {:ok, initial_state, {:continue, :start_litestream}}
    end
  end

  # Downloads the requested Litestream version into the OTP app's priv directory, stores the
  # resulting binary path in the state and then continues with starting Litestream.
  @impl true
  def handle_continue(:download_litestream, %{otp_app: otp_app, version: version} = state) do
    priv_dir = :code.priv_dir(otp_app)

    [download_dir, bin_dir] =
      Enum.map(["/litestream/download", "/litestream/bin"], &Path.join(priv_dir, &1))

    # Both target directories must exist before the downloader writes into them.
    Enum.each([download_dir, bin_dir], &File.mkdir_p!/1)

    {:ok, bin_path} = Downloader.download_litestream(version, download_dir, bin_dir)

    {:noreply, Map.put(state, :bin_path, bin_path), {:continue, :start_litestream}}
  end

  # Launches `litestream replicate` through erlexec, linked to and monitored by this process,
  # and records the returned Erlang pid and OS pid in the state.
  def handle_continue(:start_litestream, state) do
    command = "#{state.bin_path} replicate #{state.database} #{state.replica_url}"

    # `:clear` comes first so that only the replica credentials listed after it are passed on.
    environment = [
      :clear,
      {"LITESTREAM_ACCESS_KEY_ID", state.access_key_id},
      {"LITESTREAM_SECRET_ACCESS_KEY", state.secret_access_key}
    ]

    exec_opts = [:monitor, {:env, environment}, {:kill_timeout, 10}, :stdout, :stderr]

    {:ok, port_pid, os_pid} = :exec.run_link(command, exec_opts)

    {:noreply, Map.merge(state, %{port_pid: port_pid, os_pid: os_pid})}
  end

  # Replies with `:running` when the tracked OS pid is among erlexec's children, `:down`
  # otherwise. The state is never modified.
  @impl true
  def handle_call(:status, _from, %{os_pid: os_pid} = state) do
    current_status = if os_pid in :exec.which_children(), do: :running, else: :down

    {:reply, current_status, state}
  end

  # Starts Litestream via the `:start_litestream` continue step unless the tracked OS pid is
  # still one of erlexec's children. Always replies `:ok`.
  def handle_call(:start_litestream, _from, %{os_pid: os_pid} = state) do
    case os_pid in :exec.which_children() do
      true ->
        Logger.info("Litestream is already running")

        {:reply, :ok, state}

      false ->
        Logger.info("Starting Litestream")

        {:reply, :ok, state, {:continue, :start_litestream}}
    end
  end

  # Sends SIGTERM to the running Litestream process and forgets its pids. When Litestream is
  # not running, this only logs that fact. Always replies `:ok`.
  def handle_call(:stop_litestream, _from, %{port_pid: port_pid, os_pid: os_pid} = state) do
    cond do
      os_pid in :exec.which_children() ->
        :ok = :exec.kill(port_pid, :sigterm)

        {:reply, :ok, clear_pids(state)}

      true ->
        Logger.info("Litestream is not running")

        {:reply, :ok, state}
    end
  end

  # Handles exit signals of linked processes (exits are trapped in `init/1`). The pid carried
  # by an `:EXIT` message is an Erlang pid, so it is compared against the tracked `:port_pid`.
  @impl true
  def handle_info({:EXIT, pid, reason}, state) do
    Logger.info("Litestream has exited with reason: #{inspect(reason)}")

    # Only forget the tracked pids when the signal belongs to the process we are currently
    # tracking. A late exit signal from a previously stopped process must not wipe the pids
    # of a Litestream process that has been started since.
    if match?(%{port_pid: ^pid}, state) do
      {:noreply, clear_pids(state)}
    else
      {:noreply, state}
    end
  end

  # Handles the monitor notification requested via the `:monitor` option when Litestream is
  # launched. The state is left untouched; the `:EXIT` handler takes care of the tracked pids.
  def handle_info({:DOWN, _os_pid, _process, _pid, reason}, state) do
    # `reason` can be any term (erlexec documents `{:exit_status, status}` tuples for abnormal
    # exits). Interpolating a tuple directly raises `Protocol.UndefinedError`, so the reason
    # is rendered with `inspect/1`.
    Logger.info("Litestream has shutdown with reason: #{inspect(reason)}")

    {:noreply, state}
  end

  # Relays output captured from Litestream's stdout to the Logger at the info level.
  def handle_info({:stdout, _os_pid, data}, state) do
    Logger.info(data)
    {:noreply, state}
  end

  # Relays output captured from Litestream's stderr to the Logger at the warning level.
  def handle_info({:stderr, _os_pid, data}, state) do
    Logger.warning(data)
    {:noreply, state}
  end

  # Logs why the GenServer is going down. The state is ignored and `:ok` is always returned.
  @impl true
  def terminate(reason, _state) do
    rendered_reason = inspect(reason)
    Logger.info("Litestream is terminating with reason #{rendered_reason}")

    :ok
  end

  # +------------------------------------------------------------------+
  # |                   Private Helper Functions                       |
  # +------------------------------------------------------------------+
  # Resets both tracked process identifiers (`:port_pid` and `:os_pid`) to `nil`, adding the
  # keys when they are not present yet.
  defp clear_pids(state), do: Map.merge(state, %{port_pid: nil, os_pid: nil})
end