lib/logflare_logger/http_backend.ex

defmodule LogflareLogger.HttpBackend do
  @moduledoc """
  Implements :gen_event behaviour, handles incoming Logger messages
  """

  @default_api_url "https://api.logflare.app"
  @app :logflare_logger_backend
  @behaviour :gen_event

  require Logger
  alias LogflareLogger.{Formatter, BatchCache, CLI, Utils}
  alias LogflareLogger.BackendConfig, as: Config

  @type level :: Logger.level()
  @type message :: Logger.message()
  @type metadata :: Logger.metadata()
  @type log_msg :: {level, pid, {Logger, message, term, metadata}} | :flush

  @spec init(__MODULE__, keyword) :: {:ok, Config.t()}
  def init(__MODULE__, options \\ []) when is_list(options) do
    schedule_in_flight_check()
    msg = "#{__MODULE__} v#{Application.spec(@app, :vsn)} started."
    log_after(:info, msg)

    options
    |> configure_merge(%Config{})
    |> schedule_flush()
  end

  @spec handle_event(log_msg, Config.t()) :: {:ok, Config.t()}
  def handle_event(:flush, config), do: flush!(config)

  def handle_event({_, gl, _}, config) when node(gl) != node() do
    {:ok, config}
  end

  def handle_event({level, _gl, {Logger, msg, datetime, metadata}}, %Config{} = config) do
    if log_level_matches?(level, config.level) do
      level
      |> Formatter.format_event(msg, datetime, metadata, config)
      |> BatchCache.put(config)
    end

    {:ok, config}
  end

  def handle_info({:log_after, level, message}, config) do
    Logger.log(level, message)

    {:ok, config}
  end

  def handle_info(:in_flight_check, config) do
    # If we somehow have events in flight stuck in our Repo, they get reset here to get flushed to Logflare.
    if GenServer.whereis(LogflareLogger.Repo) do
      count = BatchCache.events_in_flight() |> BatchCache.reset_events_in_flight() |> Enum.count()

      if count > 0 do
        msg =
          "#{__MODULE__} v#{Application.spec(@app, :vsn)} resetting #{count} log events in flight. If this continues please submit an issue."

        log_after(:warn, msg)
      end
    end

    {:ok, config}
  end

  def handle_info(:flush, config), do: flush!(config)

  def handle_info(_term, config), do: {:ok, config}

  @spec handle_call({:configure, keyword()}, Config.t()) :: {:ok, :ok, Config.t()}
  def handle_call({:configure, options}, %Config{} = config) do
    config = configure_merge(options, config)
    # Makes sure that next flush is done
    # after the configuration update
    # if the flush interval is lower than default or previous config
    schedule_flush(config)
    {:ok, :ok, config}
  end

  def code_change(_old_vsn, config, _extra), do: {:ok, config}

  def terminate(_reason, _state), do: :ok

  @spec configure_merge(keyword, Config.t()) :: Config.t()
  def configure_merge(options, %Config{} = config) when is_list(options) do
    # Configuration values are populated according to the following priorities:
    # 1. Dynamically confgiured options with Logger.configure(...)
    # 2. Application environment
    # 3. System environment
    # 4. Current config

    sys_options = Utils.find_logflare_sys_envs()
    app_options = Application.get_all_env(@app)

    options =
      app_options
      |> Keyword.merge(sys_options)
      |> Keyword.merge(options)

    url = Keyword.get(options, :url) || @default_api_url
    api_key = Keyword.get(options, :api_key)
    source_id = Keyword.get(options, :source_id)
    level = Keyword.get(options, :level, config.level)
    format = Keyword.get(options, :format, config.format)
    metadata = Keyword.get(options, :metadata, config.metadata)
    batch_max_size = Keyword.get(options, :batch_max_size, config.batch_max_size)
    flush_interval = Keyword.get(options, :flush_interval, config.flush_interval)

    CLI.throw_on_missing_url!(url)
    CLI.throw_on_missing_source!(source_id)
    CLI.throw_on_missing_api_key!(api_key)

    api_client = LogflareApiClient.new(%{url: url, api_key: api_key})

    config =
      struct!(
        Config,
        %{
          api_client: api_client,
          source_id: source_id,
          level: level,
          format: format,
          metadata: metadata,
          batch_size: config.batch_size,
          batch_max_size: batch_max_size,
          flush_interval: flush_interval
        }
      )

    if :ets.info(:logflare_logger_table) === :undefined do
      :ets.new(:logflare_logger_table, [:named_table, :set, :public])
    end

    :ets.insert(:logflare_logger_table, {:config, config})

    config
  end

  # Batching and flushing

  @spec flush!(Config.t()) :: {:ok, Config.t()}
  defp flush!(%Config{} = config) do
    if GenServer.whereis(LogflareLogger.Repo) do
      BatchCache.flush(config)
    end

    schedule_flush(config)
  end

  @spec schedule_flush(Config.t()) :: {:ok, Config.t()}
  defp schedule_flush(%Config{} = config) do
    Process.send_after(self(), :flush, config.flush_interval)
    {:ok, config}
  end

  defp schedule_in_flight_check() do
    Process.send_after(self(), :in_flight_check, 0)
  end

  defp log_after(level, message, delay \\ 5_000) do
    # We'd like to see these in Logflare so we delay the log message to make sure Logger and the Logflare backend has been started
    Process.send_after(self(), {:log_after, level, message}, delay)
  end

  # Events

  @spec log_level_matches?(level, level | nil) :: boolean
  defp log_level_matches?(_lvl, nil), do: true
  defp log_level_matches?(lvl, min), do: Logger.compare_levels(lvl, min) != :lt
end