defmodule Mix.Tasks.PhoenixMicro.Gen.Consumer do
  use Mix.Task

  @shortdoc "Generates a PhoenixMicro consumer module"

  @moduledoc """
  Generates a PhoenixMicro consumer module with the correct DSL boilerplate,
  plus a matching ExUnit test file.

  ## Usage

      mix phoenix_micro.gen.consumer MyApp.Payments.CreatedConsumer payments.created

  Both the module name and the topic are positional arguments.

  ## Options

      -c, --concurrency N       Processor pool size (default: 5)
          --batch-size N        Batch size for handle_batch/4 (default: 1)
          --batch-timeout N     Batch flush timeout in ms (default: 1000)
          --retry N             Max retry attempts (default: 3)
      -t, --transport NAME      Transport to use: rabbitmq|kafka|nats|redis_streams|memory
          --queue-group NAME    Queue group / consumer group name
          --dlq TOPIC           Dead-letter topic (default: dlq.<topic>)
          --middleware MOD,...  Comma-separated middleware modules
          --no-middleware       Generate an empty middleware list (overrides --middleware)

  The batch options are only written to the generated module when
  `--batch-size` is greater than 1.

  ## Example

      mix phoenix_micro.gen.consumer MyApp.Orders.PlacedConsumer orders.placed \\
        --concurrency 10 \\
        --retry 5 \\
        --transport rabbitmq

  Generates `lib/my_app/orders/placed_consumer.ex` and a matching
  test file `test/my_app/orders/placed_consumer_test.exs`.
  """

  @switches [
    concurrency: :integer,
    batch_size: :integer,
    batch_timeout: :integer,
    retry: :integer,
    transport: :string,
    queue_group: :string,
    dlq: :string,
    middleware: :string,
    no_middleware: :boolean
  ]

  @aliases [
    c: :concurrency,
    t: :transport
  ]

  @usage "Usage: mix phoenix_micro.gen.consumer <ModuleName> <topic> [options]"

  # Middleware written into the consumer when neither --middleware nor
  # --no-middleware is given.
  @default_middlewares ["PhoenixMicro.Middleware.Logger", "PhoenixMicro.Middleware.Metrics"]

  @spec run([String.t()]) :: any()
  @impl Mix.Task
  def run(argv) do
    {opts, args, invalid} = OptionParser.parse(argv, switches: @switches, aliases: @aliases)

    case {invalid, args} do
      {[], [module_name, topic]} ->
        generate(module_name, topic, opts)

      {[], [_module_name]} ->
        abort(["Missing topic argument.", @usage])

      {[], []} ->
        abort([@usage])

      {[], _too_many} ->
        abort(["Too many arguments: expected a module name and a topic.", @usage])

      {invalid, _args} ->
        # Typed switches that failed to parse (e.g. `--concurrency abc`) are
        # reported instead of silently falling back to the defaults.
        messages =
          Enum.map(invalid, fn {flag, value} ->
            "Invalid value #{inspect(value)} for option #{flag}"
          end)

        abort(messages ++ [@usage])
    end
  end

  # Validates the input, renders both files, writes them, and prints a summary
  # of the files that were actually written.
  defp generate(module_name, topic, opts) do
    transport = Keyword.get(opts, :transport)

    cond do
      not valid_module_name?(module_name) ->
        abort([
          "Invalid module name: #{inspect(module_name)}",
          "Expected an alias such as MyApp.Orders.PlacedConsumer"
        ])

      not valid_transport?(transport) ->
        abort(["Invalid transport name: #{inspect(transport)}"])

      true ->
        :ok
    end

    cfg = %{
      concurrency: Keyword.get(opts, :concurrency, 5),
      batch_size: Keyword.get(opts, :batch_size, 1),
      batch_timeout: Keyword.get(opts, :batch_timeout, 1_000),
      retry: Keyword.get(opts, :retry, 3),
      transport: transport,
      queue_group: Keyword.get(opts, :queue_group),
      dlq: Keyword.get(opts, :dlq, "dlq.#{topic}"),
      middlewares: middlewares(opts)
    }

    targets = [
      {module_to_path(module_name), render_consumer(module_name, topic, cfg)},
      {module_to_test_path(module_name), render_consumer_test(module_name, topic)}
    ]

    written =
      for {path, content} <- targets, create_file(path, content) != :skipped, do: path

    print_summary(module_name, written)
  end

  # The name is validated textually: the module being generated does not exist
  # yet, so its atom cannot be looked up, and no atom is created from CLI input.
  defp valid_module_name?(name) do
    name =~ ~r/\A[A-Z][A-Za-z0-9_]*(\.[A-Z][A-Za-z0-9_]*)*\z/
  end

  # The transport is rendered as a bare atom literal (`:name`), so it must be
  # a simple identifier. `nil` means the option was not given.
  defp valid_transport?(nil), do: true
  defp valid_transport?(name), do: name =~ ~r/\A[a-z_][a-zA-Z0-9_]*\z/

  # Resolves the middleware list: `--no-middleware` wins, then `--middleware`,
  # then the defaults. Empty entries (e.g. from a trailing comma) are dropped.
  defp middlewares(opts) do
    if Keyword.get(opts, :no_middleware, false) do
      []
    else
      case Keyword.get(opts, :middleware) do
        nil ->
          @default_middlewares

        mods ->
          mods
          |> String.split(",")
          |> Enum.map(&String.trim/1)
          |> Enum.reject(&(&1 == ""))
      end
    end
  end

  # Prints each line to stderr and stops the task with exit status 1.
  @spec abort([String.t()]) :: no_return()
  defp abort(lines) do
    Enum.each(lines, &IO.puts(:stderr, &1))
    exit({:shutdown, 1})
  end

  defp print_summary(_module_name, []) do
    IO.puts("No files were written.")
  end

  defp print_summary(module_name, written) do
    files = Enum.map_join(written, "\n", &("  " <> &1))

    IO.puts("""

    Consumer generated successfully!

    Files written:
    #{files}

    Add to your config:

        config :phoenix_micro,
          consumers: [#{module_name}]

    Or register at runtime:

        PhoenixMicro.register_consumer(#{module_name})
    """)
  end

  # Renders the consumer module source. Each DSL call is emitted exactly once;
  # string values are rendered with inspect/1 so they are valid Elixir literals.
  defp render_consumer(module_name, topic, cfg) do
    dsl =
      [
        "topic #{inspect(topic)}",
        "concurrency #{cfg.concurrency}",
        "retry max_attempts: #{cfg.retry}, base_delay: 500, max_delay: 30_000, jitter: true",
        middleware_dsl(cfg.middlewares),
        if(cfg.transport, do: "transport :#{cfg.transport}"),
        if(cfg.queue_group, do: "queue_group #{inspect(cfg.queue_group)}"),
        if(cfg.batch_size > 1,
          do: "batch_size #{cfg.batch_size}\nbatch_timeout #{cfg.batch_timeout}"
        ),
        "dead_letter_topic #{inspect(cfg.dlq)}"
      ]
      |> Enum.reject(&is_nil/1)
      |> Enum.join("\n")
      |> indent()

    """
    defmodule #{module_name} do
      @moduledoc \"\"\"
      Consumer for `#{topic}` messages.

      Generated by `mix phoenix_micro.gen.consumer`.
      \"\"\"

      use PhoenixMicro.Consumer

    #{dsl}

      @impl PhoenixMicro.Consumer
      def handle(%PhoenixMicro.Message{} = message, _context) do
        # TODO: implement message handling
        # message.payload contains the decoded message body
        # Return :ok to ack, {:error, reason} to retry, :nack to DLQ immediately
        _message = message
        :ok
      end

      @impl PhoenixMicro.Consumer
      def handle_error(message, _error, _context) do
        # Optional: customise retry vs DLQ decision
        # {:retry, message} | :nack | :ok
        {:retry, message}
      end
    end
    """
  end

  defp middleware_dsl([]), do: "middleware []"

  defp middleware_dsl(mods) do
    "middleware [\n" <> Enum.map_join(mods, ",\n", &("  " <> &1)) <> "\n]"
  end

  # Indents every line of `text` by two spaces (the module body level).
  defp indent(text) do
    text
    |> String.split("\n")
    |> Enum.map_join("\n", &("  " <> &1))
  end

  # Renders the ExUnit test file that accompanies the generated consumer.
  defp render_consumer_test(module_name, topic) do
    topic_literal = inspect(topic)

    """
    defmodule #{module_name}Test do
      use ExUnit.Case, async: true

      alias PhoenixMicro.{Consumer, Message}

      @consumer #{module_name}

      describe "handle/2" do
        test "returns :ok for a valid message" do
          message = Message.new(#{topic_literal}, %{"key" => "value"})
          context = %{transport: :memory, topic: #{topic_literal}, attempt: 1}

          assert :ok = Consumer.dispatch(@consumer, message, context)
        end
      end

      describe "__consumer_config__/0" do
        test "consumer config is correct" do
          cfg = @consumer.__consumer_config__()

          assert cfg.topic == #{topic_literal}
          assert is_integer(cfg.concurrency)
          assert is_list(cfg.retry_opts)
        end
      end
    end
    """
  end

  # Macro.underscore/1 already turns "." into "/". Replacing dots beforehand
  # made it emit a stray "_" after every "/" (e.g. "my_app/_orders").
  defp module_to_path(module_name) do
    "lib/#{Macro.underscore(module_name)}.ex"
  end

  defp module_to_test_path(module_name) do
    "test/#{Macro.underscore(module_name)}_test.exs"
  end

  # Writes `content` to `path`, creating parent directories as needed.
  # Returns :created, :overwritten or :skipped so the caller can report
  # what actually happened.
  defp create_file(path, content) do
    path |> Path.dirname() |> File.mkdir_p!()

    cond do
      not File.exists?(path) ->
        File.write!(path, content)
        IO.puts(" * create #{path}")
        :created

      confirm_overwrite?(path) ->
        File.write!(path, content)
        IO.puts(" * overwrite #{path}")
        :overwritten

      true ->
        IO.puts(" * skip #{path}")
        :skipped
    end
  end

  # IO.gets/1 returns :eof or {:error, reason} when stdin is closed or
  # unreadable; anything other than a typed answer means "do not overwrite".
  defp confirm_overwrite?(path) do
    case IO.gets("#{path} already exists. Overwrite? [y/N] ") do
      answer when is_binary(answer) ->
        answer |> String.trim_leading() |> String.match?(~r/\Ay/i)

      _eof_or_error ->
        false
    end
  end
end