defmodule Pi.Mirror.QuackDB do
@moduledoc """
Optional built-in DuckDB mirror for pi session/plugin events.
Enable with `PI_ELIXIR_MIRROR=quackdb`. The mirror is intentionally not a
model-facing tool: it is loaded by `Pi.Plugin.Manager` as a built-in plugin and
receives the same lifecycle/tool-hook events as other BEAM plugins.
"""
use Pi.Plugin
alias Pi.Plugin.UI
@table "pi_events"
@files_table "pi_session_files"
@default_batch_size 1
@sync_batch_size 5_000
@sync_progress_key :elixir_quack_sync
@fts_columns [
:event_type,
:cwd,
:session_file,
:session_name,
:leaf_id,
:tool_name,
:tool_call_id,
:payload_json
]
@session_fields %{
"cwd" => :cwd,
"sessionFile" => :session_file,
"sessionName" => :session_name,
"leafId" => :leaf_id
}
@columns [
id: :varchar,
event_type: :varchar,
cwd: :varchar,
session_file: :varchar,
session_name: :varchar,
leaf_id: :varchar,
turn_index: :bigint,
tool_name: :varchar,
tool_call_id: :varchar,
is_error: :boolean,
occurred_at: :timestamp,
payload_json: :varchar
]
def enabled?, do: Pi.Features.env_enabled?("PI_ELIXIR_MIRROR")
@impl true
def init(_opts) do
if enabled?() do
start_mirror()
else
{:ok, %{enabled?: false}}
end
end
command(name: :quack, description: "Show QuackDB mirror status")
command(name: :"quack.status", description: "Show QuackDB mirror status")
command(name: :"quack.sync", description: "Backfill pi sessions into the QuackDB mirror")
command(name: :"quack.index", description: "Rebuild the QuackDB mirror FTS index")
command(name: :"quack.search", description: "Search mirrored pi sessions with QuackDB FTS")
@impl true
def handle_event(event, %{enabled?: true} = state) when is_map(event) do
state = remember_session(state, event)
append(state, event_row(event, event["type"] || "event", event))
end
def handle_event(_event, state), do: {:noreply, state}
@impl true
def handle_command(:quack, args, state) do
state = ensure_enabled(state)
args
|> String.split(~r/\s+/, trim: true)
|> case do
["sync" | rest] ->
start_sync(state, rest)
["index" | _rest] ->
rebuild_fts_command(state)
["search" | query] ->
search_command(state, Enum.join(query, " "))
["status" | _rest] ->
{{:ok, status_text(state)}, state}
[] ->
{{:ok, status_text(state)}, state}
_other ->
{{:error, "Usage: /elixir:quack[.status|.sync [current|PATH]|.index|.search QUERY]"},
state}
end
end
def handle_command(:"quack.status", _args, state) do
state = ensure_enabled(state)
{{:ok, status_text(state)}, state}
end
def handle_command(:"quack.sync", args, state) do
state = ensure_enabled(state)
start_sync(state, String.split(args, ~r/\s+/, trim: true))
end
def handle_command(:"quack.index", _args, state) do
state = ensure_enabled(state)
rebuild_fts_command(state)
end
def handle_command(:"quack.search", args, state) do
state = ensure_enabled(state)
search_command(state, args)
end
@impl true
def tool_call(call, context, %{enabled?: true} = state) do
payload = %{"call" => call, "context" => context}
state =
state
|> remember_session(context)
|> append_row(%{
event_type: "tool_call_hook",
cwd: context["cwd"],
session_file: context["sessionFile"],
session_name: context["sessionName"],
leaf_id: context["leafId"],
tool_name: call["toolName"],
tool_call_id: call["toolCallId"],
is_error: false,
payload_json: encode_payload(payload)
})
{:ok, state}
end
def tool_call(_call, _context, state), do: {:ok, state}
@impl true
def tool_result(result, context, %{enabled?: true} = state) do
payload = %{"result" => result, "context" => context}
state =
state
|> remember_session(context)
|> append_row(%{
event_type: "tool_result_hook",
cwd: context["cwd"],
session_file: context["sessionFile"],
session_name: context["sessionName"],
leaf_id: context["leafId"],
tool_name: result["toolName"],
tool_call_id: result["toolCallId"],
is_error: result["isError"] == true,
payload_json: encode_payload(payload)
})
{:ok, state}
end
def tool_result(_result, _context, state), do: {:ok, state}
@impl true
def shutdown(%{supervisor: supervisor} = state) when is_pid(supervisor) do
_state = flush(state)
if Process.alive?(supervisor), do: Supervisor.stop(supervisor)
:ok
catch
:exit, _reason -> :ok
end
def shutdown(_state), do: :ok
defp start_mirror do
with :ok <- ensure_quackdb(),
{:ok, supervisor, conn} <- start_quackdb(),
:ok <- ensure_schema(conn) do
{:ok, %{enabled?: true, supervisor: supervisor, conn: conn, buffer: []}}
else
{:error, reason} ->
{:ok, %{enabled?: false, error: inspect(reason)}}
end
end
defp ensure_enabled(%{enabled?: true} = state), do: state
defp ensure_enabled(state) do
if enabled?() do
session_state = Map.take(state, Map.values(@session_fields))
case start_mirror() do
{:ok, %{enabled?: true} = next_state} -> Map.merge(next_state, session_state)
{:ok, next_state} -> next_state
end
else
state
end
end
defp ensure_quackdb do
case Application.ensure_all_started(:quackdb) do
{:ok, _apps} -> :ok
{:error, reason} -> {:error, reason}
end
end
defp start_quackdb do
server_name = __MODULE__.Server
client_name = __MODULE__.Client
token = "pi_elixir_mirror_#{System.unique_integer([:positive])}"
port = mirror_port()
endpoint = "quack:localhost:#{port}"
uri = System.get_env("PI_ELIXIR_MIRROR_QUACKDB_URI") || "http://localhost:#{port}"
server_opts =
[
name: server_name,
duckdb: mirror_duckdb(),
database: mirror_database(),
endpoint: endpoint,
uri: uri,
token: mirror_token(token),
wait_timeout: mirror_wait_timeout(),
poll_interval: 25
]
|> compact_keyword()
client_opts =
[
name: client_name,
uri: uri,
token: mirror_token(token),
pool_size: mirror_pool_size()
]
|> compact_keyword()
children =
if System.get_env("PI_ELIXIR_MIRROR_QUACKDB_URI") do
[{QuackDB, client_opts}]
else
QuackDB.Server.child_specs(server: server_opts, client: client_opts)
end
case Supervisor.start_link(children, strategy: :one_for_one) do
{:ok, supervisor} -> {:ok, supervisor, client_name}
{:error, reason} -> {:error, reason}
end
end
defp ensure_schema(conn) do
QuackDB.query!(conn, QuackDB.DDL.create_table(@table, @columns, if_not_exists: true))
QuackDB.query!(
conn,
QuackDB.DDL.create_table(@files_table, file_columns(), if_not_exists: true)
)
QuackDB.query!(conn, create_session_file_index())
:ok
rescue
exception in [QuackDB.Error, DBConnection.ConnectionError, RuntimeError, ArgumentError] ->
{:error, exception}
end
defp create_session_file_index do
[
"CREATE INDEX IF NOT EXISTS ",
QuackDB.Type.quote_identifier("pi_events_session_entry_file_idx"),
" ON ",
QuackDB.Type.quote_identifier(@table),
"(",
QuackDB.Type.quote_identifier(:event_type),
", ",
QuackDB.Type.quote_identifier(:session_file),
")"
]
end
defp file_columns do
[
{:session_file, :varchar, primary_key: true},
file_size: :bigint,
mtime_seconds: :bigint,
synced_entries: :bigint,
synced_at: :timestamp
]
end
defp status_text(%{enabled?: true} = state) do
session_file = Map.get(state, :session_file) || "none yet"
db = mirror_database()
"QuackDB mirror enabled · db=#{db} · session=#{session_file}"
end
defp status_text(%{error: error}), do: "QuackDB mirror unavailable · #{error}"
defp status_text(_state), do: "QuackDB mirror disabled · PI_ELIXIR_MIRROR disables it"
defp rebuild_fts_command(%{enabled?: true, conn: conn} = state) do
case refresh_fts_index(conn) do
:ok -> {{:ok, "🦆 FTS index rebuilt for #{@table}"}, state}
{:error, reason} -> {{:error, "🦆 FTS index failed · #{Exception.message(reason)}"}, state}
end
end
defp rebuild_fts_command(state), do: {{:error, status_text(state)}, state}
defp search_command(%{enabled?: true, conn: conn} = state, query) do
query = String.trim(query)
if query == "" do
{{:error, "Usage: /elixir:quack search QUERY"}, state}
else
case search_fts(conn, query) do
{:ok, []} -> {{:ok, "🦆 no matches"}, state}
{:ok, rows} -> {{:ok, format_search_results(rows)}, state}
{:error, reason} -> {{:error, "🦆 search failed · #{Exception.message(reason)}"}, state}
end
end
end
defp search_command(state, _query), do: {{:error, status_text(state)}, state}
defp refresh_fts_index(conn) do
QuackDB.query!(conn, QuackDB.FTS.install())
QuackDB.query!(conn, QuackDB.FTS.load())
QuackDB.query!(conn, QuackDB.FTS.create_index(@table, :id, @fts_columns, overwrite: true))
:ok
rescue
exception in [QuackDB.Error, DBConnection.ConnectionError, RuntimeError, ArgumentError] ->
{:error, exception}
end
defp search_fts(conn, query) do
score =
QuackDB.FTS.match_bm25(QuackDB.Type.quote_identifier(:id), query, schema: fts_schema())
result =
QuackDB.query!(
conn,
[
"SELECT ",
QuackDB.Type.quote_identifier(:id),
", ",
QuackDB.Type.quote_identifier(:event_type),
", ",
QuackDB.Type.quote_identifier(:session_file),
", ",
QuackDB.Type.quote_identifier(:tool_name),
", ",
QuackDB.Type.quote_identifier(:payload_json),
", ",
score,
" AS ",
QuackDB.Type.quote_identifier(:score),
" FROM ",
QuackDB.Type.quote_identifier(@table),
" WHERE ",
score,
" IS NOT NULL AND ",
score,
" > 0 ORDER BY ",
QuackDB.Type.quote_identifier(:score),
" DESC LIMIT ?"
],
[10],
timeout: 30_000
)
{:ok, result.rows || []}
rescue
exception in [QuackDB.Error, DBConnection.ConnectionError, RuntimeError, ArgumentError] ->
{:error, exception}
end
defp fts_schema, do: QuackDB.FTS.schema_name("main.#{@table}")
defp format_search_results(rows) do
rows
|> Enum.with_index(1)
|> Enum.map_join("\n", fn {[id, event_type, session_file, tool_name, payload_json, score],
index} ->
snippet =
payload_json |> to_string() |> String.replace(~r/\s+/, " ") |> String.slice(0, 160)
file = session_file_label(to_string(session_file || ""))
tool = if tool_name in [nil, ""], do: "", else: " · #{tool_name}"
"#{index}. #{Float.round(score * 1.0, 3)} · #{event_type}#{tool} · #{file} · #{id}\n #{snippet}"
end)
|> then(&"🦆 search results\n#{&1}")
end
defp start_sync(%{enabled?: true} = state, args) do
state = flush(state)
Task.start(fn -> run_sync(state, args) end)
{{:ok, "🦆 sync started in background"}, state}
end
defp start_sync(state, _args), do: {{:error, status_text(state)}, state}
defp run_sync(state, args) do
case sync_files(state, args) do
{:ok, files} ->
sync_session_files(state, files)
{:error, message} ->
UI.notify(message, type: :error)
UI.clear_status(@sync_progress_key)
end
catch
kind, reason ->
message = Exception.format(kind, reason, __STACKTRACE__)
UI.notify("🦆 sync crashed · #{inspect(reason)}", type: :error)
UI.clear_status(@sync_progress_key)
File.write(Path.join(System.tmp_dir!(), "pi-elixir-quack-sync-crash.log"), message)
:ok
end
defp sync_files(_state, []), do: {:ok, discover_session_files()}
defp sync_files(%{session_file: session_file}, ["current" | _rest])
when is_binary(session_file) and session_file != "" do
{:ok, [session_file]}
end
defp sync_files(_state, ["current" | _rest]) do
{:error,
"No current session file observed yet; use /elixir:quack sync to backfill all sessions."}
end
defp sync_files(_state, [path | _rest]) do
path = Path.expand(path)
cond do
File.dir?(path) -> {:ok, session_files_under(path)}
File.regular?(path) -> {:ok, [path]}
true -> {:error, "Session path not found: #{path}"}
end
end
defp sync_session_files(_state, []) do
message = "🦆 synced 0 entries from 0 files"
UI.notify(message)
UI.clear_status(@sync_progress_key)
:ok
end
defp sync_session_files(state, files) do
files = Enum.uniq(files)
total = length(files)
started_at = System.monotonic_time(:millisecond)
UI.set_progress(@sync_progress_key, title: "🦆 sync", current: 0, total: total)
UI.set_status(@sync_progress_key, "🦆 sync starting · #{total} files")
{entries, failed} =
files
|> Enum.with_index(1)
|> Enum.reduce({0, 0}, fn {file, index}, {entries, failed} ->
UI.set_progress(@sync_progress_key,
title: "🦆 sync #{session_file_label(file)}",
current: index,
total: total
)
case sync_session_file(state, file, index, total) do
{:ok, count} ->
synced = entries + count
UI.set_status(
@sync_progress_key,
"🦆 sync · #{index}/#{total} files · #{synced} rows"
)
{synced, failed}
:skipped ->
UI.set_status(
@sync_progress_key,
"🦆 sync · #{index}/#{total} files · skipped unchanged · #{entries} rows"
)
{entries, failed}
{:error, _reason} ->
UI.set_status(
@sync_progress_key,
"🦆 sync · #{index}/#{total} files · #{failed + 1} failed"
)
{entries, failed + 1}
end
end)
elapsed = max(System.monotonic_time(:millisecond) - started_at, 1)
ok_files = total - failed
rows_per_second = div(entries * 1_000, elapsed)
message =
"🦆 synced #{entries} entries from #{ok_files}/#{total} files · #{rows_per_second} rows/s"
if entries > 0 and failed == 0 do
UI.set_status(@sync_progress_key, "🦆 sync · rebuilding FTS index")
case refresh_fts_index(state.conn) do
:ok ->
:ok
{:error, reason} ->
UI.notify("🦆 FTS index failed · #{Exception.message(reason)}", type: :error)
end
end
UI.notify(message)
UI.clear_status(@sync_progress_key)
:ok
rescue
exception in [File.Error, RuntimeError, QuackDB.Error, DBConnection.ConnectionError] ->
message = Exception.message(exception)
UI.notify("🦆 sync failed · #{message}", type: :error)
UI.clear_status(@sync_progress_key)
:ok
end
defp sync_session_file(state, session_file, index, total) do
if File.regular?(session_file) do
sync_regular_session_file(state, session_file, index, total)
else
{:error, "Session file not found: #{session_file}"}
end
rescue
exception in [File.Error, RuntimeError, QuackDB.Error, DBConnection.ConnectionError] ->
{:error, Exception.message(exception)}
end
defp sync_regular_session_file(%{conn: conn} = state, session_file, index, total) do
state = flush(state)
label = session_file_label(session_file)
UI.set_status(@sync_progress_key, "🦆 sync #{label} · #{index}/#{total} files · checking")
before_stat = session_file_stat!(session_file)
case sync_plan(conn, session_file, before_stat) do
:skip ->
:skipped
{:append, offset, previous_count} ->
UI.set_status(@sync_progress_key, "🦆 sync #{label} · #{index}/#{total} files · appending")
import_session_file(conn, session_file, offset, previous_count, before_stat, index, total)
:replace ->
UI.set_status(
@sync_progress_key,
"🦆 sync #{label} · #{index}/#{total} files · deleting old rows"
)
delete_session_entries(state, session_file)
UI.set_status(@sync_progress_key, "🦆 sync #{label} · #{index}/#{total} files · importing")
import_session_file(conn, session_file, 0, 0, before_stat, index, total)
end
end
defp import_session_file(conn, session_file, offset, previous_count, before_stat, index, total) do
count =
session_file
|> stream_session_lines(offset)
|> Stream.chunk_every(sync_batch_size())
|> Enum.reduce(0, fn lines, count ->
QuackDB.insert_columns!(conn, @table, session_entry_columns(session_file, lines),
columns: @columns,
timeout: :infinity
)
count = count + length(lines)
UI.set_status(
@sync_progress_key,
"🦆 sync #{session_file_label(session_file)} · #{index}/#{total} files · #{count} rows"
)
count
end)
after_stat = session_file_stat!(session_file)
if before_stat == after_stat,
do: remember_synced_file(conn, session_file, after_stat, previous_count + count)
{:ok, count}
end
defp stream_session_lines(session_file, offset) do
Stream.resource(
fn ->
io = File.open!(session_file, [:read, :binary, read_ahead: 1_000_000])
{:ok, ^offset} = :file.position(io, offset)
io
end,
fn io ->
case IO.binread(io, :line) do
:eof -> {:halt, io}
line -> {[String.trim_trailing(line, "\n")], io}
end
end,
&File.close/1
)
|> Stream.reject(&(&1 == ""))
end
defp session_entry_columns(session_file, lines) do
size = length(lines)
now = NaiveDateTime.utc_now(:microsecond)
nils = List.duplicate(nil, size)
[
id: Enum.map(1..size, fn _ -> unique_id() end),
event_type: List.duplicate("session_entry", size),
cwd: nils,
session_file: List.duplicate(session_file, size),
session_name: nils,
leaf_id: nils,
turn_index: nils,
tool_name: nils,
tool_call_id: nils,
is_error: List.duplicate(false, size),
occurred_at: List.duplicate(now, size),
payload_json: lines
]
end
defp session_file_stat!(session_file) do
stat = File.stat!(session_file, time: :posix)
%{size: stat.size, mtime_seconds: stat.mtime}
end
defp sync_plan(conn, session_file, %{size: size, mtime_seconds: mtime_seconds}) do
case synced_file_metadata(conn, session_file) do
%{size: ^size, mtime_seconds: ^mtime_seconds} ->
:skip
%{size: old_size, synced_entries: synced_entries}
when is_integer(old_size) and old_size < size ->
{:append, old_size, synced_entries || 0}
_metadata ->
:replace
end
end
defp synced_file_metadata(conn, session_file) do
rows =
QuackDB.query!(
conn,
[
"SELECT ",
QuackDB.Type.quote_identifier(:file_size),
", ",
QuackDB.Type.quote_identifier(:mtime_seconds),
", ",
QuackDB.Type.quote_identifier(:synced_entries),
" FROM ",
QuackDB.Type.quote_identifier(@files_table),
" WHERE ",
QuackDB.Type.quote_identifier(:session_file),
" = ? LIMIT 1"
],
[session_file],
timeout: 30_000
).rows || []
case rows do
[[size, mtime_seconds, synced_entries] | _rest] ->
%{size: size, mtime_seconds: mtime_seconds, synced_entries: synced_entries}
_other ->
nil
end
end
defp remember_synced_file(
conn,
session_file,
%{size: size, mtime_seconds: mtime_seconds},
count
) do
delete_file_metadata(conn, session_file)
QuackDB.insert_rows!(
conn,
@files_table,
[
%{
session_file: session_file,
file_size: size,
mtime_seconds: mtime_seconds,
synced_entries: count,
synced_at: NaiveDateTime.utc_now(:microsecond)
}
],
columns: [
session_file: :varchar,
file_size: :bigint,
mtime_seconds: :bigint,
synced_entries: :bigint,
synced_at: :timestamp
],
timeout: :infinity
)
:ok
end
defp discover_session_files do
session_roots()
|> Enum.flat_map(&session_files_under/1)
|> Enum.uniq()
end
defp session_roots do
[
System.get_env("PI_CODING_AGENT_SESSION_DIR"),
Path.join([System.user_home!(), ".pi", "agent", "sessions"])
]
|> Enum.reject(&(&1 in [nil, ""]))
|> Enum.map(&Path.expand/1)
|> Enum.uniq()
end
defp session_files_under(root) do
root
|> Path.join("**/*.jsonl")
|> Path.wildcard()
|> Enum.filter(&File.regular?/1)
end
defp session_file_label(file) do
file
|> Path.basename(".jsonl")
|> String.split("_", parts: 2)
|> List.last()
end
defp delete_session_entries(%{conn: conn}, session_file) do
QuackDB.query!(
conn,
[
"DELETE FROM ",
QuackDB.Type.quote_identifier(@table),
" WHERE ",
QuackDB.Type.quote_identifier(:event_type),
" = ? AND ",
QuackDB.Type.quote_identifier(:session_file),
" = ?"
],
["session_entry", session_file],
timeout: :infinity
)
:ok
end
defp delete_session_entries(_state, _session_file), do: :ok
defp delete_file_metadata(conn, session_file) do
QuackDB.query!(
conn,
[
"DELETE FROM ",
QuackDB.Type.quote_identifier(@files_table),
" WHERE ",
QuackDB.Type.quote_identifier(:session_file),
" = ?"
],
[session_file],
timeout: :infinity
)
end
defp remember_session(state, event) when is_map(event) do
Map.merge(state, session_attrs(event))
end
defp session_attrs(event) do
@session_fields
|> Map.new(fn {source, target} -> {target, event[source]} end)
|> Enum.reject(fn {_key, value} -> value in [nil, ""] end)
|> Map.new()
end
defp event_row(event, event_type, payload) do
%{
event_type: event_type,
cwd: event["cwd"],
session_file: event["sessionFile"],
session_name: event["sessionName"],
leaf_id: event["leafId"],
turn_index: event["turnIndex"],
tool_name: event["name"] || event["toolName"],
tool_call_id: event["toolCallId"],
is_error: event["isError"] == true,
payload_json: encode_payload(payload)
}
end
defp append(state, row), do: {:noreply, append_row(state, row)}
defp append_row(%{buffer: buffer} = state, row) do
buffer = [normalize_row(row) | buffer]
if length(buffer) >= mirror_batch_size() do
flush(%{state | buffer: buffer})
else
%{state | buffer: buffer}
end
end
defp flush(%{conn: conn, buffer: buffer} = state) when buffer != [] do
rows = Enum.reverse(buffer)
QuackDB.insert_rows!(conn, @table, rows, columns: @columns, batch_size: mirror_batch_size())
%{state | buffer: []}
rescue
_exception in [QuackDB.Error, DBConnection.ConnectionError, RuntimeError, ArgumentError] ->
%{state | buffer: []}
end
defp flush(state), do: state
defp normalize_row(row) do
now = NaiveDateTime.utc_now(:microsecond)
@columns
|> Keyword.keys()
|> Map.new(fn key -> {key, normalize_value(key, Map.get(row, key), now)} end)
end
defp normalize_value(:id, nil, _now), do: unique_id()
defp normalize_value(:occurred_at, nil, now), do: now
defp normalize_value(:turn_index, value, _now) when is_integer(value), do: value
defp normalize_value(:turn_index, _value, _now), do: nil
defp normalize_value(:is_error, value, _now), do: value == true
defp normalize_value(_key, value, _now) when is_binary(value) or is_nil(value), do: value
defp normalize_value(_key, value, _now), do: to_string(value)
defp encode_payload(payload) do
payload
|> JSONCodec.dump()
|> Jason.encode!()
rescue
_exception in [Jason.EncodeError, Protocol.UndefinedError, FunctionClauseError, ArgumentError] ->
inspect(payload, limit: 50, printable_limit: 1_000)
end
defp unique_id do
System.unique_integer([:positive, :monotonic])
|> Integer.to_string()
end
defp mirror_database do
path =
System.get_env("PI_ELIXIR_MIRROR_DB") ||
Path.join([System.user_home!(), ".pi", "elixir", "session-mirror.duckdb"])
File.mkdir_p!(Path.dirname(path))
path
end
defp mirror_duckdb do
case System.get_env("PI_ELIXIR_MIRROR_DUCKDB") do
nil -> :managed
"managed" -> :managed
path -> path
end
end
defp mirror_token(default), do: System.get_env("PI_ELIXIR_MIRROR_QUACKDB_TOKEN") || default
defp mirror_port do
case System.get_env("PI_ELIXIR_MIRROR_QUACKDB_PORT") do
nil -> 20_000 + rem(System.unique_integer([:positive]), 30_000)
port -> String.to_integer(port)
end
end
defp mirror_pool_size do
System.get_env("PI_ELIXIR_MIRROR_POOL_SIZE", "1") |> String.to_integer()
end
defp mirror_batch_size do
System.get_env("PI_ELIXIR_MIRROR_BATCH_SIZE", Integer.to_string(@default_batch_size))
|> String.to_integer()
end
defp sync_batch_size do
System.get_env("PI_ELIXIR_MIRROR_SYNC_BATCH_SIZE", Integer.to_string(@sync_batch_size))
|> String.to_integer()
end
defp mirror_wait_timeout do
System.get_env("PI_ELIXIR_MIRROR_WAIT_TIMEOUT", "10000") |> String.to_integer()
end
defp compact_keyword(keyword), do: Enum.reject(keyword, fn {_key, value} -> is_nil(value) end)
end