lib/clickhouse_logger.ex

defmodule ClickhouseLogger do
  @moduledoc """
  The Elixir [Logger](https://hexdocs.pm/logger/Logger.html) backend that sends logs to
  [clickhouse](https://clickhouse.com) server.

  ## Configuration example

  ```elixir
  # config.exs
  config :logger, ClickhouseLogger,
    base_uri: "http://localhost:8123",
    database: "logs",
    fields: [
      ts: :timestamp,
      msg: :message,
      app: {:meta, :app, :string},
      file: {:meta, :file, :string},
      line: {:meta, :line, :uint32},
    ]

  config :logger,
    backends: [:console, ClickhouseLogger]
  ```

  ## Common configuration options

  ### database

  The Clickhouse database to write logs to. Default value: `"default"`.

  ### table

  The Clickhouse table to write logs to. Default value: `"logs"`.

  ### fields (required)

  A keyword list of logged fields. The keys are Clickhouse table field names. Each value can be one of:

  - the `:timestamp` or `:message` atom, to log the corresponding part of the log event.
  - `{:meta, field_name, type}` to log the corresponding field from the metadata. Currently supported types:
    `:string`, `:uint8/16/32/64`, `:int8/16/32/64` and `{:array, type}`.
  - `{CustomModule, :custom_func, type}` to encode the logged value with a custom function.
    `CustomModule.custom_func/4` will be called with the `level`, `timestamp`, `message` and `metadata`
    arguments and should return the value to log.

  ### buffer_size

  The number of messages that will be buffered in memory before they are sent to Clickhouse. Default value: `1_000`.

  ### buffer_timeout

  The number of milliseconds after which all buffered messages will be sent to Clickhouse even if
  their number does not exceed the `buffer_size` value. Default value: `3_000`.

  ### buffer_limit

  The maximum number of messages in the memory buffer. This option is needed for cases when the Clickhouse
  server is unavailable for some reason. If this limit is reached, further log messages will be dropped.
  Default value: `10_000`.

  ## HTTP client configuration options

  ### base_uri (required)

  The URL of the Clickhouse server HTTP endpoint.
  """

  require Logger

  # State carried between the `:gen_event` callbacks of this backend.
  defstruct [
    # Minimum level an event must have to be logged; `nil` disables the level check.
    level: nil,
    # Keyword list merged underneath each event's own metadata (event values win).
    meta: [],
    # Encoded rows waiting to be sent; new rows are prepended, so the newest is first.
    buffer: [],
    # Number of rows in `buffer`, tracked separately from the list itself.
    buffer_length: 0,
    # Row count that triggers an immediate send.
    buffer_size: nil,
    # Delay in milliseconds of the periodic flush timer; no timer is armed for 0.
    buffer_timeout: nil,
    # Row count at which the buffer is considered overflown.
    buffer_limit: nil,
    # Module used to talk to Clickhouse; called as `init/1`, `configure/2` and `send/2`.
    client: nil,
    # State value returned by the client module and passed back to it on every call.
    client_state: nil,
    # Parsed field specs as `{name, source, type}` tuples, `nil` until configured.
    fields: nil,
    # Reference of the pending flush timer, `nil` when no timer is armed.
    timer: nil,
  ]

  # Logger backends are `:gen_event` handlers.
  @behaviour :gen_event

  # Argument types handed to custom field converters (`CustomModule.custom_func/4`).
  @type level :: :debug | :info | :warn | :error | atom
  @type message :: iodata
  # Erlang-style date tuple plus a time tuple whose fourth element is treated as
  # milliseconds by `encode_timestamp/1`.
  @type timestamp :: {
    {non_neg_integer, non_neg_integer, non_neg_integer},
    {non_neg_integer, non_neg_integer, non_neg_integer, non_neg_integer}
  }
  @type metadata :: keyword

  # Defaults that the `:logger, ClickhouseLogger` application environment is merged over in `init/1`.
  @default_config [
    buffer_size: 1_000,
    buffer_timeout: 3_000,
    buffer_limit: 10_000,
  ]

  # Gregorian seconds at the Unix epoch; subtracted in `encode_timestamp/1` to obtain
  # a Unix-based value.
  @unix_seconds ~N[1970-01-01 00:00:00] |> NaiveDateTime.to_gregorian_seconds() |> elem(0)

  # Builds the initial handler state when the backend is installed.
  #
  # The `:logger, ClickhouseLogger` application environment is layered over the
  # built-in defaults, the client module is initialised and the resulting
  # configuration is applied. An `{:error, reason}` from either step is reported
  # (with this backend skipped) and turned into `{:error, :ignore}`.
  @impl true
  def init(__MODULE__) do
    env = Application.get_env(:logger, __MODULE__, [])
    config = Keyword.merge(@default_config, env)
    fallback_level = Application.get_env(:logger, :level, :debug)
    client = Keyword.get(config, :client, ClickhouseLogger.HttpClient)
    initial = %__MODULE__{level: Keyword.get(config, :level, fallback_level), client: client}

    with {:ok, client_state} <- client.init(config),
         {:ok, _configured} = ok <- configure(config, %{initial | client_state: client_state}) do
      ok
    else
      {:error, error} ->
        Logger.error(
          "ClickhouseLogger: Initialization error: #{inspect error}",
          ignore_clickhouse_backend: true
        )

        {:error, :ignore}
    end
  end

  # Handles a runtime reconfiguration request.
  #
  # On success the new state is installed, the flush timer is restarted and the
  # caller receives `:ok`. On failure the caller receives `{:error, reason}` and
  # the previous state is kept. A `:gen_event` handler must always answer with
  # `{:ok, reply, state}`; returning the bare error tuple would crash and remove
  # the handler.
  @impl true
  def handle_call({:configure, config}, st) do
    case configure(config, st) do
      {:ok, new_st} -> {:ok, :ok, restart_timer(new_st)}
      {:error, _reason} = error -> {:ok, error, st}
    end
  end

  # Event entry point of the backend.
  #
  # * Events whose group leader lives on another node are skipped.
  # * Logger events are encoded and buffered unless they carry
  #   `ignore_clickhouse_backend: true` in their metadata or are below the
  #   configured level. Any exception raised while doing so is reported (with
  #   this backend skipped) and the state is left untouched.
  # * `:flush` restarts the timer and sends whatever is buffered.
  # * Everything else is ignored.
  @impl true
  def handle_event({_level, gl, _event}, st) when node(gl) != node(), do: {:ok, st}

  def handle_event({level, _gl, {Logger, msg, ts, meta}}, %{level: min_level, meta: app_meta} = st) do
    skip? = Keyword.get(meta, :ignore_clickhouse_backend, false)
    wanted? = not skip? and right_log_level?(min_level, level)

    if wanted? do
      row = encode_message(level, ts, msg, Keyword.merge(app_meta, meta), st)
      enqueue_message(row, st)
    else
      {:ok, st}
    end
  rescue
    error ->
      Logger.error(
        "ClickhouseLogger: Logging failed: #{inspect error}",
        ignore_clickhouse_backend: true
      )

      {:ok, st}
  end

  def handle_event(:flush, st), do: {:ok, send_messages(restart_timer(st))}

  def handle_event(_evt, st), do: {:ok, st}

  # Reacts to the periodic flush message scheduled by `restart_timer/1`: the timer
  # is re-armed and the buffer is sent. Any other message leaves the state as is.
  @impl true
  def handle_info(msg, st) do
    case msg do
      {__MODULE__, :flush} -> {:ok, send_messages(restart_timer(st))}
      _other -> {:ok, st}
    end
  end

  # Makes a last attempt to send the buffered rows when the handler goes away.
  @impl true
  def terminate(_args, st) do
    _flushed = send_messages(st)
    :ok
  end

  # Applies `config` on top of the current state.
  #
  # Steps, in order:
  #   1. validate and store `:metadata`, `:buffer_size`, `:buffer_limit`,
  #      `:buffer_timeout` and `:fields` (options that are absent keep their
  #      previous value);
  #   2. hand the configuration to the client module;
  #   3. restart the flush timer when `:buffer_timeout` changed.
  #
  # Returns `{:ok, state}` or the first `{:error, reason}` produced by a step.
  defp configure(config, %{buffer_timeout: prev_timeout} = st) do
    parse_result =
      st
      |> parse_meta(Keyword.get(config, :metadata))
      |> parse_buffer_size(Keyword.get(config, :buffer_size))
      |> parse_buffer_limit(Keyword.get(config, :buffer_limit))
      |> parse_buffer_timeout(Keyword.get(config, :buffer_timeout))
      |> parse_fields(Keyword.get(config, :fields))

    with {:ok, parsed} <- parse_result,
         {:ok, configured} <- configure_client(client_config(config, parsed), parsed) do
      case configured do
        %{buffer_timeout: ^prev_timeout} -> {:ok, configured}
        _changed -> {:ok, restart_timer(configured)}
      end
    end
  end

  # Builds the configuration passed to the client module. Whenever the caller
  # supplied `:fields`, the raw spec is replaced by `{name, type}` pairs taken
  # from the parsed fields, so the client always sees the same shape, including
  # when the fields are unchanged or empty.
  defp client_config(config, %{fields: fields}) do
    case Keyword.get(config, :fields) do
      nil ->
        config

      _given ->
        client_fields = Enum.map(fields, fn {field, _source, type} -> {field, type} end)
        Keyword.put(config, :fields, client_fields)
    end
  end

  # Passes `config` to the client module together with its current state.
  # A successful call stores the returned client state; a failure is wrapped
  # as `{:error, {:client_configuration, reason}}`.
  defp configure_client(config, %{client: client} = st) do
    case client.configure(config, st.client_state) do
      {:error, error} ->
        {:error, {:client_configuration, error}}

      {:ok, new_client_state} ->
        {:ok, %{st | client_state: new_client_state}}
    end
  end

  # Tells whether an event of `level` passes the configured minimum level.
  # A `nil` minimum lets every event through.
  defp right_log_level?(min_level, level) do
    is_nil(min_level) or Logger.compare_levels(level, min_level) in [:eq, :gt]
  end

  # First step of the configuration chain: stores the `:metadata` option.
  # `nil` keeps the current value; anything that is not a keyword list is
  # rejected with `{:error, :invalid_meta}`.
  defp parse_meta(st, meta) do
    cond do
      is_nil(meta) -> {:ok, st}
      Keyword.keyword?(meta) -> {:ok, %{st | meta: meta}}
      true -> {:error, :invalid_meta}
    end
  end

  # Stores the `:buffer_size` option. `nil` keeps the current value, a
  # non-negative integer replaces it, anything else yields
  # `{:error, :invalid_buffer_size}`. An error from an earlier step is passed
  # through untouched.
  defp parse_buffer_size({:ok, st}, size) do
    cond do
      is_nil(size) -> {:ok, st}
      is_integer(size) and size >= 0 -> {:ok, %{st | buffer_size: size}}
      true -> {:error, :invalid_buffer_size}
    end
  end
  defp parse_buffer_size(error, _size), do: error

  # Stores the `:buffer_limit` option. `nil` keeps the current value, a
  # non-negative integer replaces it, anything else yields
  # `{:error, :invalid_buffer_limit}`. An error from an earlier step is passed
  # through untouched.
  defp parse_buffer_limit({:ok, st}, limit) do
    case limit do
      nil -> {:ok, st}
      n when is_integer(n) and n >= 0 -> {:ok, %{st | buffer_limit: n}}
      _invalid -> {:error, :invalid_buffer_limit}
    end
  end
  defp parse_buffer_limit(error, _limit), do: error

  # Stores the `:buffer_timeout` option. `nil` keeps the current value, a
  # non-negative integer replaces it, anything else yields
  # `{:error, :invalid_buffer_timeout}`. An error from an earlier step is passed
  # through untouched.
  defp parse_buffer_timeout({:ok, st}, timeout) do
    valid? = is_integer(timeout) and timeout >= 0

    cond do
      is_nil(timeout) -> {:ok, st}
      valid? -> {:ok, %{st | buffer_timeout: timeout}}
      true -> {:error, :invalid_buffer_timeout}
    end
  end
  defp parse_buffer_timeout(error, _timeout), do: error

  # Last step of the configuration chain: validates and stores the `:fields`
  # option.
  #
  # * No `:fields` given and none configured yet -> `{:error, :missing_fields}`.
  #   The option is required: with no field list every event would fail to
  #   encode.
  # * No `:fields` given but some already configured -> current value is kept.
  # * A non-empty list is parsed entry by entry with `parse_field/2`; the first
  #   invalid entry aborts with its error.
  # * An empty list is stored as is.
  # * Anything else -> `{:error, :invalid_fields}`.
  #
  # An error from an earlier step is passed through untouched.
  defp parse_fields({:ok, %{fields: nil}}, nil), do: {:error, :missing_fields}
  defp parse_fields({:ok, st}, nil), do: {:ok, st}
  defp parse_fields({:ok, st}, [_ | _] = fields) do
    with {:ok, fields} <- Enum.reduce_while(fields, {:ok, []}, &parse_field/2) do
      {:ok, %{st | fields: fields}}
    end
  end
  # Struct update syntax (instead of `Map.put/3`) keeps the struct guarantees.
  defp parse_fields({:ok, st}, []), do: {:ok, %{st | fields: []}}
  defp parse_fields({:ok, _st}, _fields), do: {:error, :invalid_fields}
  defp parse_fields(error, _fields), do: error

  # Reducer for `Enum.reduce_while/3` that turns one `{name, spec}` entry of the
  # `:fields` option into a `{name, source, type}` tuple and prepends it to the
  # accumulator.
  #
  # * `:message`                -> source `[:message]`, type `:string`
  # * `:timestamp`              -> source `[:timestamp]`, type `:uint64`
  # * `{:meta, key, type}`      -> source `[:meta, key]`
  # * `{Module, :fun, type}`    -> source is the captured `Module.fun/4`
  #
  # Halts with `{:error, :invalid_field_converter}` when the converter module
  # cannot be loaded or does not export `fun/4`, and with
  # `{:error, :invalid_field}` for any other entry.
  defp parse_field({field, :message}, {:ok, acc}), do: {:cont, {:ok, [{field, [:message], :string} | acc]}}
  defp parse_field({field, :timestamp}, {:ok, acc}), do: {:cont, {:ok, [{field, [:timestamp], :uint64} | acc]}}
  defp parse_field({field, {:meta, meta_field, type}}, {:ok, acc}) do
    {:cont, {:ok, [{field, [:meta, meta_field], type} | acc]}}
  end
  defp parse_field({field, {mod, fun, type}}, {:ok, acc}) when is_atom(mod) and is_atom(fun) do
    # `function_exported?/3` does not load modules, so load explicitly first.
    # `Code.ensure_loaded?/1` returns false for an unknown module instead of
    # raising, which lets the problem surface as a regular configuration error.
    if Code.ensure_loaded?(mod) and function_exported?(mod, fun, 4) do
      {:cont, {:ok, [{field, Function.capture(mod, fun, 4), type} | acc]}}
    else
      {:halt, {:error, :invalid_field_converter}}
    end
  end
  defp parse_field(_other, _result), do: {:halt, {:error, :invalid_field}}

  # Adds an encoded row to the in-memory buffer and returns `{:ok, state}`.
  #
  # * Buffer already at (or above) `buffer_limit`: an error is logged and the
  #   whole buffer is discarded together with the incoming row.
  # * Buffer still below `buffer_size`: the row is prepended and nothing is sent.
  # * Otherwise: the row is prepended and a send is attempted right away.
  defp enqueue_message(
    msg,
    %{
      buffer: buffer,
      buffer_length: count,
      buffer_size: size,
      buffer_limit: limit,
    } = st
  ) do
    cond do
      count >= limit ->
        Logger.error(
          "ClickhouseLogger: buffer limit (for #{limit} messages) was overflown",
          ignore_clickhouse_backend: true
        )

        {:ok, %{st | buffer: [], buffer_length: 0}}

      count < size ->
        {:ok, %{st | buffer: [msg | buffer], buffer_length: count + 1}}

      true ->
        grown = %{st | buffer: [msg | buffer], buffer_length: count + 1}
        {:ok, send_messages(grown)}
    end
  end

  # Turns one log event into a row: a list with one value per configured field,
  # in the order of the parsed `fields`.
  defp encode_message(level, timestamp, message, meta, %{fields: fields}) do
    Enum.map(fields, &encode_column(&1, level, timestamp, message, meta))
  end

  # Produces the value of a single column from its parsed field spec.
  defp encode_column({_name, [:timestamp], _type}, _level, timestamp, _message, _meta) do
    encode_timestamp(timestamp)
  end
  defp encode_column({_name, [:message], _type}, _level, _timestamp, message, _meta) do
    message
  end
  defp encode_column({_name, [:meta | path], type}, _level, _timestamp, _message, meta) do
    meta |> get_in(path) |> encode_field(type)
  end
  defp encode_column({_name, converter, _type}, level, timestamp, message, meta)
       when is_function(converter, 4) do
    converter.(level, timestamp, message, meta)
  end

  # Converts a Logger timestamp `{date, {hour, minute, second, millisecond}}`
  # into an integer number of milliseconds since `1970-01-01 00:00:00`.
  #
  # Returns `0` when the timestamp is not a valid date/time or does not have the
  # expected shape. Timestamp columns are typed `:uint64`, so the fallback has
  # to be an integer in both cases.
  defp encode_timestamp({date, {hh, mm, ss, ms}}) do
    case NaiveDateTime.from_erl({date, {hh, mm, ss}}) do
      {:ok, timestamp} ->
        {s, _ms} = NaiveDateTime.to_gregorian_seconds(timestamp)
        (s - @unix_seconds) * 1000 + ms

      _ ->
        0
    end
  end
  defp encode_timestamp(_), do: 0

  # Converts a metadata value into the representation expected for a column of
  # the given type.
  #
  # * A missing value (`nil`) becomes the type's empty value: `""` for strings,
  #   `0` for the integer types and `[]` for arrays.
  # * For `:string` columns the value is converted with `to_string/1`. Terms
  #   that cannot be converted that way (pids, tuples, maps, references, lists
  #   that are not valid chardata, ...) are rendered with `inspect/1`, so such
  #   metadata does not make the whole log event fail to encode.
  # * Every other value is passed through unchanged.
  defp encode_field(nil, :string), do: ""
  defp encode_field(nil, {:array, _subtype}), do: []
  defp encode_field(nil, type)
       when type in [:uint8, :uint16, :uint32, :uint64, :int8, :int16, :int32, :int64],
       do: 0
  defp encode_field(v, :string) when is_binary(v), do: v
  defp encode_field(v, :string) do
    to_string(v)
  rescue
    # No `String.Chars` implementation, or a list that is not valid chardata.
    _ in [Protocol.UndefinedError, ArgumentError, UnicodeConversionError] -> inspect(v)
  end
  defp encode_field(v, _type), do: v

  # Hands the buffered rows to the client module and returns the updated state.
  #
  # * On success the buffer is emptied and the new client state is stored.
  # * On failure the error is logged, the new client state is stored and the
  #   buffer is kept so the rows are retried on the next send.
  # * With an empty buffer nothing is sent.
  #
  # The failure log is tagged with `ignore_clickhouse_backend: true`, the key
  # `handle_event/2` checks, so the report is not fed back into this backend's
  # own buffer.
  defp send_messages(%{buffer: [_ | _] = data, client: client, client_state: client_state} = st) do
    case client.send(data, client_state) do
      {:ok, client_state} ->
        %{st | buffer: [], buffer_length: 0, client_state: client_state}

      {:error, error, client_state} ->
        Logger.error(
          "ClickhouseLogger: Error sending logs to clickhouse server: #{inspect error}",
          ignore_clickhouse_backend: true
        )
        %{st | client_state: client_state}
    end
  end
  defp send_messages(st), do: st

  # (Re)arms the periodic flush timer and returns the updated state.
  #
  # A pending timer is cancelled first. A new one is scheduled only when
  # `buffer_timeout` is a positive integer; it delivers
  # `{ClickhouseLogger, :flush}` to the current process. The `is_integer/1`
  # check matters: in Erlang term ordering every atom, `nil` included, compares
  # greater than any number, so `nil > 0` alone would be true and
  # `Process.send_after/3` would raise.
  defp restart_timer(%{timer: timer} = st) when not is_nil(timer) do
    Process.cancel_timer(timer, async: true, info: false)
    restart_timer(%{st | timer: nil})
  end
  defp restart_timer(%{buffer_timeout: timeout} = st) when is_integer(timeout) and timeout > 0 do
    %{st | timer: Process.send_after(self(), {__MODULE__, :flush}, timeout)}
  end
  defp restart_timer(st), do: st
end