lib/kvasir/event/meta.ex

defmodule Kvasir.Event.Meta do
  @type t :: %__MODULE__{
          source: module,
          topic: String.t(),
          partition: non_neg_integer,
          offset: non_neg_integer,
          key: term,
          sub_key: nil | [atom | {atom, term}],
          timestamp: UTCDateTime.t(),
          meta: map
        }

  defstruct [
    :source,
    :topic,
    :partition,
    :offset,
    :key,
    :key_type,
    :sub_key,
    :timestamp,
    :meta
  ]

  defimpl Inspect do
    def inspect(%{offset: nil}, _opts), do: "#Kvasir.Event.Meta<UnPublished>"
    def inspect(%{offset: offset}, _opts), do: "#Kvasir.Event.Meta<#{offset}>"
  end

  @doc false
  @spec encode(t) :: map
  def encode(meta) do
    meta
    |> Map.from_struct()
    |> Map.delete(:key_type)
    |> Map.update(:timestamp, nil, &ts_encode/1)
    |> MapX.update_if_exists(:sub_key, fn
      nil ->
        nil

      path ->
        Enum.map(path, fn
          {a, b} -> [a, b]
          a -> a
        end)
    end)
    |> Enum.reject(&nil_value/1)
    |> Enum.into(%{})
  end

  @doc false
  @spec decode(map | nil) :: t
  def decode(nil), do: %__MODULE__{}

  def decode(data) when is_map(data) do
    struct!(
      __MODULE__,
      data
      |> Enum.reject(&nil_value/1)
      |> Enum.into(%{}, &atomize/1)
      |> Map.update(:timestamp, nil, &ts_decode/1)
      |> Map.update(:sub_key, nil, fn path ->
        Enum.map(path, fn
          [a, b] -> {String.to_atom(a), b}
          a -> String.to_atom(a)
        end)
      end)
    )
  end

  @doc false
  @spec encode(t, module) :: {:ok, map} | {:error, atom}
  def encode(meta, key) do
    case encode(meta) do
      data = %{key: k} when k != nil ->
        with {:ok, k} <- key.dump(k, []), do: {:ok, %{data | key: k}}

      data ->
        {:ok, data}
    end
  end

  @doc false
  @spec decode(map | nil, module) :: {:ok, t} | {:error, atom}
  def decode(data, key) do
    case decode(data) do
      meta = %{key: nil} -> {:ok, meta}
      meta = %{key: k} -> with {:ok, k} <- key.parse(k, []), do: {:ok, %{meta | key: k}}
    end
  end

  defp ts_decode(nil), do: nil
  defp ts_decode(v), do: UTCDateTime.from_unix!(v, :millisecond)

  defp ts_encode(nil), do: nil
  defp ts_encode(v), do: UTCDateTime.to_unix(v, :millisecond)

  defp atomize(e = {k, _}) when is_atom(k), do: e
  defp atomize({k, v}), do: {String.to_existing_atom(k), v}

  defp nil_value({_, nil}), do: true
  defp nil_value(_), do: false
end