# lib/beeline/health_checker.ex

defmodule Beeline.HealthChecker do
  @moduledoc """
  A GenServer which periodically polls a producer's stream positions and
  process liveness.

  This GenServer emits `:telemetry` measurements which serve as an interface
  for exporting this health-check information to an external monitoring
  service.

  `Beeline.HealthChecker.Logger` is included as a reasonable default consumer
  for this telemetry. You may wish to export this telemetry to another system
  such as appsignal or grafana in order to create alerts when processors fall
  behind.

  ## Telemetry

  Each poll is wrapped in `:telemetry.span/3` with the event prefix
  `[:beeline, :health_check]`, so a `:start` event is dispatched before each
  poll and an `:exception` event is dispatched if the poll raises. The
  `:stop` event carries the full health-check result:

  * `[:beeline, :health_check, :stop]` - dispatched by each HealthChecker
    process after polling the producer's position and process information
      * Measurement: `%{duration: native_time}` - the time taken to perform
        the position and process checks
      * Metadata, a map with the following keys:
          * `:producer` (module) - the producer module being measured
          * `:alive?` (boolean) - whether the producer process is alive
          * `:stream_name` (string) - the EventStoreDB stream name from which
            the producer reads
          * `:hostname` (string) - the hostname of the machine on which
            the health checker process is being run
          * `:interval` (integer) - the milliseconds the health checker process
            has waited (minus drift) since the last poll
          * `:drift` (integer) - the milliseconds used for drifting the interval
            for the last poll
          * `:measurement_time` (UTC datetime) - the time when the poll started
          * `:prior_position` (integer) - the `:current_position` from the last
            poll
          * `:current_position` (integer) - the current stream position of
            the producer
          * `:head_position` (integer) - the stream position of the head
            (the latest event) of the EventStoreDB stream
          * `:auto_subscribe` (boolean) - the value of the producer's `:auto_subscribe?`
            flag at the time of polling
  """

  @behaviour GenServer

  # Note: after `init/1` the `:auto_subscribe?`, `:get_stream_position`,
  # `:get_head_position`, `:interval_fn` and `:drift_fn` fields all hold
  # zero-arity functions which are invoked on every poll.
  defstruct [
    :producer,
    :stream_name,
    :interval_fn,
    :drift_fn,
    :get_stream_position,
    :get_head_position,
    :hostname,
    interval: 0,
    drift: 0,
    current_position: -1,
    auto_subscribe?: false
  ]

  @typedoc "The internal state of a health checker process."
  @type t :: %__MODULE__{}

  @doc false
  @spec child_spec({config :: map(), key :: term(), producer :: map()}) ::
          Supervisor.child_spec()
  def child_spec({config, key, producer}) do
    %{
      id: {__MODULE__, key},
      start: {__MODULE__, :start_link, [{config, producer}]},
      type: :worker,
      restart: :permanent
    }
  end

  @doc false
  @spec start_link({config :: map(), producer :: map()}) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  # Builds the checker state from the config and producer and schedules the
  # first poll. Every configurable value is normalized into a zero-arity
  # function so that polling does not need to care how it was configured.
  @impl GenServer
  def init({config, producer}) do
    state =
      %__MODULE__{
        producer: producer.name,
        stream_name: producer.stream_name,
        get_head_position: fn ->
          Beeline.EventStoreDB.latest_event_number(
            producer.adapter,
            producer.connection,
            producer.stream_name
          )
        end,
        get_stream_position:
          wrap_function(config.get_stream_position, producer.name),
        interval_fn: wrap_function(config.health_check_interval),
        drift_fn: wrap_function(config.health_check_drift),
        hostname: hostname(),
        auto_subscribe?: wrap_function(config.auto_subscribe?, producer.name)
      }
      |> schedule_next_poll()

    {:ok, state}
  end

  @impl GenServer
  def handle_info(:poll, state) do
    state =
      state
      |> poll_producer()
      |> schedule_next_poll()

    {:noreply, state}
  end

  # Any other message is dropped. Without this clause a stray message would
  # crash the checker with a FunctionClauseError and the restart would reset
  # the tracked `:current_position` back to -1.
  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Computes the next interval and drift, arms the `:poll` timer and records
  # both values in the state so the next poll can report them.
  defp schedule_next_poll(state) do
    interval = state.interval_fn.()
    drift = state.drift_fn.()

    # Process.send_after/3 raises on a negative timeout, so a negative drift
    # larger than the interval is clamped to "poll immediately".
    delay = max(interval + drift, 0)

    Process.send_after(self(), :poll, delay)

    %__MODULE__{
      state
      | drift: drift,
        interval: interval
    }
  end

  # Performs one health check inside a telemetry span and returns the state
  # updated with the freshly read stream position.
  defp poll_producer(state) do
    # This map is the metadata of the span's start event; the position and
    # liveness readings are merged in for the stop event below.
    metadata = %{
      producer: state.producer,
      stream_name: state.stream_name,
      hostname: state.hostname,
      interval: state.interval,
      drift: state.drift,
      measurement_time: DateTime.utc_now(),
      prior_position: state.current_position,
      auto_subscribe: state.auto_subscribe?.()
    }

    :telemetry.span(
      [:beeline, :health_check],
      metadata,
      fn ->
        current_position = state.get_stream_position.()

        metadata =
          Map.merge(metadata, %{
            current_position: current_position,
            head_position: state.get_head_position.(),
            alive?: alive?(state.producer)
          })

        state = %__MODULE__{state | current_position: current_position}

        # The first element is what :telemetry.span/3 returns to the caller,
        # the second is the metadata of the stop event.
        {state, metadata}
      end
    )
  end

  # Resolves the producer name to a pid and checks whether that process is
  # alive. An unregistered name counts as not alive.
  defp alive?(producer) do
    case GenServer.whereis(producer) do
      nil -> false
      pid -> Process.alive?(pid)
    end
  end

  # Returns the local hostname as a string, or nil when it cannot be read.
  defp hostname do
    case :inet.gethostname() do
      {:ok, hostname_charlist} ->
        hostname_charlist |> to_string()

      _ ->
        nil
    end
  end

  # coveralls-ignore-start
  # wrap_function/1 normalizes a zero-arity function, an MFA tuple or a plain
  # value into a zero-arity function.
  defp wrap_function(function) when is_function(function, 0), do: function

  defp wrap_function({m, f, a}), do: fn -> apply(m, f, a) end

  defp wrap_function(value), do: fn -> value end

  # wrap_function/2 does the same for settings which are computed per
  # producer: one-arity functions receive the producer name and MFA tuples
  # get the producer name prepended to their arguments.
  defp wrap_function(function, producer_name) when is_function(function, 1) do
    fn -> function.(producer_name) end
  end

  defp wrap_function(function, _producer_name) when is_function(function, 0) do
    function
  end

  defp wrap_function({m, f, a}, producer_name) do
    fn -> apply(m, f, [producer_name | a]) end
  end

  # Plain values (for example a literal boolean for `:auto_subscribe?`) are
  # accepted just as in wrap_function/1. Functions of any other arity are
  # deliberately left unmatched so that a misconfiguration still fails loudly.
  defp wrap_function(value, _producer_name) when not is_function(value) do
    fn -> value end
  end

  # coveralls-ignore-stop
end