Skip to main content

lib/commanded/event_store/adapters/eventsourcingdb/event_mapper.ex

defmodule Commanded.EventStore.Adapters.EventSourcingDB.EventMapper do
  @moduledoc false

  alias Commanded.EventStore.RecordedEvent
  alias Commanded.EventStore.TypeProvider
  alias Commanded.EventStore.Adapters.EventSourcingDB.StreamMapper
  alias EventSourcingDB.Event

  @commanded_metadata_key "__commanded_metadata__"

  def to_recorded_event(
        %Event{} = event,
        stream_version \\ 0,
        stream_prefix \\ "",
        event_number \\ nil
      ) do
    {data, correlation_id, causation_id, metadata} =
      extract_commanded_metadata(event.data, event.type)

    %RecordedEvent{
      # event_id is a unique identifier for the event. See generate_event_id()
      # DO NOT CHANGE this field
      event_id: generate_event_id(event),
      # event_number is either a global or subscription based counter
      # DO NOT CHANGE this field
      event_number: get_event_number(event, event_number),
      # Position of event within ONE specific stream
      stream_version: stream_version,
      # The part of the stream_id relevant for commanded
      # DO NOT CHANGE this field
      stream_id: StreamMapper.get_stream_id(event.subject, stream_prefix),
      event_type: event.type,
      data: data,
      correlation_id: correlation_id,
      causation_id: causation_id,
      metadata: metadata,
      created_at: to_date_time(event.time)
    }
  end

  defp get_event_number(%Event{} = _, event_number) when is_integer(event_number),
    do: event_number

  defp get_event_number(%Event{} = event, _), do: to_global_event_number(event)

  @spec to_global_event_number(Event.t()) :: non_neg_integer()
  def to_global_event_number(%Event{} = event), do: String.to_integer(event.id) + 1

  def serialize_event_data(data, correlation_id, causation_id, metadata) do
    commanded_meta = %{
      "correlation_id" => correlation_id,
      "causation_id" => causation_id,
      "metadata" => metadata || %{}
    }

    serialized_data = serialize_data(data)

    Map.put(serialized_data, @commanded_metadata_key, commanded_meta)
  end

  defp extract_commanded_metadata(data, type) do
    {commanded_meta, clean_data} = Map.pop(data, @commanded_metadata_key)

    {correlation_id, causation_id, metadata} =
      if commanded_meta do
        {
          Map.get(commanded_meta, "correlation_id"),
          Map.get(commanded_meta, "causation_id"),
          Map.get(commanded_meta, "metadata", %{})
        }
      else
        {nil, nil, %{}}
      end

    deserialized = deserialize_data(clean_data, type)

    {deserialized, correlation_id, causation_id, metadata}
  end

  defp deserialize_data(data, type) do
    struct_template = TypeProvider.to_struct(type)

    case struct_template do
      %module{} ->
        atom_keyed_data = to_atom_keys(data)
        struct(module, atom_keyed_data)

      _ ->
        data
    end
  rescue
    _ -> data
  end

  defp to_atom_keys(map) when is_map(map) do
    Map.new(map, fn {key, val} ->
      atom_key =
        case is_binary(key) do
          true -> String.to_existing_atom(key)
          false -> key
        end

      {atom_key, val}
    end)
  end

  defp to_atom_keys(other), do: other

  defp serialize_data(%_{} = struct), do: Map.from_struct(struct)
  defp serialize_data(data), do: data

  defp to_date_time(iso8601_datestring) do
    {:ok, datetime, _offset} = DateTime.from_iso8601(iso8601_datestring)
    datetime
  end

  # DO NOT CHANGE this function (both variants of it)

  # Generate the event ID
  #
  # ESDB/CloudEvents claim that source + id is a unique event id:
  # https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#source-1
  # We use that concatenation for a unique event id
  #
  # However, the commanded tests explicitely check for a UUID. Likely the tests
  # were written with no other option in mind.
  # So for tests, we hack that in.
  @compile if: Mix.env() == :test
  defp generate_event_id(%Event{} = _event), do: Commanded.UUID.uuid4()

  @compile if: Mix.env() != :test
  defp generate_event_id(%Event{} = event), do: "#{event.source}/#{event.id}"
end