defmodule BilldogEng.Analytics do
  @moduledoc """
  Analytics client: capture/identify/groupIdentify/alias + batching.

  Events accumulate in an in-memory queue (held in this `GenServer`) and are
  flushed as a single batched POST to `/ingest-events` when:

    * the queue reaches `flush_at`,
    * the background timer fires every `flush_interval`, or
    * `flush/1` / `shutdown/1` is called.

  A failed flush re-queues its batch (front of the queue) so events are never
  lost on a single transient failure; the `BilldogEng.Transport` also retries
  with backoff before a flush is considered failed.

  The queue is bounded by `max_queue_size`: once it is exceeded, the oldest
  events are dropped to make room for new ones.

  Mirrors the Node SDK `analytics.ts`.
  """

  use GenServer

  require Logger

  alias BilldogEng.Transport

  defstruct [
    :api_key,
    :transport,
    :flush_at,
    :flush_interval,
    :max_queue_size,
    :group_type_index,
    :enable_logging,
    :timer_ref,
    queue: []
  ]

  # ── Lifecycle ───────────────────────────────────────────────────────────────

  @doc false
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(opts) do
    # Required options raise `KeyError` when missing, so a misconfigured client
    # fails at start-up instead of at the first flush.
    state = %__MODULE__{
      api_key: Keyword.fetch!(opts, :api_key),
      transport: Keyword.fetch!(opts, :transport),
      flush_at: Keyword.fetch!(opts, :flush_at),
      flush_interval: Keyword.fetch!(opts, :flush_interval),
      max_queue_size: Keyword.fetch!(opts, :max_queue_size),
      group_type_index: Keyword.get(opts, :group_type_index),
      enable_logging: Keyword.get(opts, :enable_logging, false)
    }

    {:ok, schedule_timer(state)}
  end

  # ── Public API ──────────────────────────────────────────────────────────────

  @doc """
  Capture an arbitrary event for a user. Batched per the configured policy.

  When `groups` is non-empty it is attached to the event properties under
  `"$groups"`; group types present in the configured `group_type_index` (with
  an index in `0..4`) are additionally written as `"$group_<index>"`.
  """
  @spec capture(pid(), String.t(), String.t(), map(), map()) :: :ok
  def capture(pid, distinct_id, event, properties \\ %{}, groups \\ %{}) do
    GenServer.cast(pid, {:enqueue, distinct_id, event, {:with_groups, properties, groups}})
  end

  @doc """
  Set person properties. Emits `$identify`.

  The properties are sent both under `"$set"` and merged at the top level of
  the event properties. Because the top-level merge wins on key conflicts, a
  `"$set"` key inside `properties` replaces the generated `"$set"` entry.
  """
  @spec identify(pid(), String.t(), map()) :: :ok
  def identify(pid, distinct_id, properties \\ %{}) do
    props = Map.merge(%{"$set" => properties}, properties)
    GenServer.cast(pid, {:enqueue, distinct_id, "$identify", {:raw, props}})
  end

  @doc """
  Set properties on a group. Emits `$groupidentify`.

  The event is enqueued with `group_key` as its user id.
  """
  @spec group_identify(pid(), String.t(), String.t(), map()) :: :ok
  def group_identify(pid, group_type, group_key, properties \\ %{}) do
    props = %{
      "$group_type" => group_type,
      "$group_key" => group_key,
      "$group_set" => properties
    }

    GenServer.cast(pid, {:enqueue, group_key, "$groupidentify", {:raw, props}})
  end

  @doc "Alias one distinct id to another. Emits `$create_alias`."
  @spec alias(pid(), String.t(), String.t()) :: :ok
  def alias(pid, distinct_id, alias) do
    props = %{"alias" => alias, "distinct_id" => distinct_id}
    GenServer.cast(pid, {:enqueue, distinct_id, "$create_alias", {:raw, props}})
  end

  @doc """
  Force-flush the queue immediately. Blocks until the in-flight batch settles.

  Returns `:ok` when the queue was empty or the batch was delivered, and
  `{:error, error}` when the transport reported a failure (the batch stays
  queued in that case).
  """
  @spec flush(pid()) :: :ok | {:error, BilldogEng.Error.t()}
  def flush(pid) do
    GenServer.call(pid, :flush, :infinity)
  end

  @doc "Current number of queued (not-yet-flushed) events."
  @spec queue_length(pid()) :: non_neg_integer()
  def queue_length(pid) do
    GenServer.call(pid, :queue_length, :infinity)
  end

  @doc """
  Flush remaining events and stop the background timer.

  The server process itself keeps running; only the periodic flush is
  disabled. Returns the result of the final flush.
  """
  @spec shutdown(pid()) :: :ok | {:error, BilldogEng.Error.t()}
  def shutdown(pid) do
    GenServer.call(pid, :shutdown, :infinity)
  end

  # ── Server callbacks ─────────────────────────────────────────────────────────

  @impl true
  def handle_cast({:enqueue, distinct_id, event_name, properties_spec}, state) do
    wire_event = %{
      "event_name" => event_name,
      # Stamped when the server processes the cast, not when the caller
      # invoked the public API.
      "event_timestamp" => System.system_time(:millisecond),
      "properties" => resolve_properties(properties_spec, state),
      "user_id" => distinct_id
    }

    # Append keeps the queue in arrival order, which is also the wire order.
    {queue, size} = enforce_max_size(state.queue ++ [wire_event], state)
    state = %{state | queue: queue}

    if size >= state.flush_at do
      # A size-triggered flush has no caller to report to; on failure the
      # batch is simply left in the queue for the next attempt.
      {_result, state} = do_flush(state)
      {:noreply, state}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_call(:flush, _from, state) do
    {result, state} = do_flush(state)
    {:reply, result, state}
  end

  def handle_call(:queue_length, _from, state) do
    {:reply, length(state.queue), state}
  end

  def handle_call(:shutdown, _from, state) do
    state = cancel_timer(state)
    {result, state} = do_flush(state)
    {:reply, result, state}
  end

  @impl true
  def handle_info(:flush_tick, %{timer_ref: nil} = state) do
    # The timer was cancelled by `shutdown/1`, but this tick was already in the
    # mailbox. Ignore it so the periodic flush is not silently re-armed.
    {:noreply, state}
  end

  def handle_info(:flush_tick, state) do
    {_result, state} = do_flush(state)
    {:noreply, schedule_timer(state)}
  end

  def handle_info(message, state) do
    # Without this clause any stray message would raise `FunctionClauseError`,
    # crash the server and lose every queued event.
    log(state, fn -> "ignoring unexpected message: #{inspect(message)}" end)
    {:noreply, state}
  end

  # ── Internals ────────────────────────────────────────────────────────────────

  # Trims the queue to `max_queue_size` by dropping the oldest events and
  # returns `{queue, size}` so callers do not have to walk the list again.
  defp enforce_max_size(queue, state) do
    size = length(queue)

    if size > state.max_queue_size do
      dropped = size - state.max_queue_size
      log(state, fn -> "max queue size exceeded, dropped #{dropped} oldest event(s)" end)
      {Enum.drop(queue, dropped), max(state.max_queue_size, 0)}
    else
      {queue, size}
    end
  end

  # Turns the property spec carried by an `:enqueue` cast into the final
  # event-properties map.
  defp resolve_properties({:raw, props}, _state), do: props

  defp resolve_properties({:with_groups, properties, groups}, state) do
    with_groups(properties, groups, state.group_type_index)
  end

  # Adds `"$groups"` (and `"$group_<idx>"` for indexed group types) to the
  # properties; leaves them untouched when no groups were given.
  defp with_groups(properties, groups, _index) when map_size(groups) == 0 do
    properties
  end

  defp with_groups(properties, groups, index) do
    out = Map.put(properties, "$groups", groups)

    if is_map(index) do
      Enum.reduce(groups, out, fn {type, key}, acc ->
        # The index may be keyed by the group type as given or by its string form.
        idx = Map.get(index, type) || Map.get(index, to_string(type))

        if is_integer(idx) and idx >= 0 and idx <= 4 do
          Map.put(acc, "$group_#{idx}", key)
        else
          acc
        end
      end)
    else
      out
    end
  end

  # Sends the whole queue as one batch. Returns `{result, state}` where
  # `result` is `:ok` or `{:error, err}`; on error the batch is put back.
  defp do_flush(%{queue: []} = state), do: {:ok, state}

  defp do_flush(%{queue: batch} = state) do
    state = %{state | queue: []}

    response =
      Transport.request(state.transport,
        path: "/ingest-events",
        body: %{api_key: state.api_key, events: batch},
        headers: [{"x-api-key", state.api_key}]
      )

    case response do
      {:ok, _data} ->
        log(state, fn -> "flushed #{length(batch)} event(s)" end)
        {:ok, state}

      {:error, err} ->
        # Re-queue the batch at the front so it is retried on the next flush.
        # The queue was emptied above and nothing can be enqueued while this
        # callback runs, so the batch is the entire queue again.
        log(state, fn ->
          "flush failed, re-queued #{length(batch)} event(s): #{describe_error(err)}"
        end)

        {{:error, err}, %{state | queue: batch}}
    end
  end

  # Renders an error for the log without ever raising: a log line must not be
  # able to crash the server and lose the batch that was just re-queued.
  defp describe_error(%{message: message}) when is_binary(message), do: message
  defp describe_error(other), do: inspect(other)

  # Arms the next periodic flush and remembers the timer so it can be cancelled.
  defp schedule_timer(state) do
    ref = Process.send_after(self(), :flush_tick, state.flush_interval)
    %{state | timer_ref: ref}
  end

  # Cancels the periodic flush. A tick that already fired may still be in the
  # mailbox; `handle_info/2` ignores it while `timer_ref` is `nil`.
  defp cancel_timer(%{timer_ref: nil} = state), do: state

  defp cancel_timer(%{timer_ref: ref} = state) do
    Process.cancel_timer(ref)
    %{state | timer_ref: nil}
  end

  # Debug logging gated by `enable_logging`. The message is passed as a
  # zero-arity function so it is only built when logging is enabled.
  defp log(%{enable_logging: true}, message_fun) when is_function(message_fun, 0) do
    Logger.debug("[BilldogEng:analytics] #{message_fun.()}")
  end

  defp log(_state, _message_fun), do: :ok
end