lib/kvasir_kafka/metrics.ex

defmodule Kvasir.Kafka.Metrics.Sender do
  @spec send(metric :: iodata()) :: :ok
  def send(metric)

  def send(metric) do
    require Logger
    Logger.debug(metric)

    :ok
  end
end

defmodule Kvasir.Kafka.Metrics do
  @name :kvasir_kafka_metrics
  @default_port 8125
  @supported_protocols ~W(statsd+udp statsd2+udp statsd statsd2 udp)

  @spec create :: :ok
  def create do
    if url = System.get_env("STATSD_URL") do
      unless Process.whereis(@name) do
        generate(url)
      end
    end

    :ok
  end

  @spec generate(String.t()) :: :ok
  defp generate(url) do
    {:ok, socket, header} = open(url)
    Process.register(socket, @name)

    fqdn =
      :net_adm.localhost()
      |> :net_adm.dns_hostname()
      |> elem(1)
      |> to_string()
      |> String.trim()
      |> String.downcase()

    tags =
      [
        "host:#{fqdn}",
        System.get_env("STATSD_TAGS")
      ]
      |> Enum.reject(&is_nil/1)
      |> Enum.join(",")

    Code.compile_quoted(
      quote do
        defmodule Kvasir.Kafka.Metrics.Sender do
          @spec send(metric :: iodata()) :: :ok
          def send(metric)

          def send(metric) do
            Port.command(unquote(@name), [unquote(header), metric, unquote("," <> tags)])

            :ok
          end
        end
      end,
      "lib/kvasir_kafka/metrics/sender.ex"
    )

    :ok
  end

  @spec open(binary | URI.t()) :: {:ok, port, [byte, ...]}
  def open(url) do
    case URI.parse(url) do
      %URI{scheme: s, host: h, port: p} when s in @supported_protocols ->
        open(h, p || @default_port)

      _ ->
        raise "Invalid metrics url: #{inspect(url)}."
    end
  end

  @spec open(
          host :: binary | charlist | {byte, byte, byte, byte},
          port :: binary | integer
        ) :: {:ok, port, [byte, ...]}
  def open(host, port) do
    true = Code.ensure_loaded?(:gen_udp)

    h = if(is_binary(host), do: String.to_charlist(host), else: host)
    p = if(is_binary(port), do: String.to_integer(port), else: port)

    parent = self()

    spawn_link(fn ->
      {:ok, socket} = :gen_udp.open(0, active: false)
      send(parent, {:socket, socket})
      :timer.sleep(:infinity)
    end)

    socket =
      receive do
        {:socket, s} -> s
      end

    {:ok, socket, build_header(h, p)}
  end

  ### UDP Building ###

  otp_release = :erlang.system_info(:otp_release)
  @addr_family if(otp_release >= '19', do: [1], else: [])

  defp build_header(host, port) do
    {ip1, ip2, ip3, ip4} =
      if is_tuple(host) do
        host
      else
        {:ok, ip} = :inet.getaddr(host, :inet)
        ip
      end

    anc_data_part =
      if function_exported?(:gen_udp, :send, 5) do
        [0, 0, 0, 0]
      else
        []
      end

    @addr_family ++
      [
        :erlang.band(:erlang.bsr(port, 8), 0xFF),
        :erlang.band(port, 0xFF),
        :erlang.band(ip1, 0xFF),
        :erlang.band(ip2, 0xFF),
        :erlang.band(ip3, 0xFF),
        :erlang.band(ip4, 0xFF)
      ] ++ anc_data_part
  end
end