lib/mixpanel/client.ex

defmodule Mixpanel.Client do
  use GenServer

  @moduledoc """
  Mixpanel API Client GenServer.
  """

  require Logger

  alias Mixpanel.Client.State
  alias Mixpanel.HTTP

  @type init_args :: [
          Mixpanel.Config.option() | GenServer.option() | {Keyword.key(), Keyword.value()},
          ...
        ]

  @type event :: String.t() | map
  @type properties :: map
  @type alias_id :: String.t()
  @type distinct_id :: String.t()

  @track_endpoint "/track"
  @engage_endpoint "/engage"
  @alias_endpoint "/track#identity-create-alias"
  @epoch :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}})

  @spec start_link(init_args) :: :ignore | {:error, any} | {:ok, pid}
  def start_link(init_args) do
    {gen_server_opts, opts} =
      Keyword.split(init_args, [:debug, :name, :timeout, :spawn_opt, :hibernate_after])

    opts =
      opts
      |> Keyword.take([:project_token, :base_url, :http_adapter])
      |> Keyword.put(:name, gen_server_opts[:name])

    GenServer.start_link(__MODULE__, opts, gen_server_opts)
  end

  @spec child_spec(init_args) :: %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [init_args, ...]}
        }
  def child_spec(init_args) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [init_args]}
    }
  end

  @doc """
  Tracks a event

  See `Mixpanel.track/3`
  """
  @doc export: true
  @spec track(module, event, properties, Mixpanel.track_options()) :: :ok
  def track(server, event, properties, opts) do
    opts = validate_options(opts, [:distinct_id, :ip, :time], :opts)

    properties =
      properties
      |> Map.drop([:distinct_id, :ip, :time])
      |> maybe_put(:time, to_timestamp(Keyword.get(opts, :time)))
      |> maybe_put(:distinct_id, Keyword.get(opts, :distinct_id))
      |> maybe_put(:ip, convert_ip(Keyword.get(opts, :ip)))

    GenServer.cast(server, {:track, event, properties})
  end

  @doc """
  Updates a user profile.

  See `Mixpanel.engage/4`.
  """
  @doc export: true
  @spec engage(module, [{distinct_id, String.t(), map}], Mixpanel.engage_options()) ::
          :ok
  def engage(server, [{_, _, _} | _] = batch, opts) do
    opts = validate_options(opts, [:ip, :time, :ignore_time], :opts)
    GenServer.cast(server, {:engage, Enum.map(batch, &build_engage_event(&1, opts))})
  end

  @doc export: true
  @spec engage(module, distinct_id, String.t(), map, Mixpanel.engage_options()) :: :ok
  def engage(server, distinct_id, operation, value, opts) do
    opts = validate_options(opts, [:ip, :time, :ignore_time], :opts)
    GenServer.cast(server, {:engage, build_engage_event({distinct_id, operation, value}, opts)})
  end

  @doc """
  Creates an alias for a user profile.

  See `Mixpanel.create_alias/2`.
  """
  @doc export: true
  @spec create_alias(module, alias_id, distinct_id) :: :ok
  def create_alias(server, alias_id, distinct_id) do
    GenServer.cast(server, {:create_alias, alias_id, distinct_id})
  end

  @impl GenServer
  @spec init([Mixpanel.Config.option(), ...]) :: {:ok, State.t()}
  def init(opts) do
    Process.flag(:trap_exit, true)
    state = State.new(opts)

    client_span =
      Mixpanel.Telemetry.start_span(:client, %{}, %{
        name: state.name,
        base_url: state.base_url,
        http_adapter: state.http_adapter
      })

    {:ok, State.attach_span(state, client_span)}
  end

  @spec handle_cast(
          {:track, event, properties}
          | {:engage, event}
          | {:create_alias, alias_id, distinct_id},
          State.t()
        ) :: {:noreply, State.t()}

  @impl GenServer
  def handle_cast({:track, event, properties}, state) do
    data =
      encode_params(%{
        event: event,
        properties: Map.put_new(properties, :token, state.project_token)
      })

    case HTTP.get(state.http_adapter, state.base_url <> @track_endpoint, [], params: [data: data]) do
      {:ok, _, _, _} ->
        Mixpanel.Telemetry.untimed_span_event(
          state.span,
          :send,
          %{
            event: event
            # payload_size: byte_size(payload)
          },
          %{name: state.name}
        )

        :ok

      {:error, reason} ->
        Mixpanel.Telemetry.span_event(
          state.span,
          :send_error,
          %{
            event: event,
            error: reason
            # payload_size: byte_size(payload)
          },
          %{name: state.name}
        )

        Logger.warning(%{message: "Problem tracking event", event: event, properties: properties})
    end

    {:noreply, state}
  end

  @impl GenServer
  def handle_cast({:engage, event}, state) do
    data =
      event
      |> put_token(state.project_token)
      |> encode_params()

    case HTTP.get(state.http_adapter, state.base_url <> @engage_endpoint, [],
           params: [data: data]
         ) do
      {:ok, _, _, _} ->
        :ok

      {:error, _reason} ->
        Logger.warning(%{message: "Problem tracking profile update", event: event})
    end

    {:noreply, state}
  end

  @impl GenServer
  def handle_cast({:create_alias, alias, distinct_id}, state) do
    data =
      %{
        event: "$create_alias",
        properties: %{
          token: state.project_token,
          alias: alias,
          distinct_id: distinct_id
        }
      }
      |> encode_params()

    case HTTP.post(
           state.http_adapter,
           state.base_url <> @alias_endpoint,
           "data=#{data}",
           [
             {"Content-Type", "application/x-www-form-urlencoded"}
           ],
           []
         ) do
      {:ok, _, _, _} ->
        :ok

      {:error, _} ->
        Logger.warning(%{
          message: "Problem creating profile alias",
          alias: alias,
          distinct_id: distinct_id
        })
    end

    {:noreply, state}
  end

  @impl GenServer
  @spec terminate(reason, State.t()) :: :ok
        when reason: :normal | :shutdown | {:shutdown, term} | term
  def terminate(_reason, state),
    do: Mixpanel.Telemetry.stop_span(state.span, %{}, %{name: state.name})

  defp put_token(events, project_token) when is_list(events),
    do: Enum.map(events, &put_token(&1, project_token))

  defp put_token(event, project_token),
    do: Map.put(event, :"$token", project_token)

  defp encode_params(params),
    do: Jason.encode!(params) |> :base64.encode()

  defp build_engage_event({distinct_id, operation, value}, opts) do
    %{"$distinct_id": distinct_id}
    |> Map.put(operation, value)
    |> maybe_put(:"$ip", convert_ip(Keyword.get(opts, :ip)))
    |> maybe_put(:"$time", to_timestamp(Keyword.get(opts, :time)))
    |> maybe_put(:"$ignore_time", Keyword.get(opts, :ignore_time, nil) == true)
  end

  @spec to_timestamp(
          nil
          | DateTime.t()
          | NaiveDateTime.t()
          | :erlang.timestamp()
          | :calendar.datetime()
          | pos_integer
        ) :: nil | integer
  defp to_timestamp(nil), do: nil

  defp to_timestamp(secs) when is_integer(secs),
    do: secs

  defp to_timestamp(%NaiveDateTime{} = dt),
    do: dt |> DateTime.from_naive!("Etc/UTC") |> DateTime.to_unix()

  defp to_timestamp(%DateTime{} = dt),
    do: DateTime.to_unix(dt)

  defp to_timestamp({{_y, _mon, _d}, {_h, _m, _s}} = dt),
    do:
      dt
      |> :calendar.datetime_to_gregorian_seconds()
      |> Kernel.-(@epoch)

  defp to_timestamp({mega_secs, secs, _ms}),
    do: trunc(mega_secs * 1_000_000 + secs)

  @spec convert_ip(nil | {1..255, 1..255, 1..255, 1..255} | String.t()) :: nil | String.t()
  defp convert_ip({a, b, c, d}), do: "#{a}.#{b}.#{c}.#{d}"
  defp convert_ip(ip) when is_binary(ip), do: ip
  defp convert_ip(nil), do: nil

  @dialyzer {:nowarn_function, maybe_put: 3}

  @spec maybe_put(map, any, any) :: map
  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)

  @dialyzer {:nowarn_function, validate_options: 3}

  @spec validate_options(Keyword.t(), [atom(), ...], String.t() | atom()) ::
          Keyword.t() | no_return()
  defp validate_options(options, valid_values, name) do
    case Keyword.split(options, valid_values) do
      {options, []} ->
        options

      {_, illegal_options} ->
        raise "Unsupported keys(s) in #{name}: #{inspect(Keyword.keys(illegal_options))}"
    end
  end
end