# lib/billdog_eng/analytics.ex

defmodule BilldogEng.Analytics do
  @moduledoc """
  Analytics client: capture/identify/groupIdentify/alias + batching.

  Events accumulate in an in-memory queue (held in this `GenServer`) and are
  flushed as a single batched POST to `/ingest-events` when:

    * the queue reaches `flush_at`,
    * the background timer fires every `flush_interval`, or
    * `flush/1` / `shutdown/1` is called.

  A failed flush re-queues its batch (front of the queue) so events are never
  lost on a single transient failure; the `BilldogEng.Transport` also retries
  with backoff before a flush is considered failed.

  Each event's `event_timestamp` is taken in the calling process at the moment
  `capture/5`, `identify/3`, `group_identify/4` or `alias/3` is invoked, so it
  is not skewed by time the event spends waiting in the server mailbox while a
  flush is in flight.

  When the queue grows past `max_queue_size` the oldest events are dropped.

  Mirrors the Node SDK `analytics.ts`.
  """

  use GenServer
  require Logger
  alias BilldogEng.Transport

  defstruct [
    :api_key,
    :transport,
    :flush_at,
    :flush_interval,
    :max_queue_size,
    :group_type_index,
    :enable_logging,
    :timer_ref,
    queue: []
  ]

  # ── Lifecycle ───────────────────────────────────────────────────────────────

  @doc false
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  # Builds the server state from `opts` and arms the first flush timer.
  # `:api_key`, `:transport`, `:flush_at`, `:flush_interval` and
  # `:max_queue_size` are required (`Keyword.fetch!/2` raises when missing);
  # `:group_type_index` and `:enable_logging` are optional.
  @impl true
  def init(opts) do
    state = %__MODULE__{
      api_key: Keyword.fetch!(opts, :api_key),
      transport: Keyword.fetch!(opts, :transport),
      flush_at: Keyword.fetch!(opts, :flush_at),
      flush_interval: Keyword.fetch!(opts, :flush_interval),
      max_queue_size: Keyword.fetch!(opts, :max_queue_size),
      group_type_index: Keyword.get(opts, :group_type_index),
      enable_logging: Keyword.get(opts, :enable_logging, false)
    }

    {:ok, schedule_timer(state)}
  end

  # ── Public API ──────────────────────────────────────────────────────────────

  @doc """
  Capture an arbitrary event for a user. Batched per the configured policy.

  When `groups` is non-empty it is attached to the event properties under
  `"$groups"`; group types that map to an integer 0..4 in the configured
  `group_type_index` are additionally written as `"$group_<idx>"`.
  """
  @spec capture(pid(), String.t(), String.t(), map(), map()) :: :ok
  def capture(pid, distinct_id, event, properties \\ %{}, groups \\ %{}) do
    enqueue(pid, distinct_id, event, {:with_groups, properties, groups})
  end

  @doc """
  Set person properties. Emits `$identify`.

  The event properties are `properties` plus a `"$set"` key holding the same
  map (a `"$set"` key already present in `properties` takes precedence).
  """
  @spec identify(pid(), String.t(), map()) :: :ok
  def identify(pid, distinct_id, properties \\ %{}) do
    props = Map.merge(%{"$set" => properties}, properties)
    enqueue(pid, distinct_id, "$identify", {:raw, props})
  end

  @doc """
  Set properties on a group. Emits `$groupidentify`.

  The event is recorded with `group_key` as its `user_id`.
  """
  @spec group_identify(pid(), String.t(), String.t(), map()) :: :ok
  def group_identify(pid, group_type, group_key, properties \\ %{}) do
    props = %{
      "$group_type" => group_type,
      "$group_key" => group_key,
      "$group_set" => properties
    }

    enqueue(pid, group_key, "$groupidentify", {:raw, props})
  end

  @doc "Alias one distinct id to another. Emits `$create_alias`."
  @spec alias(pid(), String.t(), String.t()) :: :ok
  def alias(pid, distinct_id, alias) do
    props = %{"alias" => alias, "distinct_id" => distinct_id}
    enqueue(pid, distinct_id, "$create_alias", {:raw, props})
  end

  @doc """
  Force-flush the queue immediately. Blocks until the in-flight batch settles.

  Returns `:ok` when the queue was empty or the batch was accepted, and
  `{:error, error}` when the transport reported a failure (the batch is then
  re-queued).
  """
  @spec flush(pid()) :: :ok | {:error, BilldogEng.Error.t()}
  def flush(pid) do
    GenServer.call(pid, :flush, :infinity)
  end

  @doc "Current number of queued (not-yet-flushed) events."
  @spec queue_length(pid()) :: non_neg_integer()
  def queue_length(pid) do
    GenServer.call(pid, :queue_length, :infinity)
  end

  @doc """
  Flush remaining events and stop the background timer.

  The process itself keeps running; only the periodic flush is stopped.
  """
  @spec shutdown(pid()) :: :ok | {:error, BilldogEng.Error.t()}
  def shutdown(pid) do
    GenServer.call(pid, :shutdown, :infinity)
  end

  # ── Server callbacks ─────────────────────────────────────────────────────────

  @impl true
  def handle_cast({:enqueue, distinct_id, event_name, properties_spec, timestamp}, state) do
    wire_event = %{
      "event_name" => event_name,
      "event_timestamp" => timestamp,
      "properties" => resolve_properties(properties_spec, state),
      "user_id" => distinct_id
    }

    # Measure the queue once; `length/1` is O(n).
    {queue, size} = cap_queue(state.queue ++ [wire_event], state)
    state = %{state | queue: queue}

    if size >= state.flush_at do
      {_result, state} = do_flush(state)
      {:noreply, state}
    else
      {:noreply, state}
    end
  end

  # Legacy message shape without a caller-side timestamp: stamp on arrival.
  def handle_cast({:enqueue, distinct_id, event_name, properties_spec}, state) do
    message = {:enqueue, distinct_id, event_name, properties_spec, now_ms()}
    handle_cast(message, state)
  end

  @impl true
  def handle_call(:flush, _from, state) do
    {result, state} = do_flush(state)
    {:reply, result, state}
  end

  def handle_call(:queue_length, _from, state) do
    {:reply, length(state.queue), state}
  end

  def handle_call(:shutdown, _from, state) do
    state = cancel_timer(state)
    {result, state} = do_flush(state)
    {:reply, result, state}
  end

  # A tick that was already in the mailbox when `shutdown/1` cancelled the
  # timer must not flush or re-arm the timer, otherwise the background flush
  # would silently resume after shutdown.
  @impl true
  def handle_info(:flush_tick, %{timer_ref: nil} = state) do
    {:noreply, state}
  end

  def handle_info(:flush_tick, state) do
    {_result, state} = do_flush(state)
    {:noreply, schedule_timer(state)}
  end

  # Any other message is ignored: crashing on it would discard the queue.
  def handle_info(message, state) do
    log(state, "ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # ── Internals ────────────────────────────────────────────────────────────────

  # Stamps the event in the caller and hands it to the server asynchronously.
  defp enqueue(pid, distinct_id, event_name, properties_spec) do
    GenServer.cast(pid, {:enqueue, distinct_id, event_name, properties_spec, now_ms()})
  end

  defp now_ms, do: System.system_time(:millisecond)

  # Trims the oldest events so the queue holds at most `max_queue_size`
  # entries. Returns the (possibly trimmed) queue together with its length.
  defp cap_queue(queue, state) do
    size = length(queue)
    dropped = size - state.max_queue_size

    if dropped > 0 do
      log(state, "max queue size exceeded, dropped #{dropped} oldest event(s)")
      {Enum.drop(queue, dropped), max(state.max_queue_size, 0)}
    else
      {queue, size}
    end
  end

  defp resolve_properties({:raw, props}, _state), do: props

  defp resolve_properties({:with_groups, properties, groups}, state) do
    with_groups(properties, groups, state.group_type_index)
  end

  # No groups: the properties go out untouched.
  defp with_groups(properties, groups, _index) when map_size(groups) == 0 do
    properties
  end

  # Adds "$groups" and, for every group type that the index maps to an integer
  # slot 0..4, a "$group_<slot>" property holding that group's key. The index
  # is consulted with the type as given and then with its string form.
  defp with_groups(properties, groups, index) do
    out = Map.put(properties, "$groups", groups)

    if is_map(index) do
      Enum.reduce(groups, out, fn {type, key}, acc ->
        idx = Map.get(index, type) || Map.get(index, to_string(type))

        if is_integer(idx) and idx >= 0 and idx <= 4 do
          Map.put(acc, "$group_#{idx}", key)
        else
          acc
        end
      end)
    else
      out
    end
  end

  # Sends the whole queue as one batch. Returns `{result, state}` where
  # `result` is `:ok` or `{:error, err}`; on error the batch is put back.
  defp do_flush(%{queue: []} = state), do: {:ok, state}

  defp do_flush(state) do
    batch = state.queue
    state = %{state | queue: []}

    body = %{api_key: state.api_key, events: batch}

    case Transport.request(state.transport,
           path: "/ingest-events",
           body: body,
           headers: [{"x-api-key", state.api_key}]
         ) do
      {:ok, _data} ->
        log(state, "flushed #{length(batch)} event(s)")
        {:ok, state}

      {:error, err} ->
        # Re-queue the batch at the front so it is retried on the next flush.
        log(state, "flush failed, re-queued #{length(batch)} event(s): #{err.message}")
        {{:error, err}, %{state | queue: batch ++ state.queue}}
    end
  end

  # Arms a one-shot `:flush_tick` and remembers its reference for cancellation.
  defp schedule_timer(state) do
    ref = Process.send_after(self(), :flush_tick, state.flush_interval)
    %{state | timer_ref: ref}
  end

  defp cancel_timer(%{timer_ref: nil} = state), do: state

  defp cancel_timer(%{timer_ref: ref} = state) do
    Process.cancel_timer(ref)
    %{state | timer_ref: nil}
  end

  # Debug logging, emitted only when `enable_logging` is `true`.
  defp log(%{enable_logging: true}, msg), do: Logger.debug("[BilldogEng:analytics] #{msg}")
  defp log(_state, _msg), do: :ok
end