lib/phoenix_micro.ex

defmodule PhoenixMicro do
  @moduledoc """
  PhoenixMicro — a production-grade microservices toolkit for Elixir/Phoenix.

  Phoenix Microservices, built natively for OTP and the BEAM VM.

  ## Quick start

      # 1. Configure in config/config.exs
      config :phoenix_micro,
        transport: :rabbitmq,
        consumers: [MyApp.Payments.CreatedConsumer],
        transports: [
          rabbitmq: [url: "amqp://localhost"]
        ]

      # 2. Define a consumer
      defmodule MyApp.Payments.CreatedConsumer do
        use PhoenixMicro.Consumer

        topic "payments.created"
        concurrency 5
        retry max_attempts: 3

        @impl PhoenixMicro.Consumer
        def handle(message, _ctx) do
          Logger.info("Received payload: \#{inspect(message.payload)}\")
          :ok
        end
      end

      # 3. Publish from anywhere
      PhoenixMicro.publish("payments.created", %{amount: 100, currency: "USD"})

      # 4. RPC — two calling conventions
      {:ok, result} = PhoenixMicro.rpc("math.sum", [1, 2, 3])
      {:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3])

  ## Architecture

      PhoenixMicro.Application
        └── PhoenixMicro.Supervisor (one_for_one)
              ├── Registry
              ├── Transport.Memory
              ├── Transport.* (configured)
              ├── Producer
              ├── RPC
              └── ConsumerManager (DynamicSupervisor)
                    └── Pipeline (Broadway) per consumer
  """

  alias PhoenixMicro.{Config, Producer, RPC}
  alias PhoenixMicro.Supervisor.ConsumerManager

  # ---------------------------------------------------------------------------
  # Publishing
  # ---------------------------------------------------------------------------

  @doc """
  Publishes a message to `topic` asynchronously (fire-and-forget).

  Options: `:transport`, `:headers`, `:correlation_id`.
  """
  @spec publish(String.t(), term(), keyword()) :: :ok
  defdelegate publish(topic, payload, opts \\ []), to: Producer

  @doc """
  Publishes to `topic` synchronously. Returns `:ok` or `{:error, reason}`.
  """
  @spec publish_sync(String.t(), term(), keyword()) :: :ok | {:error, term()}
  defdelegate publish_sync(topic, payload, opts \\ []), to: Producer

  @doc """
  Publishes a batch of `[{topic, payload}]` tuples.
  """
  @spec publish_batch([{String.t(), term()}], keyword()) :: :ok
  defdelegate publish_batch(messages, opts \\ []), to: Producer

  # ---------------------------------------------------------------------------
  # RPC — supports both rpc(topic, payload, opts) and rpc(service, pattern, payload)
  # ---------------------------------------------------------------------------

  @doc """
  Synchronous RPC call. Supports two signatures:

      # topic + payload form
      {:ok, result} = PhoenixMicro.rpc("math.sum", [1, 2, 3])

      # service + pattern + payload form (spec-compliant)
      {:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3])

      # with options
      {:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3], timeout: 3_000)

  Options: `:timeout` (ms, default 5000), `:retry` (attempts, default 0).
  """
  @spec rpc(String.t(), term(), keyword() | term()) :: {:ok, term()} | {:error, term()}
  def rpc(topic, payload, opts \\ [])

  def rpc(topic, payload, opts) when is_binary(topic) and is_list(opts) do
    RPC.call(topic, payload, opts)
  end

  # rpc(service, pattern, payload) — pattern is binary, payload is not opts list
  def rpc(service, pattern, payload)
      when is_binary(service) and is_binary(pattern) do
    RPC.call("#{service}.#{pattern}", payload, [])
  end

  @doc """
  4-argument RPC: `rpc(service, pattern, payload, opts)`.

      {:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3], timeout: 3_000)
  """
  @spec rpc(String.t(), String.t(), term(), keyword()) :: {:ok, term()} | {:error, term()}
  def rpc(service, pattern, payload, opts)
      when is_binary(service) and is_binary(pattern) and is_list(opts) do
    RPC.call("#{service}.#{pattern}", payload, opts)
  end

  # ---------------------------------------------------------------------------
  # Consumer management
  # ---------------------------------------------------------------------------

  @doc "Dynamically registers and starts a consumer module."
  @spec register_consumer(module()) :: {:ok, pid()} | {:error, term()}
  defdelegate register_consumer(module), to: ConsumerManager, as: :start_consumer

  @doc "Stops a running consumer."
  @spec deregister_consumer(module()) :: :ok | {:error, :not_found}
  defdelegate deregister_consumer(module), to: ConsumerManager, as: :stop_consumer

  @doc "Returns all currently running consumer modules."
  @spec consumers() :: [module()]
  defdelegate consumers(), to: ConsumerManager, as: :running_consumers

  @doc "Returns the active transport module."
  @spec transport() :: module()
  def transport, do: Config.active_transport()
end