lib/chronicle/client.ex

# Copyright (c) Cratis. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.

defmodule Chronicle.Client do
  @moduledoc """
  Supervisor that manages a Chronicle connection and all registered observers.

  `Chronicle.Client` is the main entry point for the Chronicle Elixir client.
  Start it in your application's supervision tree, providing a connection
  string and the list of event types, reactors, reducers, and read models to
  register.

  ## Usage

  In your `Application.start/2`:

      def start(_type, _args) do
        children = [
          {Chronicle.Client,
            connection_string: "chronicle://localhost:35000?disableTls=true",
            event_store: "my-store",
            event_types: [
              MyApp.Events.AccountOpened,
              MyApp.Events.FundsDeposited
            ],
            reactors: [MyApp.Reactors.NotificationReactor],
            reducers: [MyApp.Reducers.AccountReducer],
            read_models: [MyApp.ReadModels.Account]}
        ]

        Supervisor.start_link(children, strategy: :one_for_one)
      end

  Read models that contain `from/2`, `join/2`, or `removed_with/2` declarations
  are automatically registered as server-side projections.

  ## Multiple clients

  You can start multiple clients with different names:

      {Chronicle.Client,
        name: :bank_chronicle,
        connection_string: "chronicle://bank-server:35000",
        event_store: "bank"}

      {Chronicle.Client,
        name: :crm_chronicle,
        connection_string: "chronicle://crm-server:35000",
        event_store: "crm"}

  ## Options

    * `:name` — registered name for this client. Defaults to `Chronicle.Client`.
    * `:connection_string` — a connection string binary or
      `Chronicle.Connections.ConnectionString` struct. Defaults to
      `ConnectionString.default/0` (localhost:35000).
    * `:event_store` — the event store name. Defaults to `"default"`.
    * `:namespace` — the namespace within the event store. Defaults to `"default"`.
    * `:event_types` — list of event type modules to register with the event store.
    * `:reactors` — list of reactor modules to start (each `use Chronicle.Reactor`).
    * `:reducers` — list of reducer modules to start (each `use Chronicle.Reducer`).
    * `:read_models` — list of read model modules (each `use Chronicle.ReadModel`).
      Modules that contain `from/2` declarations are registered as projections.

  ## Convenience functions

  Once started, use `Chronicle.append/3` and `Chronicle.read_model/3` for the
  most common operations, or use the subsystem modules directly:

    * `Chronicle.EventLog` — append and query events
    * `Chronicle.ReadModels` — query read model instances
    * `Chronicle.EventTypes` — manage event type registrations
  """

  use Supervisor

  alias Chronicle.Connections.Connection

  @doc """
  Starts a Chronicle client supervisor linked to the current process.
  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    Supervisor.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Returns the stored configuration for the given client name.

  Used internally by `Chronicle.EventLog`, `Chronicle.ReadModels`, etc.
  """
  @spec config(atom() | pid()) :: map()
  def config(client \\ __MODULE__) do
    :persistent_term.get({__MODULE__, client})
  end

  @impl true
  def init(opts) do
    name = Keyword.get(opts, :name, __MODULE__)
    conn_name = connection_name(name)

    event_store = Keyword.get(opts, :event_store, "default")
    namespace = Keyword.get(opts, :namespace, "Default")
    event_types = Keyword.get(opts, :event_types, [])
    reactors = Keyword.get(opts, :reactors, [])
    reducers = Keyword.get(opts, :reducers, [])
    read_models = Keyword.get(opts, :read_models, [])

    :persistent_term.put({__MODULE__, name}, %{
      connection: conn_name,
      event_store: event_store,
      namespace: namespace
    })

    connection_opts =
      opts
      |> Keyword.take([:connection_string, :server_address, :grpc_options, :retry_attempts,
                       :reconnect_base_delay, :reconnect_max_delay])
      |> Keyword.put(:name, conn_name)

    session_name = :"#{name}.Session"

    observer_opts = [
      connection: conn_name,
      session: session_name,
      event_store: event_store,
      namespace: namespace
    ]

    reactor_children =
      Enum.map(reactors, fn module ->
        {Chronicle.Reactors.Handler, Keyword.put(observer_opts, :module, module)}
      end)

    reducer_children =
      Enum.map(reducers, fn module ->
        {Chronicle.Reducers.Handler, Keyword.put(observer_opts, :module, module)}
      end)

    children =
      [GRPC.Client.Supervisor,
       {Connection, connection_opts},
       {Chronicle.Session, connection: conn_name, client_name: name}] ++
        [{Chronicle.Projections.Registrar,
          Keyword.merge(observer_opts, event_types: event_types, read_models: read_models, reducers: reducers)}] ++
        reactor_children ++
        reducer_children

    Supervisor.init(children, strategy: :one_for_one)
  end

  @doc false
  def connection_name(client_name), do: :"#{client_name}.Connection"
end