lib/application.ex

defmodule Commanded.Application do
  use TelemetryRegistry

  # Telemetry event declarations, made through `telemetry_event/1` (available
  # after `use TelemetryRegistry` above). The moduledoc below interpolates
  # `telemetry_docs()` under its "Telemetry" heading, so these declarations
  # must appear before `@moduledoc`.

  # Dispatch start event: carries a `system_time` measurement plus the
  # application and execution context as metadata.
  telemetry_event(%{
    event: [:commanded, :application, :dispatch, :start],
    description: "Emitted when an application starts dispatching a command",
    measurements: "%{system_time: integer()}",
    metadata: """
    %{application: Commanded.Application.t(),
      execution_context: Commanded.Aggregates.ExecutionContext.t()}
    """
  })

  # Dispatch stop event: carries a `duration` measurement; its metadata
  # additionally includes an `error` entry (`nil | any()`).
  telemetry_event(%{
    event: [:commanded, :application, :dispatch, :stop],
    description: "Emitted when an application stops dispatching a command",
    measurements: "%{duration: non_neg_integer()}",
    metadata: """
    %{application: Commanded.Application.t(),
      execution_context: Commanded.Aggregates.ExecutionContext.t(),
      error: nil | any()}
    """
  })

  @moduledoc """
  Defines a Commanded application.

  The application expects at least an `:otp_app` option to be specified. It
  should point to an OTP application that has the application configuration.

  For example, the application:

      defmodule MyApp.Application do
        use Commanded.Application, otp_app: :my_app

        router(MyApp.Router)
      end

  Could be configured with:

      # config/config.exs
      config :my_app, MyApp.Application,
        event_store: [
          adapter: Commanded.EventStore.Adapters.EventStore,
          event_store: MyApp.EventStore
        ],
        pubsub: :local,
        registry: :local

  Alternatively, you can include the configuration when defining the
  application:

      defmodule MyApp.Application do
        use Commanded.Application,
          otp_app: :my_app,
          event_store: [
            adapter: Commanded.EventStore.Adapters.EventStore,
            event_store: MyApp.EventStore
          ],
          pubsub: :local,
          registry: :local

        router(MyApp.Router)
      end

  A Commanded application must be started before it can be used:

      {:ok, _pid} = MyApp.Application.start_link()

  Instead of starting the application manually, you should use a
  [Supervisor](supervision.html).

  ## Supervision

  Use a supervisor to start your Commanded application:

      Supervisor.start_link([
        MyApp.Application
      ], strategy: :one_for_one)

  ## Command routing

  Commanded applications are also composite routers allowing you to include
  one or more routers within an application.

  ### Example

      defmodule MyApp.Application do
        use Commanded.Application, otp_app: :my_app

        router(MyApp.Accounts.Router)
        router(MyApp.Billing.Router)
        router(MyApp.Notifications.Router)
      end

  See `Commanded.Commands.CompositeRouter` for details.

  ## Command dispatch

  Once a router has been configured you can dispatch a command via the
  application:

      :ok = MyApp.Application.dispatch(command, opts)

  See `c:dispatch/1` and `c:dispatch/2` for details.

  ## Dynamic named applications

  An application can be provided with a name as an option to `start_link/1`.
  This can be used to start the same application multiple times, each using its
  own separately configured and isolated event store. Each application must be
  started with a unique name.

  Multiple instances of the same event handler or process manager can be
  started by referring to a started application by its name. The event store
  operations can also be scoped to an application by referring to its name.

  ### Example

  Start an application process for each tenant in a multi-tenanted app,
  guaranteeing that the data and processing remains isolated between tenants.

      for tenant <- [:tenant1, :tenant2, :tenant3] do
        {:ok, _app} = MyApp.Application.start_link(name: tenant)
      end

  Typically you would start the applications using a supervisor:

      children =
        for tenant <- [:tenant1, :tenant2, :tenant3] do
          {MyApp.Application, name: tenant}
        end

      Supervisor.start_link(children, strategy: :one_for_one)

  To dispatch a command you must provide the application name:

      :ok = MyApp.Application.dispatch(command, application: :tenant1)

  ## Default dispatch options

  An application can be configured with default command dispatch options such as
  `:consistency`, `:timeout`, and `:returning`. Any defaults will be used
  unless overridden by options provided to the dispatch function.

      defmodule MyApp.Application do
        use Commanded.Application,
          otp_app: :my_app,
          default_dispatch_opts: [
            consistency: :eventual,
            returning: :aggregate_version
          ]
      end

  See the `Commanded.Commands.Router` module for more details about the
  supported options.

  ## Telemetry

  #{telemetry_docs()}

  """

  @type t :: module

  # Injects the application implementation into the module that calls
  # `use Commanded.Application, opts`.
  #
  # The quoted code:
  #
  #   * declares `@behaviour Commanded.Application`;
  #   * stores the `{otp_app, config}` pair returned by
  #     `Commanded.Application.Supervisor.compile_config/2` in the `@otp_app`
  #     and `@config` module attributes;
  #   * uses `Commanded.Commands.CompositeRouter`, passing the using module as
  #     `:application` and the `:default_dispatch_opts` option (default `[]`);
  #   * defines `config/0`, `child_spec/1`, `start_link/1`, `stop/2`,
  #     `aggregate_state/3` and the private `name/1` helper.
  @doc false
  defmacro __using__(opts) do
    # `bind_quoted` makes `opts` available as a plain variable inside the
    # quoted code.
    quote bind_quoted: [opts: opts] do
      @behaviour Commanded.Application

      {otp_app, config} = Commanded.Application.Supervisor.compile_config(__MODULE__, opts)

      # Kept as module attributes so the functions defined below can refer to
      # them.
      @otp_app otp_app
      @config config

      use Commanded.Commands.CompositeRouter,
        application: __MODULE__,
        default_dispatch_opts: Keyword.get(opts, :default_dispatch_opts, [])

      # Returns the configuration produced by `runtime_config/4`. The match
      # asserts an `{:ok, config}` result; anything else raises a `MatchError`.
      def config do
        {:ok, config} =
          Commanded.Application.Supervisor.runtime_config(__MODULE__, @otp_app, @config, [])

        config
      end

      # Supervisor child specification. The child id is the resolved
      # application name, so the id follows the `:name` option when one is
      # given.
      def child_spec(opts) do
        %{
          id: name(opts),
          start: {__MODULE__, :start_link, [opts]},
          type: :supervisor
        }
      end

      # Starts the application supervisor under the name resolved from `opts`
      # (see `name/1`). `opts` defaults to `[]`.
      def start_link(opts \\ []) do
        name = name(opts)

        Commanded.Application.Supervisor.start_link(__MODULE__, @otp_app, @config, name, opts)
      end

      # Stops the given supervisor with reason `:normal`; `timeout` defaults
      # to 5000.
      def stop(pid, timeout \\ 5000) do
        Supervisor.stop(pid, :normal, timeout)
      end

      # Delegates to `Commanded.Aggregates.Aggregate.aggregate_state/4`, always
      # passing the using module (`__MODULE__`) as the application. `timeout`
      # defaults to 5000.
      def aggregate_state(aggregate_module, aggregate_uuid, timeout \\ 5000) do
        Commanded.Aggregates.Aggregate.aggregate_state(
          __MODULE__,
          aggregate_module,
          aggregate_uuid,
          timeout
        )
      end

      # Resolves the application name from the `:name` option: falls back to
      # the using module when the option is absent or `nil`, accepts any other
      # atom as-is, and raises `ArgumentError` for non-atom values.
      defp name(opts) do
        case Keyword.get(opts, :name) do
          nil ->
            __MODULE__

          name when is_atom(name) ->
            name

          invalid ->
            raise ArgumentError,
              message:
                "expected :name option to be an atom but got: " <>
                  inspect(invalid)
        end
      end
    end
  end

  ## User callbacks

  @optional_callbacks init: 1

  @doc """
  A callback executed when the application starts.

  It must return `{:ok, keyword}` with the updated list of configuration.
  """
  @callback init(config :: Keyword.t()) :: {:ok, Keyword.t()}

  @doc """
  Returns the application configuration stored in the `:otp_app` environment.
  """
  @callback config() :: Keyword.t()

  @doc """
  Starts the application supervisor.

  Returns `{:ok, pid}` on success, `{:error, {:already_started, pid}}` if the
  application is already started, or `{:error, term}` in case anything else goes
  wrong.
  """
  @callback start_link(opts :: Keyword.t()) ::
              {:ok, pid}
              | {:error, {:already_started, pid}}
              | {:error, term}

  @doc """
  Shuts down the application.

  `pid` is the process to stop and `timeout` is how long to wait for it to
  terminate. The implementation injected by `use Commanded.Application` calls
  `Supervisor.stop/3` with the `:normal` reason and defaults `timeout` to
  `5000`.
  """
  @callback stop(pid, timeout) :: :ok

  @doc """
  Dispatch a registered command.

    - `command` is a command struct which must be registered with a
      `Commanded.Commands.Router` and included in the application.
      
  """
  @callback dispatch(command :: struct()) ::
              :ok
              | {:ok, aggregate_state :: struct()}
              | {:ok, aggregate_version :: non_neg_integer()}
              | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()}
              | {:error, :unregistered_command}
              | {:error, :consistency_timeout}
              | {:error, reason :: term()}

  @doc """
  Dispatch a registered command.

    - `command` is a command struct which must be registered with a
      `Commanded.Commands.Router` and included in the application.

    - `timeout_or_opts` is either an integer timeout or a keyword list of
      options.

      The timeout must be an integer greater than zero which specifies how many
      milliseconds to allow the command to be handled, or the atom `:infinity`
      to wait indefinitely. The default timeout value is five seconds.

      Alternatively, an options keyword list can be provided. It supports the
      following options.

      Options:

        - `causation_id` - an optional UUID used to identify the cause of the
          command being dispatched.

        - `correlation_id` - an optional UUID used to correlate related
          commands/events together.

        - `consistency` - one of `:eventual` (default) or `:strong`. By
          setting the consistency to `:strong` a successful command dispatch
          will block until all strongly consistent event handlers and process
          managers have handled all events created by the command.

        - `metadata` - an optional map containing key/value pairs comprising
          the metadata to be associated with all events created by the
          command.

        - `returning` - to choose what response is returned from a successful
            command dispatch. The default is to return an `:ok`.

            The available options are:

            - `:aggregate_state` - to return the updated aggregate state in the
              successful response: `{:ok, aggregate_state}`.

            - `:aggregate_version` - to include the aggregate stream version
              in the successful response: `{:ok, aggregate_version}`.

            - `:execution_result` - to return a `Commanded.Commands.ExecutionResult`
              struct containing the aggregate's identity, version, and any
              events produced from the command along with their associated
              metadata.

            - `false` - don't return anything except an `:ok`.

        - `timeout` - as described above.

  Returns `:ok` on success unless the `:returning` option is specified where
  it returns one of `{:ok, aggregate_state}`, `{:ok, aggregate_version}`, or
  `{:ok, %Commanded.Commands.ExecutionResult{}}`.

  Returns `{:error, reason}` on failure.

  ## Example

      command = %OpenAccount{account_number: "ACC123", initial_balance: 1_000}

      :ok = BankApp.dispatch(command, timeout: 30_000)

  """
  @callback dispatch(
              command :: struct(),
              timeout_or_opts :: non_neg_integer() | :infinity | Keyword.t()
            ) ::
              :ok
              | {:ok, aggregate_state :: struct()}
              | {:ok, aggregate_version :: non_neg_integer()}
              | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()}
              | {:error, :unregistered_command}
              | {:error, :consistency_timeout}
              | {:error, reason :: term()}

  alias Commanded.Application.Config

  # Dispatches `command` through the application identified by `application`.
  #
  # The third argument may be an integer timeout, the atom `:infinity`, or a
  # keyword list of options (default `[]`). A bare timeout is first normalised
  # to `[timeout: value]`. The `:application` option is then set to
  # `application`, replacing any value supplied by the caller, and the call is
  # delegated to `dispatch/2` on the module returned by `application_module/1`.
  @doc false
  def dispatch(application, command, opts \\ [])

  def dispatch(application, command, timeout)
      when is_integer(timeout) or timeout == :infinity do
    dispatch(application, command, timeout: timeout)
  end

  def dispatch(application, command, opts) do
    dispatch_opts = Keyword.put(opts, :application, application)
    module = application_module(application)

    module.dispatch(command, dispatch_opts)
  end

  # Looks up the `:application` entry held in `Config` for `application`;
  # `dispatch/3` delegates to the module returned here.
  @doc false
  def application_module(application) do
    Config.get(application, :application)
  end

  # Looks up the `:event_store` entry held in `Config` for `application`.
  # The spec declares the result as a `{module, map}` tuple.
  @doc false
  @spec event_store_adapter(Commanded.Application.t()) :: {module, map}
  def event_store_adapter(application) do
    Config.get(application, :event_store)
  end

  # Looks up the `:pubsub` entry held in `Config` for `application`.
  # The spec declares the result as a `{module, map}` tuple.
  @doc false
  @spec pubsub_adapter(Commanded.Application.t()) :: {module, map}
  def pubsub_adapter(application) do
    Config.get(application, :pubsub)
  end

  # Looks up the `:registry` entry held in `Config` for `application`.
  # The spec declares the result as a `{module, map}` tuple.
  @doc false
  @spec registry_adapter(Commanded.Application.t()) :: {module, map}
  def registry_adapter(application) do
    Config.get(application, :registry)
  end
end