# lib/mix/tasks/phoenix_micro.gen.consumer.ex

defmodule Mix.Tasks.PhoenixMicro.Gen.Consumer do
  use Mix.Task

  @shortdoc "Generates a PhoenixMicro consumer module"

  @moduledoc """
  Generates a PhoenixMicro consumer module with the correct DSL boilerplate.

  ## Usage

      mix phoenix_micro.gen.consumer MyApp.Payments.CreatedConsumer payments.created

  Both positional arguments are required: the consumer module name followed
  by the topic it subscribes to. Unknown options and options with a value of
  the wrong type are rejected.

  ## Options

      --concurrency N       Processor pool size (default: 5)
      --batch-size N        Batch size for handle_batch/4 (default: 1)
      --batch-timeout N     Batch flush timeout in ms (default: 1000)
      --retry N             Max retry attempts (default: 3)
      --transport NAME      Transport to use: rabbitmq|kafka|nats|redis_streams|memory
      --queue-group NAME    Queue group / consumer group name
      --dlq TOPIC           Dead-letter topic (default: dlq.<topic>)
      --middleware MOD,...  Comma-separated middleware modules
      --no-middleware       Skip default middleware

  `-c` is an alias for `--concurrency` and `-t` for `--transport`. The batch
  settings are only written to the generated module when the batch size is
  greater than 1.

  ## Example

      mix phoenix_micro.gen.consumer MyApp.Orders.PlacedConsumer orders.placed \\
        --concurrency 10 \\
        --retry 5 \\
        --transport rabbitmq

  Generates `lib/my_app/orders/placed_consumer.ex` and a matching
  test file `test/my_app/orders/placed_consumer_test.exs`.
  """

  @switches [
    concurrency: :integer,
    batch_size: :integer,
    batch_timeout: :integer,
    retry: :integer,
    transport: :string,
    queue_group: :string,
    dlq: :string,
    middleware: :string,
    no_middleware: :boolean
  ]

  @aliases [
    c: :concurrency,
    t: :transport
  ]

  @usage "Usage: mix phoenix_micro.gen.consumer <ModuleName> <topic>"

  @default_middlewares ["PhoenixMicro.Middleware.Logger", "PhoenixMicro.Middleware.Metrics"]

  # A dotted sequence of CamelCase segments, e.g. "MyApp.Orders.PlacedConsumer".
  @module_name_regex ~r/\A[A-Z][A-Za-z0-9_]*(\.[A-Z][A-Za-z0-9_]*)*\z/

  # Anything that can be written as an unquoted atom literal after a colon.
  @transport_regex ~r/\A[A-Za-z_][A-Za-z0-9_@]*[?!]?\z/

  # Task entry point. Parses `argv`, then either generates the consumer and its
  # test file or prints a diagnostic to stderr and exits with status 1.
  @spec run([String.t()]) :: any()
  @impl Mix.Task
  def run(argv) do
    # `strict:` makes unknown switches and badly typed values show up in
    # `invalid` instead of being silently dropped or replaced by defaults.
    {opts, args, invalid} = OptionParser.parse(argv, strict: @switches, aliases: @aliases)

    case {invalid, args} do
      {[_ | _], _args} ->
        flags = Enum.map_join(invalid, ", ", fn {flag, _value} -> flag end)
        abort(["Invalid or unknown option(s): #{flags}", @usage])

      {[], [module_name, topic]} ->
        generate(module_name, topic, opts)

      {[], [_module_name]} ->
        abort(["Missing <topic> argument.", @usage])

      {[], []} ->
        abort([@usage])

      {[], _too_many} ->
        abort(["Too many arguments.", @usage])
    end
  end

  # Validates the inputs, renders both files, writes them and prints a summary
  # listing only the files that were actually written.
  defp generate(module_name, topic, opts) do
    validate_module_name!(module_name)

    transport = Keyword.get(opts, :transport)
    validate_transport!(transport)

    cfg = %{
      concurrency: Keyword.get(opts, :concurrency, 5),
      batch_size: Keyword.get(opts, :batch_size, 1),
      batch_timeout: Keyword.get(opts, :batch_timeout, 1_000),
      retry: Keyword.get(opts, :retry, 3),
      transport: transport,
      queue_group: Keyword.get(opts, :queue_group),
      dlq: Keyword.get(opts, :dlq, "dlq.#{topic}"),
      middlewares: parse_middlewares(opts)
    }

    files = [
      {module_to_path(module_name), render_consumer(module_name, topic, cfg)},
      {module_to_test_path(module_name), render_consumer_test(module_name, topic)}
    ]

    written =
      Enum.flat_map(files, fn {path, content} ->
        case create_file(path, content) do
          :skipped -> []
          _written -> [path]
        end
      end)

    print_summary(module_name, written)
  end

  # The module being generated does not exist yet, so its name can only be
  # checked syntactically (an existing-atom lookup would always fail).
  defp validate_module_name!(module_name) do
    if Regex.match?(@module_name_regex, module_name) do
      :ok
    else
      abort(["Invalid module name: #{inspect(module_name)}", @usage])
    end
  end

  # The transport is written into the generated module as an atom literal, so
  # it has to be spellable as one.
  defp validate_transport!(nil), do: :ok

  defp validate_transport!(transport) do
    if Regex.match?(@transport_regex, transport) do
      :ok
    else
      abort(["Invalid transport name: #{inspect(transport)}", @usage])
    end
  end

  # Resolves the middleware list: empty with --no-middleware, the defaults when
  # --middleware is absent, otherwise the comma-separated names with blanks
  # (e.g. from a trailing comma) removed.
  defp parse_middlewares(opts) do
    if Keyword.get(opts, :no_middleware, false) do
      []
    else
      case Keyword.get(opts, :middleware) do
        nil ->
          @default_middlewares

        csv ->
          csv
          |> String.split(",")
          |> Enum.map(&String.trim/1)
          |> Enum.reject(&(&1 == ""))
      end
    end
  end

  # Prints each line to stderr and terminates the task with exit status 1.
  @spec abort([String.t()]) :: no_return()
  defp abort(lines) do
    Enum.each(lines, &IO.puts(:stderr, &1))
    exit({:shutdown, 1})
  end

  defp print_summary(_module_name, []) do
    IO.puts("\nNo files were written.")
  end

  defp print_summary(module_name, written) do
    file_list = Enum.map_join(written, "\n", &("  " <> &1))

    IO.puts("""

    Consumer generated successfully!

    Files created:
    #{file_list}

    Add to your config:

      config :phoenix_micro,
        consumers: [#{module_name}]

    Or register at runtime:

      PhoenixMicro.register_consumer(#{module_name})
    """)
  end

  # Renders the consumer module source. Each optional DSL fragment carries its
  # own two-space indent and trailing newline so it can be omitted cleanly.
  defp render_consumer(module_name, topic, cfg) do
    middleware_str =
      case cfg.middlewares do
        [] ->
          "  middleware []\n"

        mods ->
          mod_list = Enum.map_join(mods, ",\n", &"    #{&1}")
          "  middleware [\n#{mod_list}\n  ]\n"
      end

    transport_str = if cfg.transport, do: "  transport :#{cfg.transport}\n", else: ""

    # inspect/1 produces a correctly escaped string literal.
    queue_group_str =
      if cfg.queue_group, do: "  queue_group #{inspect(cfg.queue_group)}\n", else: ""

    batch_str =
      if cfg.batch_size > 1 do
        "  batch_size #{cfg.batch_size}\n  batch_timeout #{cfg.batch_timeout}\n"
      else
        ""
      end

    """
    defmodule #{module_name} do
      @moduledoc \"\"\"
      Consumer for `#{topic}` messages.

      Generated by `mix phoenix_micro.gen.consumer`.
      \"\"\"

      use PhoenixMicro.Consumer

      topic #{inspect(topic)}
      concurrency #{cfg.concurrency}
      retry max_attempts: #{cfg.retry}, base_delay: 500, max_delay: 30_000, jitter: true
    #{middleware_str}#{transport_str}#{queue_group_str}#{batch_str}  dead_letter_topic #{inspect(cfg.dlq)}

      @impl PhoenixMicro.Consumer
      def handle(%PhoenixMicro.Message{} = message, _context) do
        # TODO: implement message handling
        # message.payload contains the decoded message body
        # Return :ok to ack, {:error, reason} to retry, :nack to DLQ immediately
        _message = message
        :ok
      end

      @impl PhoenixMicro.Consumer
      def handle_error(message, _error, _context) do
        # Optional: customise retry vs DLQ decision
        # {:retry, message} | :nack | :ok
        {:retry, message}
      end
    end
    """
  end

  # Renders the ExUnit test module that accompanies the generated consumer.
  defp render_consumer_test(module_name, topic) do
    topic_literal = inspect(topic)

    """
    defmodule #{module_name}Test do
      use ExUnit.Case, async: true

      alias PhoenixMicro.{Consumer, Message}

      @consumer #{module_name}

      describe "handle/2" do
        test "returns :ok for a valid message" do
          message = Message.new(#{topic_literal}, %{"key" => "value"})
          context = %{transport: :memory, topic: #{topic_literal}, attempt: 1}

          assert :ok = Consumer.dispatch(@consumer, message, context)
        end

        test "consumer config is correct" do
          cfg = @consumer.__consumer_config__()
          assert cfg.topic == #{topic_literal}
          assert is_integer(cfg.concurrency)
          assert is_list(cfg.retry_opts)
        end
      end
    end
    """
  end

  # Macro.underscore/1 already maps "." to "/", so "MyApp.Orders.PlacedConsumer"
  # becomes "my_app/orders/placed_consumer".
  defp module_to_path(module_name) do
    "lib/#{Macro.underscore(module_name)}.ex"
  end

  defp module_to_test_path(module_name) do
    "test/#{Macro.underscore(module_name)}_test.exs"
  end

  # Writes `content` to `path`, creating parent directories as needed. An
  # existing file is only replaced after confirmation. Returns :created,
  # :overwritten or :skipped.
  defp create_file(path, content) do
    path |> Path.dirname() |> File.mkdir_p!()

    cond do
      not File.exists?(path) ->
        File.write!(path, content)
        IO.puts("  * create #{path}")
        :created

      confirm_overwrite?(path) ->
        File.write!(path, content)
        IO.puts("  * overwrite #{path}")
        :overwritten

      true ->
        IO.puts("  * skip #{path}")
        :skipped
    end
  end

  # IO.gets/1 returns :eof or {:error, reason} when stdin is closed or
  # unreadable (e.g. in CI); treat anything but a "y..." answer as "no".
  defp confirm_overwrite?(path) do
    case IO.gets("#{path} already exists. Overwrite? [y/N] ") do
      answer when is_binary(answer) -> answer =~ ~r/^y/i
      _eof_or_error -> false
    end
  end
end