defmodule FLAMEDockerBackend do
  @moduledoc """
  Docker-out-of-Docker backend for [FLAME](https://github.com/phoenixframework/flame).

  The parent runs inside a container and provisions runners via the Docker Engine API
  through a mounted socket. Runners join the same user-defined network as the parent.

  ## Required configuration

      config :flame, FLAMEDockerBackend,
        image: "my-app:latest",
        network: "my_network"

  The parent container must run a distributed release, have a stable `--name` on the
  Docker network, and have the Docker socket mounted. See the project README for
  deployment details.

  ## Optional configuration

    * `:env` — extra runner env vars (map or keyword list). `PHX_SERVER` may be
      overridden; `FLAME_PARENT` may not. `RELEASE_COOKIE`, `ERL_AFLAGS` and
      `ERL_ZFLAGS` are copied from the parent's environment unless set here.
    * `:host_config` — Docker `HostConfig` map passed through to the create request
    * `:mounts` — list of Docker mount specs, appended to `HostConfig.Mounts`
    * `:cmd` — Docker `Cmd` override (list of strings)
    * `:docker_socket_path` — path to the Docker socket (auto-detected when omitted)
    * `:boot_timeout` — boot wait in ms (default `30_000`)
    * `:keep_runners` — when `true`, leave runner containers after exit for log inspection
      (default `false`, containers are removed)
    * `:runner_hostname_env` — env var for runner hostname (default `"HOSTNAME"`)
    * `:parent_hostname` — override parent Erlang hostname (defaults to current node host)

  Per-pool overrides:

      {FLAME.Pool, name: MyRunner, backend: {FLAMEDockerBackend, image: "...", network: "..."}}
  """

  @behaviour FLAME.Backend

  require Logger

  alias FLAMEDockerBackend.DockerAPI

  @config_keys [
    :image,
    :network,
    :env,
    :host_config,
    :mounts,
    :cmd,
    :docker_socket_path,
    :keep_runners,
    :boot_timeout,
    :runner_hostname_env,
    :parent_hostname,
    # auto-injected by FLAME; accepted by validation but not stored in the struct
    :terminator_sup,
    :log
  ]

  defstruct [
    # TODO: can this be inferred automatically?
    # Application name (the part of the parent node name before '@').
    :app,
    # TODO: can we capture this from the Docker API and set to the current image by default?
    # Docker image to use for runners.
    :image,
    # Docker network connecting the Parent with its Runners.
    :network,
    # Env vars to set on the runner node.
    # `PHX_SERVER=false` and `FLAME_PARENT` (with encoded parent info) are added automatically.
    # `RELEASE_COOKIE`, `ERL_AFLAGS` and `ERL_ZFLAGS` are copied from the Parent node's
    # environment, if present and not explicitly defined here.
    :env,
    # Docker HostConfig map for runner containers (resource limits, binds, etc.).
    :host_config,
    # Docker mount specs for runner containers; merged into HostConfig.Mounts.
    :mounts,
    # Docker Cmd override for runner containers (list of strings).
    :cmd,
    # Path to the Docker API Unix socket, which must be mounted into the Parent's container.
    # When nil, DockerAPI.init/1 picks the default.
    :docker_socket_path,
    # When true, leave exited runner containers for log inspection.
    :keep_runners,
    # How long to wait for the Runner to boot, in milliseconds. Defaults to 30 seconds.
    :boot_timeout,
    # Env var used to look up the Runner's hostname for its longname. Defaults to "HOSTNAME".
    :runner_hostname_env,
    ##
    ## Data setup during Runner's `init`
    ##
    # Auto-generated runner container name / runner node basename (part before '@').
    :runner_node_base,
    # Parent hostname; inferred from the current node unless configured.
    :parent_hostname,
    # Reference the Runner echoes back in its `:remote_up` message.
    :parent_ref,
    ##
    ## Data received on Runner's successful boot
    ##
    # Docker container id of the Runner.
    :runner_container_id,
    # PID of the remote Terminator process.
    :remote_terminator_pid,
    # Full Runner node name.
    :runner_node_name
  ]

  @type t() :: %__MODULE__{
          app: String.t(),
          image: String.t(),
          network: String.t(),
          env: map(),
          host_config: map() | nil,
          mounts: list() | nil,
          cmd: [String.t()] | nil,
          docker_socket_path: String.t() | nil,
          keep_runners: boolean(),
          boot_timeout: pos_integer(),
          runner_hostname_env: String.t(),
          runner_node_base: String.t() | nil,
          parent_hostname: String.t(),
          parent_ref: reference() | nil,
          runner_container_id: String.t() | nil,
          remote_terminator_pid: pid() | nil,
          runner_node_name: node() | nil
        }

  # Merges application config with per-pool `opts` and validates them.
  # Returns `{:error, invalid_keys}` for unknown options, or
  # `{:error, {:missing_config, key}}` when `:image`/`:network` is missing or empty.
  # On success, returns a state with the runner name, parent ref and final env prepared.
  @impl true
  @spec init(Keyword.t()) :: {:ok, t()} | {:error, any()}
  def init(opts) do
    [app, parent_hostname] = node() |> to_string() |> String.split("@")
    conf = Application.get_env(:flame, __MODULE__) || []

    with {:ok, opts} <- conf |> Keyword.merge(opts) |> Keyword.validate(@config_keys),
         :ok <- require_non_empty_string(opts, :image),
         :ok <- require_non_empty_string(opts, :network) do
      defaults = %__MODULE__{
        app: app,
        env: %{},
        keep_runners: false,
        boot_timeout: 30_000,
        runner_hostname_env: "HOSTNAME",
        parent_hostname: parent_hostname
      }

      # struct/2 ignores keys that are not struct fields (e.g. FLAME's :terminator_sup
      # and :log), so the result stays a well-formed struct.
      state = struct(defaults, opts)
      runner_node_base = "#{state.app}-flame-#{rand_id(20)}"
      parent_ref = make_ref()

      encoded_parent =
        parent_ref
        |> FLAME.Parent.new(self(), __MODULE__, runner_node_base, state.runner_hostname_env)
        |> FLAME.Parent.encode()

      env =
        (state.env || %{})
        |> Map.new()
        # FLAME_PARENT always wins: the runner needs it to connect back to this parent.
        |> Map.put("FLAME_PARENT", encoded_parent)
        |> Map.put_new("PHX_SERVER", "false")
        |> put_new_from_system("RELEASE_COOKIE")
        |> add_erl_flags()

      {:ok, %{state | env: env, parent_ref: parent_ref, runner_node_base: runner_node_base}}
    end
  end

  # Creates and starts the runner container, then waits up to `boot_timeout` ms for the
  # runner to report its terminator pid. Containers that fail to start or boot are
  # removed unless `keep_runners` is set.
  @impl true
  @spec remote_boot(t()) :: {:ok, pid(), t()} | {:error, any()}
  def remote_boot(%__MODULE__{} = state) do
    with {:ok, _httpc_profile_pid} <- DockerAPI.init(state.docker_socket_path),
         {:ok, _version} <- DockerAPI.version(),
         :ok <- maybe_pull_image(state.image),
         {:ok, container_id} <- DockerAPI.create_container(build_create_body(state)) do
      case DockerAPI.start_container(container_id) do
        :ok ->
          await_remote_up(state, container_id)

        {:error, _} = error ->
          maybe_remove_failed_runner(state, container_id)
          error
      end
    end
  end

  # Spawns `{mod, fun, args}` or a zero-arity function on the runner node and monitors it.
  @impl true
  @spec remote_spawn_monitor(t(), {atom(), atom(), list()} | (-> any())) ::
          {:ok, {pid(), reference()}}
  def remote_spawn_monitor(state, func)

  def remote_spawn_monitor(%__MODULE__{} = state, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    {pid, ref} = Node.spawn_monitor(state.runner_node_name, mod, fun, args)
    {:ok, {pid, ref}}
  end

  def remote_spawn_monitor(%__MODULE__{} = state, func) when is_function(func, 0) do
    {pid, ref} = Node.spawn_monitor(state.runner_node_name, func)
    {:ok, {pid, ref}}
  end

  # Called on the runner; stops the VM so the container exits.
  @impl true
  @spec system_shutdown() :: :ok
  def system_shutdown() do
    System.stop()
  end

  @doc false
  @spec rand_id(pos_integer()) :: binary()
  def rand_id(len) when is_integer(len) and len > 0 do
    # Each random byte encodes to two hex characters, so ceil(len / 2) bytes suffice.
    len
    |> Kernel.+(1)
    |> div(2)
    |> :crypto.strong_rand_bytes()
    |> Base.encode16(case: :lower)
    |> binary_part(0, len)
  end

  @doc false
  @spec add_erl_flags(map()) :: map()
  def add_erl_flags(env) do
    env
    |> put_new_from_system("ERL_AFLAGS")
    |> put_new_from_system("ERL_ZFLAGS")
  end

  @doc false
  # Builds the JSON body for Docker's container-create request.
  # `:mounts` goes under HostConfig.Mounts: ContainerCreate has no top-level Mounts field,
  # so a top-level key would be silently ignored by the Docker daemon.
  @spec build_create_body(t()) :: map()
  def build_create_body(%__MODULE__{} = state) do
    %{
      "Hostname" => state.runner_node_base,
      "name" => state.runner_node_base,
      "Image" => state.image,
      "Env" => Enum.map(state.env, fn {k, v} -> "#{k}=#{v}" end),
      "NetworkingConfig" => %{"EndpointsConfig" => %{state.network => %{}}}
    }
    |> maybe_put("HostConfig", runner_host_config(state))
    |> maybe_put("Cmd", state.cmd)
  end

  # Returns the HostConfig with any configured mounts appended to its "Mounts" list.
  @spec runner_host_config(t()) :: map() | nil | false
  defp runner_host_config(%__MODULE__{host_config: host_config, mounts: mounts})
       when mounts in [nil, false],
       do: host_config

  defp runner_host_config(%__MODULE__{host_config: host_config, mounts: mounts}) do
    Map.update(host_config || %{}, "Mounts", mounts, &(&1 ++ mounts))
  end

  # Puts `value` under `key` unless it is nil or false.
  defp maybe_put(map, _key, value) when value in [nil, false], do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)

  # Waits for the runner's `{parent_ref, {:remote_up, pid}}` message or the boot timeout.
  @spec await_remote_up(t(), String.t()) :: {:ok, pid(), t()} | {:error, :timeout}
  defp await_remote_up(%__MODULE__{parent_ref: parent_ref} = state, container_id) do
    receive do
      {^parent_ref, {:remote_up, remote_terminator_pid}} ->
        if !state.keep_runners, do: remove_container_on_exit(self(), container_id)

        state = %{
          state
          | runner_container_id: container_id,
            remote_terminator_pid: remote_terminator_pid,
            runner_node_name: node(remote_terminator_pid)
        }

        {:ok, remote_terminator_pid, state}
    after
      state.boot_timeout ->
        Logger.error(
          "Didn't receive terminator pid from the Runner container after #{state.boot_timeout} ms."
        )

        maybe_remove_failed_runner(state, container_id)
        {:error, :timeout}
    end
  end

  # Spawns an unlinked watcher that removes the container once `owner_pid` goes down.
  # Best-effort: the result is only logged.
  @spec remove_container_on_exit(pid(), String.t()) :: pid()
  defp remove_container_on_exit(owner_pid, container_id) do
    spawn(fn ->
      ref = Process.monitor(owner_pid)

      receive do
        {:DOWN, ^ref, :process, _, _} -> :ok
      end

      result = DockerAPI.stop_and_remove_container(container_id)
      Logger.debug("Removal of Runner via Docker API: #{inspect(result)}.")
    end)
  end

  # Stops and removes a runner that failed to start or boot, unless `keep_runners` is set.
  @spec maybe_remove_failed_runner(t(), String.t()) :: :ok | any()
  defp maybe_remove_failed_runner(%__MODULE__{keep_runners: keep?}, container_id) do
    if keep? do
      :ok
    else
      result = DockerAPI.stop_and_remove_container(container_id)
      Logger.debug("Removal of failed Runner via Docker API: #{inspect(result)}.")
    end
  end

  # Returns :ok when `key` holds a non-empty string, otherwise a missing-config error.
  @spec require_non_empty_string(Keyword.t(), atom()) :: :ok | {:error, {:missing_config, atom()}}
  defp require_non_empty_string(opts, key) do
    case Keyword.get(opts, key) do
      value when is_binary(value) and value != "" -> :ok
      _ -> {:error, {:missing_config, key}}
    end
  end

  # Copies the OS env var `name` into `env` unless `env` already defines it.
  @spec put_new_from_system(map(), String.t()) :: map()
  defp put_new_from_system(env, name) do
    case System.get_env(name) do
      nil -> env
      value -> Map.put_new(env, name, value)
    end
  end

  @spec maybe_pull_image(String.t()) :: :ok | {:error, any()}
  defp maybe_pull_image(image) do
    if DockerAPI.image_exists?(image) do
      Logger.debug("Image #{image} already present, skipping pull")
      :ok
    else
      Logger.info("Pulling image #{image}")

      case DockerAPI.pull_image(%{"fromImage" => image}) do
        {:ok, _events} -> :ok
        {:error, reason} -> {:error, reason}
      end
    end
  end
end