defmodule Mix.Tasks.Agy do
@moduledoc """
Runs `agy` in headless mode with prompts tailored to project workflows.
Examples:
mix agy --mode plan path/to/implementation_plan.md --dry-run
mix agy --mode walkthrough path/to/walkthrough.md --timeout 10m
mix agy --mode commit-review path/to/walkthrough.md --skip-permissions
mix agy --mode plan path/to/implementation_plan.md --save-conversation release-review
mix agy --conversation-name release-review --mode walkthrough path/to/walkthrough.md
mix agy --conversation-name release-review --log-file auto --mode plan path/to/implementation_plan.md
mix agy --conversation-name release-review --watchdog-timeout 30m --mode plan path/to/implementation_plan.md
The task defaults to safe execution: it does not pass
`--dangerously-skip-permissions` unless `--skip-permissions` is explicitly
provided.
"""
use Mix.Task
import Bitwise, only: [band: 2]
@shortdoc "Run agy headless with project workflow prompts"
@readme_path Path.expand("../../../README.md", __DIR__)
@external_resource @readme_path
@readme File.read!(@readme_path)
@switches [
mode: :string,
prompt: :string,
conversation: :string,
conversation_name: :string,
save_conversation: :string,
list_conversations: :boolean,
list_presence: :boolean,
forget_conversation: :string,
conversation_store: :string,
agy_brain_dir: :string,
log_file: :string,
run_log: :string,
watchdog_timeout: :string,
continue: :boolean,
timeout: :string,
add_dir: :keep,
language_server: :boolean,
agy: :string,
cwd: :string,
skip_permissions: :boolean,
dry_run: :boolean,
readme: :boolean,
help: :boolean
]
@aliases [
m: :mode,
p: :prompt
]
@modes ~w(review plan walkthrough refine commit-review ask)
@presence_heartbeat_ms 15_000
@presence_stale_seconds 60
@antigravity_ignore_contents "*\n"
@language_server_env [
{"disable_semantic_codebase_search", "true"},
{"_disable_semantic_codebase_search", "true"},
{"DISABLE_IDE_COMPLETIONS_DEBOUNCE", "1"},
{"SUPERCOMPLETE_DISABLE_TYPING_CACHE", "1"}
]
@impl Mix.Task
def run(argv) do
case parse(argv) do
{:help, help} ->
stdio_info(help)
{:readme, readme} ->
stdio_info(readme)
{:ok, opts, files} ->
opts = put_conversation_paths(opts)
cond do
opts[:list_presence] ->
(opts[:cwd] || File.cwd!())
|> list_presence()
opts[:list_conversations] ->
opts[:conversation_store]
|> read_conversations()
|> format_conversations()
|> stdio_info()
opts[:forget_conversation] ->
case forget_conversation(opts[:conversation_store], opts[:forget_conversation]) do
:ok ->
stdio_info("Forgot agy conversation #{inspect(opts[:forget_conversation])}.")
{:error, reason} ->
Mix.raise(reason)
end
true ->
case resolve_conversation(opts) do
{:ok, opts} ->
prompt = prompt(opts[:mode], files, opts[:prompt])
{command, command_args} = command(prompt, opts)
if opts[:dry_run] do
stdio_info(format_dry_run(command, command_args, prompt))
else
print_log_hints(opts)
prepare_log_files(opts)
run_command(command, command_args, opts, prompt, files)
maybe_save_conversation(opts)
end
{:error, reason} ->
Mix.raise(reason)
end
end
{:error, reason} ->
Mix.raise(reason)
end
end
def parse(argv) do
{opts, files, invalid} = OptionParser.parse(argv, strict: @switches, aliases: @aliases)
cond do
opts[:help] ->
{:help, usage()}
opts[:readme] ->
{:readme, readme()}
invalid != [] ->
{:error, "Invalid option(s): #{Enum.map_join(invalid, ", ", &elem(&1, 0))}"}
conversation_selector_count(opts) > 1 ->
{:error, "Choose only one of --conversation, --conversation-name, or --continue."}
opts[:mode] && opts[:mode] not in @modes ->
{:error,
"Unknown mode #{inspect(opts[:mode])}. Supported modes: #{Enum.join(@modes, ", ")}"}
files == [] && blank?(opts[:prompt]) && !conversation_action?(opts) ->
{:error, "Provide at least one file path or --prompt text."}
true ->
opts = Keyword.put_new(opts, :mode, default_mode(files))
{:ok, opts, files}
end
end
def readme, do: @readme
def prompt(mode, files, extra_prompt \\ nil) when mode in @modes do
files_block =
case files do
[] ->
"No explicit files were provided."
paths ->
paths
|> Enum.map(&format_prompt_path/1)
|> Enum.map_join("\n", &"- #{&1}")
end
extra_block =
if blank?(extra_prompt) do
"No extra instruction."
else
String.trim(extra_prompt)
end
"""
You are running inside the current project through `agy` headless mode as a
bounded sidecar reviewer/implementer. Treat the listed files and the local
repository as the source of truth. Do not invent file contents or assume
context you have not inspected.
Mode: #{mode}
Files:
#{files_block}
Extra instruction:
#{extra_block}
#{mode_instruction(mode)}
Operating contract:
- Read the listed files first. If the task needs nearby context, inspect the
smallest relevant set of local files before acting.
- Keep all work scoped to the requested mode, files, and explicit extra
instruction. Avoid adjacent refactors, formatting churn, generated-output
noise, and dependency changes unless they are required to finish.
- Preserve unrelated dirty work. Never use destructive git commands such as
`git reset --hard`, `git checkout --`, `git clean`, force-push, or broad
rebase operations unless the user explicitly requested that exact action.
- In `review`, `plan`, `walkthrough`, and `ask` modes, do not edit files
unless the extra instruction explicitly asks for implementation, fixes, or
a commit.
- In `refine` mode, edit only the requested prose/documentation surface and
preserve the main thesis and information content.
- Do not commit unless the mode is `commit-review` or the extra instruction
explicitly requests a commit. If committing, stage only intended files and
use a focused commit message.
- Prefer bounded verification. If you run commands, report the exact command,
the outcome, and any known warnings or skipped checks.
- If blocked by permissions, missing files, auth, network, or uncertainty
that would make edits risky, stop and report the concrete blocker.
Response contract:
- Lead with findings when reviewing.
- For implementation work, summarize changed files, verification, and any
residual risk.
- Be concise and concrete. Cite local file paths when useful.
"""
|> String.trim()
end
def command(prompt, opts \\ []) do
agy = opts[:agy] || "agy"
args =
["-p", prompt]
|> add_timeout(opts[:timeout])
|> add_continue(opts[:continue])
|> add_conversation(opts[:resolved_conversation] || opts[:conversation])
|> add_log_file(opts[:log_file])
|> add_dirs(Keyword.get_values(opts, :add_dir))
|> add_skip_permissions(opts[:skip_permissions])
{agy, args}
end
defp format_prompt_path(path), do: inspect(Path.expand(path))
def usage do
"""
Usage:
mix agy [PATH ...] [options]
Options:
--mode MODE review | plan | walkthrough | refine | commit-review | ask
--prompt TEXT, -p TEXT Extra instruction appended to the generated prompt
--timeout DURATION Forwarded as --print-timeout, for example 10m
--conversation UUID Continue a specific agy conversation
--conversation-name NAME Continue a named conversation from the local registry
--save-conversation NAME Save the selected or latest agy conversation under NAME
--list-conversations List named agy conversations stored for this repo
--list-presence List active agy presence sessions
--forget-conversation N Remove a named agy conversation from the registry
--conversation-store P Registry path, defaults to .antigravitycli/agy_conversations.tsv
--agy-brain-dir PATH agy brain directory used to discover latest conversations
--log-file PATH|auto Forward --log-file to agy; "auto" writes under .antigravitycli/logs
--run-log PATH|auto|none Wrapper audit log, defaults to "auto"
--watchdog-timeout DUR Kill agy after no stdout/log/transcript activity for a duration
--continue Forward --continue to agy for its most recent conversation
--add-dir PATH Forwarded to agy; may be repeated
--agy PATH agy executable path, defaults to "agy"
--cwd PATH Working directory for the agy process
--skip-permissions Forward --dangerously-skip-permissions
--dry-run Print command shape and prompt without executing agy
--readme Print the embedded README for agent-facing context
Examples:
mix agy --readme
mix agy --mode plan path/to/implementation_plan.md --dry-run
mix agy --mode walkthrough path/to/walkthrough.md --timeout 10m
mix agy --mode commit-review path/to/walkthrough.md --skip-permissions
mix agy --mode plan implementation_plan.md --save-conversation release-review
mix agy --conversation-name release-review --mode walkthrough walkthrough.md
mix agy --conversation-name release-review --log-file auto --mode plan implementation_plan.md
mix agy --conversation-name release-review --watchdog-timeout 30m --mode plan implementation_plan.md
mix agy --list-conversations
mix agy --list-presence
"""
|> String.trim()
end
defp run_command(command, command_args, opts, prompt, files) do
cwd = opts[:cwd] || File.cwd!()
executable = resolve_runtime!(command, cwd)
log_path = opts[:run_log]
watchdog_ms = parse_watchdog_timeout!(opts[:watchdog_timeout])
started_at = System.monotonic_time(:millisecond)
activity_paths = activity_paths(opts)
activity_state = activity_state(activity_paths)
{manifest_path, manifest} = write_presence_manifest(opts, files, cwd)
try do
{conflict_res, warned_sessions} = evaluate_presence_conflicts(manifest, cwd)
case conflict_res do
:ok -> :ok
{:conflict, msg} -> Mix.raise(msg)
end
append_run_log(log_path, """
[#{timestamp()}] RUN-START
cwd=#{cwd}
command=#{format_command_shape(command, command_args, prompt)}
agy_log=#{opts[:log_file] || "none"}
transcript=#{transcript_path(opts) || "unknown"}
watchdog_idle_timeout=#{opts[:watchdog_timeout] || "none"}
""")
{spawn_executable, spawn_args} = spawn_with_closed_stdin(executable, command_args)
port =
Port.open({:spawn_executable, spawn_executable}, [
:binary,
:exit_status,
:stderr_to_stdout,
{:args, spawn_args},
{:cd, cwd},
{:env, runtime_env(opts)}
])
result =
collect_port(
port,
log_path,
watchdog_ms,
started_at,
started_at,
started_at,
activity_state,
manifest_path,
manifest,
started_at,
warned_sessions,
<<>>
)
case result do
{:exit, 0} ->
append_run_log(log_path, "[#{timestamp()}] RUN-END status=0\n")
:ok
{:exit, status} ->
append_run_log(log_path, "[#{timestamp()}] RUN-END status=#{status}\n")
Mix.raise("agy exited with status #{status}")
{:timeout, idle_ms} ->
append_run_log(
log_path,
"[#{timestamp()}] RUN-END watchdog_idle_timeout idle_ms=#{idle_ms}\n"
)
Mix.raise("agy watchdog idle timeout after #{idle_ms}ms")
{:conflict, msg} ->
append_run_log(log_path, "[#{timestamp()}] RUN-END conflict\n")
Mix.raise(msg)
end
after
File.rm(manifest_path)
end
end
defp collect_port(
port,
log_path,
watchdog_ms,
started_at,
last_activity_at,
last_heartbeat_at,
activity_state,
manifest_path,
manifest,
last_manifest_touch_at,
warned_sessions,
child_output_pending
) do
receive do
{^port, {:data, data}} ->
child_output_pending = write_child_output(child_output_pending, data)
append_run_log(log_path, data)
now = System.monotonic_time(:millisecond)
{manifest, last_manifest_touch_at, warned_sessions, conflict_res} =
maybe_touch_and_evaluate(
manifest_path,
manifest,
now,
last_manifest_touch_at,
warned_sessions
)
case conflict_res do
{:conflict, msg} ->
flush_child_output_pending(child_output_pending)
kill_port(port)
{:conflict, msg}
:ok ->
collect_port(
port,
log_path,
watchdog_ms,
started_at,
now,
last_heartbeat_at,
activity_state,
manifest_path,
manifest,
last_manifest_touch_at,
warned_sessions,
child_output_pending
)
end
{^port, {:exit_status, status}} ->
flush_child_output_pending(child_output_pending)
{:exit, status}
after
1_000 ->
now = System.monotonic_time(:millisecond)
elapsed_ms = now - started_at
{activity?, activity_state} = activity_changed?(activity_state)
last_activity_at = if activity?, do: now, else: last_activity_at
{manifest, last_manifest_touch_at, warned_sessions, conflict_res} =
maybe_touch_and_evaluate(
manifest_path,
manifest,
now,
last_manifest_touch_at,
warned_sessions
)
case conflict_res do
{:conflict, msg} ->
flush_child_output_pending(child_output_pending)
kill_port(port)
{:conflict, msg}
:ok ->
cond do
watchdog_ms && now - last_activity_at >= watchdog_ms ->
flush_child_output_pending(child_output_pending)
kill_port(port)
{:timeout, now - last_activity_at}
activity? ->
append_run_log(
log_path,
"[#{timestamp()}] RUN-ACTIVITY elapsed_ms=#{elapsed_ms}\n"
)
collect_port(
port,
log_path,
watchdog_ms,
started_at,
last_activity_at,
last_heartbeat_at,
activity_state,
manifest_path,
manifest,
last_manifest_touch_at,
warned_sessions,
child_output_pending
)
now - last_heartbeat_at >= 30_000 ->
append_run_log(log_path, "[#{timestamp()}] RUN-WAIT elapsed_ms=#{elapsed_ms}\n")
collect_port(
port,
log_path,
watchdog_ms,
started_at,
last_activity_at,
now,
activity_state,
manifest_path,
manifest,
last_manifest_touch_at,
warned_sessions,
child_output_pending
)
true ->
collect_port(
port,
log_path,
watchdog_ms,
started_at,
last_activity_at,
last_heartbeat_at,
activity_state,
manifest_path,
manifest,
last_manifest_touch_at,
warned_sessions,
child_output_pending
)
end
end
end
end
defp write_child_output(pending, data) do
case :unicode.characters_to_binary(pending <> data, :utf8, :utf8) do
text when is_binary(text) ->
unicode_stdio_write(text)
<<>>
{:incomplete, text, rest} ->
unicode_stdio_write(text)
rest
{:error, text, rest} ->
unicode_stdio_write(text)
IO.binwrite(rest)
<<>>
end
end
defp flush_child_output_pending(<<>>), do: :ok
defp flush_child_output_pending(pending), do: IO.binwrite(pending)
defp kill_port(port) do
os_pid =
case Port.info(port, :os_pid) do
{:os_pid, pid} when is_integer(pid) -> pid
_ -> nil
end
if os_pid do
System.cmd("kill", ["-TERM", Integer.to_string(os_pid)], stderr_to_stdout: true)
Process.sleep(100)
if process_alive?(os_pid) do
System.cmd("kill", ["-KILL", Integer.to_string(os_pid)], stderr_to_stdout: true)
end
end
Port.close(port)
rescue
ArgumentError -> :ok
end
defp spawn_with_closed_stdin(executable, args) do
shell = System.find_executable("sh") || "/bin/sh"
script = "exec </dev/null; exec \"$0\" \"$@\""
{shell, ["-c", script, executable | args]}
end
defp process_alive?(pid) do
case System.cmd("kill", ["-0", Integer.to_string(pid)], stderr_to_stdout: true) do
{_output, 0} -> true
{_output, _status} -> false
end
end
defp maybe_save_conversation(opts) do
case opts[:save_conversation] do
nil ->
:ok
name ->
conversation_id =
opts[:resolved_conversation] || opts[:conversation] ||
latest_conversation_id(opts[:agy_brain_dir])
if blank?(conversation_id) do
Mix.raise("Could not discover an agy conversation id to save as #{inspect(name)}.")
end
case save_conversation(opts[:conversation_store], name, conversation_id) do
:ok ->
stdio_info("Saved agy conversation #{conversation_id} as #{inspect(name)}.")
{:error, reason} ->
Mix.raise(reason)
end
end
end
defp print_log_hints(opts) do
if opts[:run_log] do
stdio_info("Wrapper run log: #{opts[:run_log]}")
stdio_info("tail -f #{shell_quote(opts[:run_log])}")
end
if opts[:log_file] do
stdio_info("agy log file: #{opts[:log_file]}")
stdio_info("tail -f #{shell_quote(opts[:log_file])}")
end
case transcript_path(opts) do
nil ->
:ok
path ->
stdio_info("agy transcript: #{path}")
stdio_info("tail -f #{shell_quote(path)}")
end
end
defp prepare_log_files(opts) do
Enum.each([opts[:run_log], opts[:log_file]], fn
nil -> :ok
path -> prepare_runtime_path(path)
end)
end
defp prepare_runtime_path(path) do
ensure_antigravity_gitignore(path)
File.mkdir_p!(Path.dirname(path))
end
defp ensure_antigravity_gitignore(path) do
with root when is_binary(root) <- antigravity_root(path) do
File.mkdir_p!(root)
gitignore_path = Path.join(root, ".gitignore")
if File.read(gitignore_path) != {:ok, @antigravity_ignore_contents} do
File.write!(gitignore_path, @antigravity_ignore_contents)
end
end
end
defp antigravity_root(path) do
path
|> Path.expand()
|> Path.split()
|> antigravity_root_from_parts()
end
defp antigravity_root_from_parts(parts) do
case Enum.find_index(parts, &(&1 == ".antigravitycli")) do
nil -> nil
index -> parts |> Enum.take(index + 1) |> Path.join()
end
end
defp stdio_info(message) do
unicode_stdio_write(message <> "\n")
end
defp unicode_stdio_write(message) do
previous_encoding = stdio_encoding()
:io.setopts(:standard_io, encoding: :unicode)
try do
IO.write(message)
after
if previous_encoding do
:io.setopts(:standard_io, encoding: previous_encoding)
end
end
end
defp stdio_encoding do
case :io.getopts(:standard_io) do
opts when is_list(opts) -> Keyword.get(opts, :encoding)
_ -> nil
end
end
defp mode_instruction("review") do
"""
Review the listed file(s) against the current project state. Lead with
actionable findings ordered by severity, then summarize residual risks.
Edit only if the extra instruction asks you to edit.
"""
end
defp mode_instruction("plan") do
"""
Read the implementation plan, answer open questions, tighten the engineering
boundary, and make the plan directly actionable. Prefer explicit decisions
over broad options.
"""
end
defp mode_instruction("walkthrough") do
"""
Read the walkthrough, inspect the corresponding project changes, review
for correctness gaps, and propose or apply focused fixes only when the extra
instruction asks for fixes.
"""
end
defp mode_instruction("refine") do
"""
Refine the listed Markdown or notes in place. Preserve the thesis, remove
chat residue, tighten claims, and keep the result bounded.
"""
end
defp mode_instruction("commit-review") do
"""
Review the project against the listed walkthrough or plan, fix actionable
issues, run focused verification, and create a focused git commit. Do not
include unrelated dirty work. If verification cannot be run, say exactly why
before committing.
"""
end
defp mode_instruction("ask") do
"""
Answer the extra instruction using the listed files as context. Prefer a
direct answer unless code edits are explicitly requested.
"""
end
defp default_mode(files) do
case Enum.map(files, &Path.basename/1) do
["implementation_plan.md" | _] -> "plan"
["walkthrough.md" | _] -> "walkthrough"
_ -> "review"
end
end
defp add_timeout(args, nil), do: args
defp add_timeout(args, timeout), do: args ++ ["--print-timeout", timeout]
defp add_continue(args, true), do: args ++ ["--continue"]
defp add_continue(args, _), do: args
defp add_conversation(args, nil), do: args
defp add_conversation(args, conversation), do: args ++ ["--conversation", conversation]
defp add_log_file(args, nil), do: args
defp add_log_file(args, log_file), do: args ++ ["--log-file", log_file]
defp add_dirs(args, dirs) do
Enum.reduce(dirs, args, fn dir, acc -> acc ++ ["--add-dir", dir] end)
end
defp add_skip_permissions(args, true), do: args ++ ["--dangerously-skip-permissions"]
defp add_skip_permissions(args, _), do: args
defp runtime_env(opts) do
if language_server_enabled?(opts) do
[]
else
Enum.map(@language_server_env, fn {name, value} ->
{String.to_charlist(name), String.to_charlist(value)}
end)
end
end
defp language_server_enabled?(opts), do: opts[:language_server] == true
defp format_dry_run(command, command_args, prompt) do
safe_args =
command_args
|> Enum.map(fn
^prompt -> "<prompt>"
arg -> arg
end)
"""
Command:
#{shell_join([command | safe_args])}
Prompt:
#{prompt}
"""
|> String.trim()
end
defp put_conversation_paths(opts) do
cwd = opts[:cwd] || File.cwd!()
opts
|> Keyword.put(
:conversation_store,
opts[:conversation_store] ||
Path.expand(".antigravitycli/agy_conversations.tsv", cwd)
)
|> Keyword.put(
:agy_brain_dir,
opts[:agy_brain_dir] ||
Path.join([System.user_home!(), ".gemini", "antigravity-cli", "brain"])
)
|> put_log_file_path(cwd)
|> put_run_log_path(cwd)
end
defp put_log_file_path(opts, cwd) do
case opts[:log_file] do
nil ->
opts
"auto" ->
timestamp =
DateTime.utc_now()
|> DateTime.truncate(:second)
|> Calendar.strftime("%Y%m%dT%H%M%SZ")
mode = opts[:mode] || "agy"
path = Path.expand(".antigravitycli/logs/#{timestamp}-#{mode}.log", cwd)
Keyword.put(opts, :log_file, path)
path ->
Keyword.put(opts, :log_file, Path.expand(path, cwd))
end
end
defp put_run_log_path(opts, cwd) do
case opts[:run_log] do
"none" ->
Keyword.delete(opts, :run_log)
nil ->
Keyword.put(opts, :run_log, auto_log_path(opts, cwd, "run.log"))
"auto" ->
Keyword.put(opts, :run_log, auto_log_path(opts, cwd, "run.log"))
path ->
Keyword.put(opts, :run_log, Path.expand(path, cwd))
end
end
defp auto_log_path(opts, cwd, suffix) do
timestamp =
DateTime.utc_now()
|> DateTime.truncate(:second)
|> Calendar.strftime("%Y%m%dT%H%M%SZ")
mode = opts[:mode] || "agy"
Path.expand(".antigravitycli/logs/#{timestamp}-#{mode}.#{suffix}", cwd)
end
defp resolve_conversation(opts) do
conversations = read_conversations(opts[:conversation_store])
cond do
opts[:conversation_name] ->
case find_conversation(conversations, opts[:conversation_name]) do
nil ->
{:error, "No agy conversation named #{inspect(opts[:conversation_name])}."}
%{id: conversation_id} ->
{:ok, Keyword.put(opts, :resolved_conversation, conversation_id)}
end
opts[:conversation] ->
conversation_id =
case find_conversation(conversations, opts[:conversation]) do
nil -> opts[:conversation]
%{id: id} -> id
end
{:ok, Keyword.put(opts, :resolved_conversation, conversation_id)}
true ->
{:ok, opts}
end
end
defp read_conversations(path) do
case File.read(path) do
{:ok, contents} ->
contents
|> String.split("\n", trim: true)
|> Enum.flat_map(&parse_conversation_line/1)
|> Enum.sort_by(& &1.name)
{:error, :enoent} ->
[]
{:error, reason} ->
Mix.raise("Could not read agy conversation store #{path}: #{:file.format_error(reason)}")
end
end
defp parse_conversation_line(line) do
case String.split(line, "\t") do
[name, id, updated_at] when name != "" and id != "" ->
[%{name: name, id: id, updated_at: updated_at}]
[name, id] when name != "" and id != "" ->
[%{name: name, id: id, updated_at: ""}]
_ ->
[]
end
end
defp save_conversation(path, name, conversation_id) do
cond do
blank?(name) ->
{:error, "Conversation name cannot be blank."}
blank?(conversation_id) ->
{:error, "Conversation id cannot be blank."}
unsafe_registry_field?(name) ->
{:error, "Conversation name cannot contain tabs or newlines."}
unsafe_registry_field?(conversation_id) ->
{:error, "Conversation id cannot contain tabs or newlines."}
true ->
updated_at = DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601()
conversations =
path
|> read_conversations()
|> Enum.reject(&(&1.name == name))
|> then(&[%{name: name, id: conversation_id, updated_at: updated_at} | &1])
|> Enum.sort_by(& &1.name)
prepare_runtime_path(path)
File.write(path, encode_conversations(conversations))
end
end
defp unsafe_registry_field?(value) do
String.contains?(to_string(value), ["\t", "\n", "\r"])
end
defp forget_conversation(path, name) do
conversations = read_conversations(path)
if Enum.any?(conversations, &(&1.name == name)) do
conversations =
conversations
|> Enum.reject(&(&1.name == name))
|> Enum.sort_by(& &1.name)
prepare_runtime_path(path)
File.write(path, encode_conversations(conversations))
else
{:error, "No agy conversation named #{inspect(name)}."}
end
end
defp encode_conversations(conversations) do
conversations
|> Enum.map_join("\n", fn conversation ->
Enum.join([conversation.name, conversation.id, conversation.updated_at], "\t")
end)
|> case do
"" -> ""
contents -> contents <> "\n"
end
end
defp format_conversations([]), do: "No named agy conversations are stored."
defp format_conversations(conversations) do
conversations
|> Enum.map_join("\n", fn conversation ->
suffix =
if blank?(conversation.updated_at) do
""
else
" updated_at=#{conversation.updated_at}"
end
"#{conversation.name}\t#{conversation.id}#{suffix}"
end)
end
defp latest_conversation_id(nil), do: nil
defp latest_conversation_id(brain_dir) do
brain_dir = Path.expand(brain_dir)
with {:ok, entries} <- File.ls(brain_dir) do
entries
|> Enum.map(&Path.join(brain_dir, &1))
|> Enum.filter(&File.dir?/1)
|> Enum.flat_map(fn path ->
name = Path.basename(path)
if uuid?(name) do
case File.stat(path, time: :posix) do
{:ok, stat} -> [{stat.mtime, name}]
{:error, _reason} -> []
end
else
[]
end
end)
|> Enum.sort_by(&elem(&1, 0), :desc)
|> case do
[{_mtime, conversation_id} | _] -> conversation_id
[] -> nil
end
else
{:error, _reason} -> nil
end
end
defp transcript_path(opts) do
conversation_id = opts[:resolved_conversation] || opts[:conversation]
if uuid?(to_string(conversation_id)) do
Path.join([
opts[:agy_brain_dir],
conversation_id,
".system_generated",
"logs",
"transcript.jsonl"
])
end
end
defp activity_paths(opts) do
[opts[:log_file], transcript_path(opts)]
|> Enum.reject(&blank?/1)
end
defp activity_state(paths) do
Map.new(paths, fn path -> {path, file_signature(path)} end)
end
defp activity_changed?(state) do
new_state =
Map.new(state, fn {path, _signature} ->
{path, file_signature(path)}
end)
{new_state != state, new_state}
end
defp file_signature(path) do
case File.stat(path, time: :posix) do
{:ok, stat} -> {stat.size, stat.mtime}
{:error, _reason} -> :missing
end
end
defp append_run_log(nil, _data), do: :ok
defp append_run_log(path, data), do: File.write!(path, data, [:append])
defp format_command_shape(command, command_args, prompt) do
safe_args =
command_args
|> Enum.map(fn
^prompt -> "<prompt>"
arg -> arg
end)
shell_join([command | safe_args])
end
defp shell_join(args), do: Enum.map_join(args, " ", &shell_quote/1)
defp shell_quote(arg) do
arg = to_string(arg)
if Regex.match?(~r|\A[A-Za-z0-9_@%+=:,./-]+\z|, arg) do
arg
else
"'" <> String.replace(arg, "'", "'\"'\"'") <> "'"
end
end
defp resolve_runtime!(command, cwd) do
unless File.dir?(cwd) do
Mix.raise("--cwd must be an existing directory: #{cwd}")
end
cond do
path = System.find_executable(command) ->
path
explicit_command_path?(command) ->
path = Path.expand(command, cwd)
if executable_file?(path) do
path
else
Mix.raise(
"Could not find agy executable #{inspect(command)}. Use --agy PATH to configure it."
)
end
true ->
Mix.raise(
"Could not find agy executable #{inspect(command)}. Use --agy PATH to configure it."
)
end
end
defp explicit_command_path?(command) do
Path.type(command) != :relative or String.contains?(command, "/")
end
defp executable_file?(command) do
Path.type(command) != :relative and File.regular?(command) and executable_mode?(command)
end
defp executable_mode?(path) do
case File.stat(path) do
{:ok, %File.Stat{mode: mode}} -> band(mode, 0o111) != 0
{:error, _reason} -> false
end
end
defp parse_watchdog_timeout!(nil), do: nil
defp parse_watchdog_timeout!("none"), do: nil
defp parse_watchdog_timeout!(duration) do
duration = String.trim(to_string(duration))
case Regex.run(~r/\A(\d+)(ms|s|m|h)?\z/, duration, capture: :all_but_first) do
[amount] ->
String.to_integer(amount) * 1_000
[amount, unit] ->
String.to_integer(amount) * duration_factor(unit)
_ ->
Mix.raise(
"Invalid --watchdog-timeout #{inspect(duration)}. Use forms like 500ms, 30s, 20m, or 1h."
)
end
end
defp duration_factor("ms"), do: 1
defp duration_factor("s"), do: 1_000
defp duration_factor("m"), do: 60_000
defp duration_factor("h"), do: 3_600_000
defp timestamp do
DateTime.utc_now()
|> DateTime.truncate(:second)
|> DateTime.to_iso8601()
end
defp find_conversation(conversations, name) do
Enum.find(conversations, &(&1.name == name))
end
defp uuid?(value) do
Regex.match?(
~r/\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/i,
value
)
end
defp conversation_selector_count(opts) do
[opts[:conversation], opts[:conversation_name], opts[:continue]]
|> Enum.count(fn value -> value not in [nil, false] end)
end
defp conversation_action?(opts) do
opts[:list_conversations] || opts[:forget_conversation] || opts[:list_presence]
end
defp blank?(value), do: is_nil(value) or String.trim(to_string(value)) == ""
defp write_presence_manifest(opts, files, cwd) do
session_id = Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)
presence_dir = Path.expand(".antigravitycli/presence/sessions", cwd)
manifest_path = Path.join(presence_dir, "#{session_id}.json")
now = DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601()
files = normalize_presence_files(files, cwd)
manifest = %{
"session_id" => session_id,
"pid" => String.to_integer(System.pid()),
"mode" => opts[:mode],
"intent" => mode_intent(opts[:mode]),
"topics" => mode_topics(opts[:mode], files, opts),
"cwd" => cwd,
"files" => files,
"run_log" => opts[:run_log] || "none",
"log_file" => opts[:log_file] || "none",
"git_head" => get_git_head(cwd),
"dirty_files" => get_dirty_files(cwd),
"started_at" => now,
"updated_at" => now
}
prepare_runtime_path(manifest_path)
File.write!(manifest_path, JSON.encode!(manifest))
{manifest_path, manifest}
end
defp maybe_touch_and_evaluate(
manifest_path,
manifest,
now,
last_manifest_touch_at,
warned_sessions
) do
if now - last_manifest_touch_at >= @presence_heartbeat_ms do
manifest = touch_manifest(manifest_path, manifest)
{conflict_res, warned_sessions} =
evaluate_presence_conflicts(manifest, manifest["cwd"], warned_sessions)
{manifest, now, warned_sessions, conflict_res}
else
{manifest, last_manifest_touch_at, warned_sessions, :ok}
end
end
defp evaluate_presence_conflicts(current_manifest, cwd, warned_sessions \\ MapSet.new()) do
presence_dir = Path.expand(".antigravitycli/presence/sessions", cwd)
active_manifests =
case File.ls(presence_dir) do
{:ok, files} ->
files
|> Enum.filter(&String.ends_with?(&1, ".json"))
|> Enum.map(&Path.join(presence_dir, &1))
|> Enum.map(&read_and_prune_manifest/1)
|> Enum.reject(&is_nil/1)
|> Enum.reject(&(&1["session_id"] == current_manifest["session_id"]))
_ ->
[]
end
Enum.reduce_while(active_manifests, {:ok, warned_sessions}, fn other_manifest,
{_acc_res, acc_warned} ->
case evaluate_conflict(current_manifest, other_manifest) do
:ok ->
{:cont, {:ok, acc_warned}}
{:warning, msg} ->
other_id = other_manifest["session_id"]
acc_warned =
if not MapSet.member?(acc_warned, other_id) do
stdio_info(msg)
MapSet.put(acc_warned, other_id)
else
acc_warned
end
{:cont, {:ok, acc_warned}}
{:conflict, msg} ->
{:halt, {{:conflict, msg}, acc_warned}}
end
end)
end
defp evaluate_conflict(m1, m2) do
overlapping_topics =
MapSet.intersection(MapSet.new(manifest_topics(m1)), MapSet.new(manifest_topics(m2)))
|> MapSet.filter(&conflict_topic?/1)
if MapSet.size(overlapping_topics) > 0 do
intent1 = m1["intent"]
intent2 = m2["intent"]
cond do
intent1 == "read" and intent2 == "read" ->
:ok
intent1 in ["write", "commit"] and intent2 in ["write", "commit"] ->
if session_yields?(m1, m2) do
{:conflict, conflict_message(m1, m2, overlapping_topics)}
else
:ok
end
true ->
{:warning, warning_message(m1, m2, overlapping_topics)}
end
else
:ok
end
end
defp manifest_topics(%{"topics" => topics}) when is_list(topics) do
Enum.filter(topics, &is_binary/1)
end
defp manifest_topics(_manifest), do: []
defp conflict_topic?("file:" <> _path), do: true
defp conflict_topic?("git:" <> _name), do: true
defp conflict_topic?("conversation:" <> _name), do: true
defp conflict_topic?(_topic), do: false
defp session_yields?(m1, m2) do
presence_order_key(m1) > presence_order_key(m2)
end
defp presence_order_key(manifest) do
{parse_presence_started_at(manifest["started_at"]), to_string(manifest["session_id"])}
end
defp parse_presence_started_at(value) do
case DateTime.from_iso8601(to_string(value)) do
{:ok, datetime, _offset} -> DateTime.to_unix(datetime, :microsecond)
_ -> 0
end
end
defp format_session_details(m, overlapping_topics) do
topics_str = overlapping_topics |> MapSet.to_list() |> Enum.sort() |> Enum.join(", ")
"session=#{m["session_id"]} pid=#{m["pid"]} mode=#{m["mode"]} intent=#{m["intent"]} run_log=#{m["run_log"]} overlap=[#{topics_str}]"
end
defp warning_message(_m1, m2, overlapping_topics) do
"Warning: Active session overlap. #{format_session_details(m2, overlapping_topics)}"
end
defp conflict_message(_m1, m2, overlapping_topics) do
"Conflict: Yielding to active writer. #{format_session_details(m2, overlapping_topics)}"
end
defp touch_manifest(manifest_path, manifest) do
now = DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601()
manifest = Map.put(manifest, "updated_at", now)
File.write!(manifest_path, JSON.encode!(manifest))
manifest
rescue
_ -> manifest
end
defp list_presence(cwd) do
presence_dir = Path.expand(".antigravitycli/presence/sessions", cwd)
case File.ls(presence_dir) do
{:ok, files} ->
manifests =
files
|> Enum.filter(&String.ends_with?(&1, ".json"))
|> Enum.map(&Path.join(presence_dir, &1))
|> Enum.map(&read_and_prune_manifest/1)
|> Enum.reject(&is_nil/1)
if manifests == [] do
stdio_info("No active agy sessions.")
else
stdio_info(format_presence(manifests))
end
{:error, :enoent} ->
stdio_info("No active agy sessions.")
{:error, reason} ->
Mix.raise("Could not read presence dir: #{:file.format_error(reason)}")
end
end
defp read_and_prune_manifest(path) do
case File.read(path) do
{:ok, contents} ->
case JSON.decode(contents) do
{:ok, manifest} ->
if active_manifest?(manifest) do
manifest
else
File.rm(path)
nil
end
_ ->
File.rm(path)
nil
end
_ ->
nil
end
end
defp active_manifest?(%{"pid" => pid, "updated_at" => updated_at}) do
with {:ok, pid} <- parse_presence_pid(pid),
true <- process_alive?(pid) do
case DateTime.from_iso8601(updated_at) do
{:ok, updated_dt, _} ->
now = DateTime.utc_now()
diff = DateTime.diff(now, updated_dt, :second)
diff >= 0 and diff < @presence_stale_seconds
_ ->
false
end
else
_ -> false
end
end
defp active_manifest?(_), do: false
defp parse_presence_pid(pid) when is_integer(pid) and pid > 0, do: {:ok, pid}
defp parse_presence_pid(pid) when is_binary(pid) do
case Integer.parse(pid) do
{pid, ""} when pid > 0 -> {:ok, pid}
_ -> :error
end
end
defp parse_presence_pid(_pid), do: :error
defp format_presence(manifests) do
manifests
|> Enum.sort_by(& &1["started_at"])
|> Enum.map_join("\n", fn m ->
topics = Enum.join(m["topics"] || [], ", ")
"#{m["session_id"]}\tpid=#{m["pid"]}\tmode=#{m["mode"]}\tintent=#{m["intent"]}\ttopics=[#{topics}]\tupdated_at=#{m["updated_at"]}\trun_log=#{m["run_log"]}"
end)
end
defp mode_intent("ask"), do: "read"
defp mode_intent("review"), do: "read"
defp mode_intent("plan"), do: "read"
defp mode_intent("walkthrough"), do: "read"
defp mode_intent("refine"), do: "write"
defp mode_intent("commit-review"), do: "commit"
defp mode_intent(_), do: "read"
defp mode_topics(mode, files, opts) do
topics = ["repo:presence"]
topics =
case mode do
"commit-review" -> ["git:index", "git:HEAD" | topics]
_ -> topics
end
file_topics = Enum.map(files, &"file:#{&1}")
topics = topics ++ file_topics
topics =
if opts[:resolved_conversation] do
["conversation:#{opts[:resolved_conversation]}" | topics]
else
topics
end
Enum.uniq(topics)
end
defp normalize_presence_files(files, cwd) do
Enum.map(files, &normalize_presence_file(&1, cwd))
end
defp normalize_presence_file(file, cwd) do
cwd = Path.expand(cwd)
expanded = Path.expand(file, cwd)
relative = Path.relative_to(expanded, cwd)
if Path.type(relative) == :relative and not String.starts_with?(relative, "..") do
relative
else
expanded
end
end
defp get_git_head(cwd) do
case System.cmd("git", ["rev-parse", "HEAD"], cd: cwd, stderr_to_stdout: true) do
{output, 0} -> String.trim(output)
_ -> ""
end
rescue
_ -> ""
end
defp get_dirty_files(cwd) do
case System.cmd("git", ["status", "--porcelain"], cd: cwd, stderr_to_stdout: true) do
{output, 0} ->
output
|> String.split("\n", trim: true)
|> Enum.map(fn line -> String.slice(line, 3..-1//1) |> String.trim() end)
_ ->
[]
end
rescue
_ -> []
end
end