lib/kvasir/event/encoding/topic.ex

defmodule Kvasir.Event.Encoding.Topic do
  @spec generate(Kvasir.Topic.t(), runtime? :: boolean, term) :: term
  def generate(topic, runtime?, extra \\ nil) do
    {_mod, code} = generate_module(topic, runtime?, extra: extra, overwrite: true, events: :all)
    code
  end

  @spec create(Kvasir.Topic.t(), runtime? :: boolean, Keyword.t()) :: module
  def create(topic, runtime?, opts \\ []) do
    {mod, code} = generate_module(topic, runtime?, opts)

    if code do
      Code.compiler_options(ignore_module_conflict: true)
      Code.compile_quoted(code)
      Code.compiler_options(ignore_module_conflict: false)
    end

    mod
  end

  @spec generate_module(Kvasir.Topic.t(), runtime? :: boolean, Keyword.t()) :: {module, term}
  defp generate_module(topic, runtime?, opts) do
    {events, mod} =
      case opts[:only] do
        all when all in [nil, :all] ->
          {topic.events, topic.module}

        e ->
          es = if(is_list(e), do: e, else: [e])
          types = es |> Enum.map(& &1.__event__(:type)) |> Enum.sort()
          hash = :md5 |> :crypto.hash(types) |> Base.encode16()

          {es, Module.concat(topic.module, "F" <> hash)}
      end

    if not Keyword.get(opts, :overwrite, false) and Code.ensure_loaded?(mod) do
      {mod, nil}
    else
      key = topic.key
      decoder = Kvasir.Event.Encoding
      {bin_decode, event} = gen_event_matchers(key, events)

      {encrypter, encrypter_opts} = topic.encryption_opts
      {compressor, compressor_opts} = topic.compression_opts

      {encryption, decryption} =
        case topic.encryption do
          false ->
            {quote do
               defp encrypt(_, data), do: {:ok, data}
             end,
             quote do
               defp decrypt(_, data), do: {:ok, data}
             end}

          :always ->
            {quote do
               defp encrypt(_, data),
                 do: unquote(encrypter).encrypt(data, unquote(runtime?), unquote(encrypter_opts))
             end,
             quote do
               defp decrypt(_, data),
                 do: unquote(encrypter).decrypt(data, unquote(runtime?), unquote(encrypter_opts))
             end}

          :sensitive_only ->
            # For now always
            Enum.reduce(
              topic.events,
              {quote do
                 defp encrypt(_, data), do: {:ok, data}
               end,
               quote do
                 defp decrypt(_, data), do: {:ok, data}
               end},
              fn e, acc = {a, b} ->
                if e.__event__(:sensitive) != [] do
                  {quote do
                     defp encrypt(unquote(e), data),
                       do:
                         unquote(encrypter).encrypt(
                           data,
                           unquote(runtime?),
                           unquote(encrypter_opts)
                         )

                     unquote(a)
                   end,
                   quote do
                     defp decrypt(unquote(e), data),
                       do:
                         unquote(encrypter).decrypt(
                           data,
                           unquote(runtime?),
                           unquote(encrypter_opts)
                         )

                     unquote(b)
                   end}
                else
                  acc
                end
              end
            )
        end

      {compression, decompression} =
        case topic.compression do
          false ->
            {quote do
               defp compress(_, data), do: {:ok, data}
             end,
             quote do
               defp decompress(_, data), do: {:ok, data}
             end}

          :always ->
            {quote do
               defp compress(_, data),
                 do: unquote(compressor).compress(data, unquote(compressor_opts))
             end,
             quote do
               defp decompress(_, data),
                 do: unquote(compressor).decompress(data, unquote(compressor_opts))
             end}

          :event ->
            # For now always
            Enum.reduce(
              topic.events,
              {quote do
                 defp compress(_, data), do: {:ok, data}
               end,
               quote do
                 defp decompress(_, data), do: {:ok, data}
               end},
              fn e, {a, b} ->
                if e.__event__(:compress) do
                  {quote do
                     defp compress(unquote(e), data),
                       do: unquote(compressor).compress(data, unquote(compressor_opts))

                     unquote(a)
                   end,
                   quote do
                     defp decompress(unquote(e), data),
                       do: unquote(compressor).decompress(data, unquote(compressor_opts))

                     unquote(b)
                   end}
                else
                  {a, b}
                end
              end
            )
        end

      {mod,
       quote do
         defmodule unquote(mod) do
           @moduledoc """
           Event encoding and decoding for the `"#{unquote(topic.topic)}"` topic.
           """
           import unquote(decoder), only: [encode: 3, binary_decode: 4, decode: 4]

           require unquote(compressor)
           require unquote(encrypter)
           require Logger

           ### Encoders ###

           @doc ~S"""
           Encode an event to JSON.

           ## Examples

           ```elixir
           iex> encode(⊰Event{...}⊱)
           {:ok, "{...}"}
           ```
           """
           @spec encode(Kvasir.Event.t()) :: {:ok, binary} | {:error, term}
           def encode(event)

           def encode(event = %t{}) do
             with {:ok, data} <- encode(unquote(Macro.escape(topic)), event, []),
                  {:ok, encoded} <- Jason.encode(data),
                  {:ok, compressed} <- encrypt(t, encoded),
                  do: encrypt(t, compressed)
           end

           @doc ~S"""
           Encode an event to binary.

           ## Examples

           ```elixir
           iex> bin_encode(⊰Event{...}⊱)
           {:ok, <<...>>}
           ```
           """
           @spec bin_encode(Kvasir.Event.t()) :: {:ok, binary} | {:error, term}
           def bin_encode(event)

           def bin_encode(event = %t{__meta__: meta}) do
             with {:ok, m} <- Kvasir.Event.Meta.encode(meta, unquote(key)),
                  {:ok, p} <- Kvasir.Type.Serializer.encode(t.__event__(:fields), event),
                  unpacked <- if(m == %{}, do: p, else: [m, p]),
                  {:ok, packed} <- Msgpax.pack(unpacked, iodata: false),
                  {:ok, compressed} <- compress(t, packed),
                  {:ok, data} <- encrypt(t, compressed) do
               type = t.__event__(:type)
               %{major: major, minor: minor, patch: patch} = t.__event__(:version)

               {:ok,
                <<type::binary, 0, major::size(16), minor::size(16), patch::size(16),
                  data::binary>>}
             end
           end

           defp compress(event, data)
           unquote(compression)

           defp encrypt(event, data)
           unquote(encryption)

           ### Decoders ###

           @doc ~S"""
           Decode a JSON encoded event.

           ## Examples

           ```elixir
           iex> decode("{...}")
           {:ok, ⊰Event{...}⊱}
           ```
           """
           @spec decode(binary) :: {:ok, Kvasir.Event.t()} | {:error, term}
           def decode(event) do
             with {:ok, unpacked} <- Jason.decode(event),
                  {:ok, e} <- event(MapX.get(unpacked, :type)) do
               decode(e, unquote(key), unpacked, [])
             end
           end

           @doc ~S"""
           Decode a binary encoded event.

           ## Examples

           ```elixir
           iex> bin_decode(<<...>>)
           {:ok, ⊰Event{...}⊱}
           ```
           """
           @spec bin_decode(binary) :: {:ok, Kvasir.Event.t()} | {:error, term}
           def bin_decode(event)
           unquote(bin_decode)
           def bin_decode(_), do: {:error, :unknown_event_type}

           defp event(type)
           unquote(event)
           defp event(_), do: {:error, :unknown_event_type}

           defp bin_decode(event, key, version, data) do
             with err = {:error, %Msgpax.UnpackError{}} <-
                    do_bin_decode(event, key, version, data) do
               Logger.error(
                 fn -> "Kvasir: Invalid Event Payload For: #{inspect(event)}" end,
                 event: event,
                 key: key,
                 data: data
               )

               {:error, :unknown_event_type}
             end
           end

           defp do_bin_decode(event, key, version, data) do
             with {:ok, decrypted} <- decrypt(event, data),
                  {:ok, decompressed} <- decompress(event, decrypted) do
               binary_decode(event, key, version, decompressed)
             end
           end

           defp decompress(event, data)
           unquote(decompression)

           defp decrypt(event, data)
           unquote(decryption)

           unquote(opts[:extra])
         end
       end}
    end
  end

  defp gen_event_matchers(key, events) do
    {Enum.reduce(events, nil, fn e, acc ->
       type = e.__event__(:type)
       event = e.__event__(:replaced_by) || e

       quote do
         unquote(acc)

         def bin_decode(
               <<unquote(type), 0, major::size(16), minor::size(16), patch::size(16),
                 data::binary>>
             ),
             do:
               bin_decode(
                 unquote(event),
                 unquote(key),
                 %Version{major: major, minor: minor, patch: patch},
                 data
               )
       end
     end),
     Enum.reduce(events, nil, fn e, acc ->
       type = e.__event__(:type)
       event = e.__event__(:replaced_by) || e

       quote do
         unquote(acc)

         defp event(unquote(type)), do: {:ok, unquote(event)}
       end
     end)}
  end
end