lib/timescaledb/cleaner.ex

defmodule Membrane.RTC.Engine.TimescaleDB.Cleaner do
  @moduledoc """
  Worker responsible for deleting obsolete records from the database.

  By default it is started under the application's supervision tree with the params passed
  in the `:membrane_rtc_engine_timescaledb` config. The params passed in the config are the
  same as the ones expected by `start/1` and `start_link/1`.

  `start/1` and `start_link/1` expect a keyword list as an argument, with the following keys:
    * `:repo` (required) is a module that uses `Ecto.Repo`
    * `:cleanup_interval` (default: 1 hour) is the number of seconds between database cleanups
    * `:metrics_lifetime` (default: 24 hours) is the number of seconds that must pass from
      creation before each metric can be deleted during cleanup

  The keyword list may also include `GenServer` options, see `t:GenServer.option/0` for reference.

  The first cleanup is performed one `:cleanup_interval` after the worker has started.
  """

  use GenServer

  alias Membrane.RTC.Engine.TimescaleDB.Model

  require Logger

  # Keys consumed by the cleaner itself; every other key is forwarded to `GenServer`.
  @cleaner_option_keys [:repo, :cleanup_interval, :metrics_lifetime]

  @default_cleanup_interval_s 60 * 60
  @default_metrics_lifetime_s 60 * 60 * 24

  @type cleaner() :: pid() | atom()
  @type option() ::
          GenServer.option()
          | {:cleanup_interval, pos_integer()}
          | {:metrics_lifetime, pos_integer()}
          | {:repo, module()}
  @type options() :: [option()]

  @doc """
  Starts the cleaner without linking it to the calling process.

  Raises a `RuntimeError` when the `:repo` key is missing from `options`.
  See the module documentation for the list of supported options.
  """
  @spec start(options()) :: GenServer.on_start()
  def start(options \\ []) do
    do_start(:start, options)
  end

  @doc """
  Starts the cleaner and links it to the calling process.

  Raises a `RuntimeError` when the `:repo` key is missing from `options`.
  See the module documentation for the list of supported options.
  """
  @spec start_link(options()) :: GenServer.on_start()
  def start_link(options \\ []) do
    do_start(:start_link, options)
  end

  # Validates the options, separates the cleaner's own options from the `GenServer`
  # ones and delegates to `GenServer.start/3` or `GenServer.start_link/3`.
  defp do_start(function, options) do
    if not Keyword.has_key?(options, :repo) do
      # `inspect/1` keeps the `Elixir.` prefix out of the module name in the message.
      raise ":repo key is required in keyword list passed to #{inspect(__MODULE__)}.#{function}/1 function, got #{inspect(options)}"
    end

    {cleaner_options, gen_server_options} = Keyword.split(options, @cleaner_option_keys)

    apply(GenServer, function, [__MODULE__, cleaner_options, gen_server_options])
  end

  @impl true
  def init(opts) do
    # `Process.send_after/3` requires an integer number of milliseconds;
    # `:timer.seconds/1` converts with plain integer arithmetic.
    cleanup_interval_ms =
      opts
      |> Keyword.get(:cleanup_interval, @default_cleanup_interval_s)
      |> :timer.seconds()

    state = %{
      # Fail at startup instead of on the first cleanup when the repo is missing.
      repo: Keyword.fetch!(opts, :repo),
      cleanup_interval_ms: cleanup_interval_ms,
      metrics_lifetime_s: Keyword.get(opts, :metrics_lifetime, @default_metrics_lifetime_s)
    }

    schedule_cleanup(state)

    {:ok, state}
  end

  @impl true
  def handle_info(:cleanup, state) do
    # NOTE(review): "second" is presumably the unit of `metrics_lifetime_s` - confirm in `Model`.
    Model.remove_outdated_records(state.repo, state.metrics_lifetime_s, "second")
    schedule_cleanup(state)
    {:noreply, state}
  end

  # Defining `handle_info/2` overrides the default injected by `use GenServer`, so without
  # this clause any message other than `:cleanup` would crash the worker.
  def handle_info(message, state) do
    Logger.warning("#{inspect(__MODULE__)} received unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Arms the timer that triggers the next `:cleanup` message.
  defp schedule_cleanup(%{cleanup_interval_ms: interval_ms}) do
    Process.send_after(self(), :cleanup, interval_ms)
  end
end