lib/chronicle/projections/registrar.ex

# Copyright (c) Cratis. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.

defmodule Chronicle.Projections.Registrar do
  @moduledoc false

  # GenServer that ensures the event store and namespace exist, then registers
  # event types and model-bound projections (defined via `use Chronicle.ReadModel`
  # with `from/2`, `join/2`, etc.) with Chronicle on startup.

  use GenServer, restart: :permanent

  require Logger

  alias Chronicle.Connections.Connection
  alias Chronicle.EventTypes

  alias Cratis.Chronicle.Contracts.{EventStores, Namespaces, EnsureEventStore, EnsureNamespace}

  alias Cratis.Chronicle.Contracts.Projections.{
    Projections,
    RegisterRequest,
    ProjectionDefinition,
    FromDefinition,
    JoinDefinition,
    RemovedWithDefinition,
    FromEveryDefinition,
    KeyValuePair_EventType_FromDefinition,
    KeyValuePair_EventType_JoinDefinition,
    KeyValuePair_EventType_RemovedWithDefinition
  }

  alias Cratis.Chronicle.Contracts.Projections.EventType, as: ProtoEventType

  alias Cratis.Chronicle.Contracts.ReadModels.{
    ReadModels,
    RegisterManyRequest,
    ReadModelDefinition,
    ReadModelType,
    SinkDefinition
  }

  alias Bcl.Guid, as: BclGuid

  # MongoDB sink type ID: "22202c41-2be1-4547-9c00-f0b1f797fd75"
  # Computed from .NET Guid.ToByteArray() split into lo/hi fixed64 little-endian
  @mongodb_sink_type_id %BclGuid{lo: 0x45472BE122202C41, hi: 0x75FD97F7B1F0009C}

  @retry_delay 5_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(opts) do
    state = %{
      connection: Keyword.fetch!(opts, :connection),
      event_store: Keyword.fetch!(opts, :event_store),
      namespace: Keyword.get(opts, :namespace, "Default"),
      event_types: Keyword.get(opts, :event_types, []),
      read_models: Keyword.get(opts, :read_models, []),
      reducers: Keyword.get(opts, :reducers, [])
    }

    send(self(), :register)
    {:ok, state}
  end

  @impl true
  def handle_info(:register, state) do
    case Connection.channel(state.connection) do
      {:ok, channel} ->
        case register_all(channel, state) do
          :ok ->
            {:noreply, state}

          {:error, reason} ->
            Logger.warning("Chronicle registration failed: #{inspect(reason)}, retrying in #{@retry_delay}ms")
            Process.send_after(self(), :register, @retry_delay)
            {:noreply, state}
        end

      {:error, _} ->
        Process.send_after(self(), :register, @retry_delay)
        {:noreply, state}
    end
  end

  def handle_info(_msg, state), do: {:noreply, state}

  defp register_all(channel, state) do
    # Collect event types from projection read models
    read_model_event_types =
      state.read_models
      |> Enum.flat_map(fn rm ->
        (rm.__chronicle_read_model__(:from) |> Enum.map(&elem(&1, 0))) ++
          (rm.__chronicle_read_model__(:join) |> Enum.map(&elem(&1, 0))) ++
          (rm.__chronicle_read_model__(:removed_with) |> Enum.map(&elem(&1, 0)))
      end)

    # Also collect event types from reducers
    reducer_event_types =
      state.reducers
      |> Enum.flat_map(fn r -> r.__chronicle_reducer__(:handles) end)

    all_event_types = Enum.uniq(state.event_types ++ read_model_event_types ++ reducer_event_types)

    with :ok <- ensure_event_store(channel, state.event_store),
         :ok <- ensure_namespace(channel, state.event_store, state.namespace),
         :ok <- EventTypes.register(channel, state.event_store, all_event_types),
         :ok <- register_read_models(channel, state),
         :ok <- register_projections(channel, state) do
      :ok
    end
  end

  defp ensure_event_store(channel, event_store) do
    case EventStores.Stub.ensure(channel, %EnsureEventStore{Name: event_store}) do
      {:ok, _} -> :ok
      {:error, reason} -> {:error, {:ensure_event_store, reason}}
    end
  end

  defp ensure_namespace(channel, event_store, namespace) do
    case Namespaces.Stub.ensure(channel, %EnsureNamespace{EventStore: event_store, Name: namespace}) do
      {:ok, _} -> :ok
      {:error, reason} -> {:error, {:ensure_namespace, reason}}
    end
  end

  defp register_read_models(channel, state) do
    projection_definitions =
      state.read_models
      |> Enum.filter(& &1.__chronicle_read_model__(:has_projection?))
      |> Enum.map(fn rm ->
        model_id = rm.__chronicle_read_model__(:id)

        %ReadModelDefinition{
          Type: %ReadModelType{Identifier: model_id, Generation: 1},
          ContainerName: model_id,
          DisplayName: model_id,
          Sink: %SinkDefinition{ConfigurationId: %BclGuid{}, TypeId: @mongodb_sink_type_id},
          Schema: generate_read_model_schema(rm),
          ObserverType: 2,
          ObserverIdentifier: model_id,
          Owner: 2,
          Source: 1
        }
      end)

    reducer_definitions =
      state.reducers
      |> Enum.map(fn reducer_module ->
        model_module = reducer_module.__chronicle_reducer__(:model)
        model_id = model_module.__chronicle_read_model__(:id)
        reducer_id = reducer_module.__chronicle_reducer__(:id)

        %ReadModelDefinition{
          Type: %ReadModelType{Identifier: model_id, Generation: 1},
          ContainerName: model_id,
          DisplayName: model_id,
          Sink: %SinkDefinition{ConfigurationId: %BclGuid{}, TypeId: @mongodb_sink_type_id},
          Schema: generate_read_model_schema(model_module),
          ObserverType: 1,
          ObserverIdentifier: reducer_id,
          Owner: 2,
          Source: 1
        }
      end)

    all_definitions = projection_definitions ++ reducer_definitions

    if Enum.empty?(all_definitions) do
      :ok
    else
      request = %RegisterManyRequest{
        EventStore: state.event_store,
        Owner: 2,
        ReadModels: all_definitions,
        Source: 1
      }

      case ReadModels.Stub.register_many(channel, request) do
        {:ok, _} -> :ok
        {:error, reason} -> {:error, {:register_read_models, reason}}
      end
    end
  end

  # Builds the initial model state JSON from struct defaults.
  # Numeric/boolean defaults (like balance: 0) must be non-null in the JSON so that
  # Chronicle's PerformAdd/PerformSubtract receive 0.0 instead of null — Convert.ChangeType
  # throws for null double (a non-nullable CLR value type).
  defp initial_model_state(module) do
    if function_exported?(module, :__struct__, 0) do
      defaults =
        module.__struct__()
        |> Map.to_list()
        |> Enum.reject(fn {k, _} -> k == :__struct__ end)
        |> Map.new(fn {k, v} -> {Atom.to_string(k), v} end)

      Jason.encode!(defaults)
    else
      "{}"
    end
  end

  defp generate_read_model_schema(module) do
    fields =
      if function_exported?(module, :__struct__, 0) do
        module.__struct__()
        |> Map.to_list()
        |> Enum.reject(fn {k, _} -> k == :__struct__ end)
        |> Enum.map(fn {key, default_val} ->
          {Atom.to_string(key), read_model_property_schema(default_val)}
        end)
        |> Map.new()
      else
        %{}
      end

    Jason.encode!(%{"type" => "object", "properties" => fields})
  end

  defp read_model_property_schema(v) when is_integer(v), do: %{"type" => "number"}
  defp read_model_property_schema(v) when is_float(v), do: %{"type" => "number"}
  defp read_model_property_schema(v) when is_boolean(v), do: %{"type" => "boolean"}
  defp read_model_property_schema(_), do: %{"type" => "string"}

  defp register_projections(_channel, %{read_models: []}), do: :ok

  defp register_projections(channel, state) do
    projection_read_models = Enum.filter(state.read_models, & &1.__chronicle_read_model__(:has_projection?))

    if Enum.empty?(projection_read_models) do
      :ok
    else
      definitions = Enum.map(projection_read_models, &build_projection_definition(&1))

      request = %RegisterRequest{
        EventStore: state.event_store,
        Owner: 1,
        Projections: definitions
      }

      case Projections.Stub.register(channel, request) do
        {:ok, _} ->
          Logger.info("Registered #{length(definitions)} projection(s) with Chronicle")
          :ok

        {:error, reason} ->
          {:error, {:register_projections, reason}}
      end
    end
  end

  defp build_projection_definition(read_model_module) do
    identifier = read_model_module.__chronicle_read_model__(:id)
    model_name = identifier

    from_entries =
      read_model_module.__chronicle_read_model__(:from)
      |> Enum.map(fn {event_module, opts} ->
        properties = build_properties(opts)
        key = Keyword.get(opts, :key, "$eventSourceId")
        parent_key = Keyword.get(opts, :parent_key, "")

        %KeyValuePair_EventType_FromDefinition{
          Key: proto_event_type(event_module),
          Value: %FromDefinition{
            Key: key,
            Properties: properties,
            ParentKey: parent_key
          }
        }
      end)

    join_entries =
      read_model_module.__chronicle_read_model__(:join)
      |> Enum.map(fn {event_module, opts} ->
        properties = build_properties(opts)
        key = Keyword.get(opts, :key, "$eventSourceId")
        on = Keyword.fetch!(opts, :on)

        %KeyValuePair_EventType_JoinDefinition{
          Key: proto_event_type(event_module),
          Value: %JoinDefinition{
            On: on,
            Key: key,
            Properties: properties
          }
        }
      end)

    removed_with_entries =
      read_model_module.__chronicle_read_model__(:removed_with)
      |> Enum.map(fn {event_module, opts} ->
        key = Keyword.get(opts, :key, "$eventSourceId")
        parent_key = Keyword.get(opts, :parent_key, "")

        %KeyValuePair_EventType_RemovedWithDefinition{
          Key: proto_event_type(event_module),
          Value: %RemovedWithDefinition{Key: key, ParentKey: parent_key}
        }
      end)

    from_every =
      case read_model_module.__chronicle_read_model__(:from_every) do
        [] ->
          nil

        [opts | _] ->
          %FromEveryDefinition{
            Properties: build_properties(opts),
            IncludeChildren: Keyword.get(opts, :include_children, false)
          }
      end

    %ProjectionDefinition{
      Identifier: identifier,
      ReadModel: model_name,
      EventSequenceId: "event-log",
      IsActive: true,
      IsRewindable: true,
      From: from_entries,
      Join: join_entries,
      RemovedWith: removed_with_entries,
      All: from_every,
      InitialModelState: initial_model_state(read_model_module)
    }
  end

  # Converts set/add/subtract/count opts into a Chronicle properties map.
  # The keys are read model field names (strings), the values are
  # Chronicle property expressions.
  defp build_properties(opts) do
    set_props =
      opts
      |> Keyword.get(:set, [])
      |> Enum.map(fn {field, expr} ->
        {to_string(field), resolve_expression(expr)}
      end)

    add_props =
      opts
      |> Keyword.get(:add, [])
      |> Enum.map(fn {field, expr} ->
        {to_string(field), "$add(#{resolve_expression(expr)})"}
      end)

    subtract_props =
      opts
      |> Keyword.get(:subtract, [])
      |> Enum.map(fn {field, expr} ->
        {to_string(field), "$subtract(#{resolve_expression(expr)})"}
      end)

    count_fields =
      case Keyword.get(opts, :count) do
        nil -> []
        field when is_atom(field) -> [{to_string(field), "$count"}]
        fields when is_list(fields) -> Enum.map(fields, fn f -> {to_string(f), "$count"} end)
      end

    (set_props ++ add_props ++ subtract_props ++ count_fields) |> Map.new()
  end

  defp resolve_expression(atom) when is_atom(atom) do
    case atom do
      :event_source_id -> "$eventSourceId"
      :occurred -> "$occurred"
      _ -> "$#{atom}"
    end
  end

  defp resolve_expression(int) when is_integer(int), do: "$value(#{int})"
  defp resolve_expression(str) when is_binary(str), do: str

  defp proto_event_type(event_module) do
    %ProtoEventType{
      Id: event_module.__chronicle_event_type__(:id),
      Generation: event_module.__chronicle_event_type__(:generation)
    }
  end
end