defmodule Mix.Tasks.Bloccs.Run do
@shortdoc "Compile and start a `.bloccs` network supervision tree"
@moduledoc """
mix bloccs.run path/to/network.bloccs [--message <json>] [--port <name>] [--wait <secs>] [--trace <path>]
Compiles the network, starts the supervisor, and (optionally) feeds a
single JSON message into the named exposed input port (defaults to the
first `[expose].in` entry, or the first port of the first node).
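Without `--message`, the task starts the tree and keeps it running for
`--wait` seconds (default 5) so you can poke at it from outside (for example
from an attached IEx session or a test script). With `--message`, the task
waits `--wait` seconds (default 2) after pushing the message; add
`--trace <path>` to record a trace of that run and write it to `path`.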
"""
use Mix.Task
alias Bloccs.{Parser, Validator, Compiler, Router, Producer, Trace}
# Switches accepted by the task. They are parsed strictly, so unknown switches
# and values of the wrong type end up in the "invalid" list instead of `opts`.
@switches [message: :string, port: :string, wait: :integer, trace: :string]

# Usage line shown whenever the command line cannot be understood.
@usage "usage: mix bloccs.run <network.bloccs> [--message <json>] [--port <name>] " <>
         "[--wait <secs>] [--trace <path>]"

# Entry point invoked by Mix with the raw command-line arguments.
#
# Parses the switches, requires one positional argument (the path to the
# network file; extra positionals are ignored), starts the application and
# hands over to `run_network/2`.
#
# Raises `Mix.Error` when:
#   * a switch is unknown or its value has the wrong type,
#   * `--wait` is negative (`Process.sleep/1` rejects negative timeouts),
#   * no network path is given.
@impl Mix.Task
def run(args) do
  {opts, rest, invalid} = OptionParser.parse(args, strict: @switches)

  # Report rejected switches instead of silently dropping them; otherwise a
  # typo such as `--mesage` would quietly change what the task does.
  if invalid != [] do
    rejected = Enum.map_join(invalid, ", ", fn {switch, _value} -> switch end)
    Mix.raise("invalid or malformed option(s): #{rejected}\n#{@usage}")
  end

  wait = Keyword.get(opts, :wait, 0)

  if wait < 0 do
    Mix.raise("--wait must be a non-negative number of seconds, got: #{wait}")
  end

  case rest do
    [] ->
      Mix.raise(@usage)

    [path | _] ->
      Mix.Task.run("app.start")
      run_network(path, opts)
  end
end
# Parses, validates, compiles and starts the network described by the file at
# `path`, prints its exposed ports, and then either
#
#   * pushes the `--message` payload via `feed_message/3`, or
#   * without `--message`, sleeps for `--wait` seconds (default 5) so the
#     running tree can be exercised from outside.
#
# Any step that does not return its success shape aborts the task with a
# `Mix.Error` carrying the inspected failure, instead of a `MatchError`
# stacktrace.
defp run_network(path, opts) do
  with {:ok, network} <- Parser.parse_network(path),
       :ok <- Validator.validate_network(network),
       {:ok, sup_module} <- Compiler.compile_and_load(network),
       {:ok, _pid} <- sup_module.start_link([]) do
    Mix.shell().info([:green, "✓ ", :reset, "network `#{network.id}` running"])
    print_exposed(network)

    case Keyword.get(opts, :message) do
      nil ->
        secs = Keyword.get(opts, :wait, 5)
        Mix.shell().info(" (waiting #{secs}s for messages; interrupt with ^C)")
        Process.sleep(secs * 1000)

      json ->
        feed_message(network, opts, json)
    end
  else
    # The failing step is identified by the shape of its return value, which
    # is included verbatim so the user can see the underlying reason.
    failure ->
      Mix.raise("could not start network from #{path}: #{inspect(failure)}")
  end
end
# Lists the network's exposed input and output ports on the Mix shell.
# Each direction is printed only when it has at least one entry.
# Always returns `:ok`.
defp print_exposed(network) do
  exposed = network.expose
  print_endpoints(" exposed inputs:", exposed.in, "→")
  print_endpoints(" exposed outputs:", exposed.out, "←")
  :ok
end

# Prints `heading` followed by one `name <arrow> node.port` line per entry of
# `endpoints`; prints nothing when `endpoints` is an empty map.
defp print_endpoints(heading, endpoints, arrow) do
  if map_size(endpoints) > 0 do
    Mix.shell().info(heading)

    Enum.each(endpoints, fn {name, {node, port}} ->
      Mix.shell().info(" #{name} #{arrow} #{node}.#{port}")
    end)
  end

  :ok
end
# Decodes `json`, pushes the resulting payload into the target input port of
# the running network, waits `--wait` seconds (default 2), and, when
# `--trace <path>` was given, dumps the recorded trace to that path.
#
# The target port is the exposed input named by `--port`, or the default
# chosen by `resolve_target_port/2` when `--port` is absent.
#
# Raises `Mix.Error` when `json` is not valid JSON; the decoder's own
# explanation is appended so the user can locate the problem.
defp feed_message(network, opts, json) do
  payload =
    case Jason.decode(json) do
      {:ok, decoded} ->
        decoded

      {:error, error} ->
        Mix.raise("--message must be valid JSON: #{Exception.message(error)}")
    end

  network_id = String.to_atom(network.id)
  {node, port} = resolve_target_port(network, Keyword.get(opts, :port))

  # Start recording before the push so the trace covers the pushed message.
  rec = if opts[:trace], do: Trace.record(network_id)

  _ = Producer.push(Router.producer_name(network_id, node, port), payload)
  Mix.shell().info("→ pushed message to #{node}.#{port}")

  # Wait before stopping the trace and returning.
  # NOTE(review): presumably this gives the network time to process the
  # message; confirm against Producer/Trace semantics.
  Process.sleep(Keyword.get(opts, :wait, 2) * 1000)

  if rec do
    :ok = Trace.dump(Trace.stop(rec), network_id, opts[:trace])
    Mix.shell().info([:green, "✓ ", :reset, "wrote trace to #{opts[:trace]}"])
  end
end
# Resolves the `{node, port}` endpoint a message should be pushed to.
#
# With `name == nil` (no `--port` given) the first entry of
# `network.expose.in` is used; when nothing is exposed, the first input port
# of the first node is used instead.
# NOTE(review): `expose.in` and `ports_in` are maps, so "first" follows map
# iteration order rather than declaration order in the network file — confirm
# this is the intended default.
#
# With a `name`, the exposed input whose key prints as `name` is returned.
# Keys are compared as strings so no atom is ever created from command-line
# input.
#
# Raises `Mix.Error` when the named port is not exposed, when the network has
# no nodes, or when the fallback node has no input ports.
defp resolve_target_port(network, nil) do
  case Enum.at(network.expose.in, 0) do
    {_name, endpoint} -> endpoint
    nil -> first_node_port(network)
  end
end

defp resolve_target_port(network, name) when is_binary(name) do
  exposed = Enum.find(network.expose.in, fn {key, _endpoint} -> to_string(key) == name end)

  case exposed do
    {_key, endpoint} -> endpoint
    nil -> Mix.raise("no exposed input port `#{name}` in network #{network.id}")
  end
end

# Fallback target when the network exposes no inputs: the first input port of
# the first node, failing with a readable error when either is missing.
defp first_node_port(network) do
  case Enum.at(network.nodes, 0) do
    nil ->
      Mix.raise("network #{network.id} has no nodes to push a message to")

    entry ->
      # Entries of `network.nodes` carry the node itself in position 1.
      first_node = elem(entry, 1)

      case Map.keys(first_node.manifest.ports_in) do
        [first_port | _] ->
          {first_node.local_id, first_port}

        [] ->
          Mix.raise(
            "node `#{first_node.local_id}` in network #{network.id} has no input ports; " <>
              "expose an input or pass --port"
          )
      end
  end
end
end