# lib/flame_docker_backend.ex

defmodule FLAMEDockerBackend do
  @moduledoc """
  Docker-out-of-Docker backend for [FLAME](https://github.com/phoenixframework/flame).

  The parent runs inside a container and provisions runners via the Docker Engine API
  through a mounted socket. Runners join the same user-defined network as the parent.

  ## Required configuration

      config :flame, FLAMEDockerBackend,
        image: "my-app:latest",
        network: "my_network"

  The parent container must use a distributed release, a stable `--name` on the Docker
  network, and the Docker socket mounted. See the project README for deployment details.

  ## Optional configuration

    * `:env` — extra runner env vars as a map or keyword list; keys and values are
      converted to strings (`PHX_SERVER` may be overridden; `FLAME_PARENT` may not)
    * `:host_config`, `:mounts`, `:cmd` — Docker create payload passthrough
      (`:mounts` is appended to `HostConfig.Mounts`, where the Engine API reads mounts)
    * `:docker_socket_path` — path to the Docker socket (auto-detected when omitted)
    * `:boot_timeout` — boot wait in ms (default `30_000`)
    * `:keep_runners` — when `true`, leave runner containers after exit for log inspection
      (default `false`, containers are removed)
    * `:runner_hostname_env` — env var for runner hostname (default `"HOSTNAME"`)
    * `:parent_hostname` — override parent Erlang hostname (defaults to current node host)

  Per-pool overrides:

      {FLAME.Pool, name: MyRunner, backend: {FLAMEDockerBackend, image: "...", network: "..."}}
  """
  require Logger
  alias FLAMEDockerBackend.DockerAPI

  @behaviour FLAME.Backend

  @config_keys [
    :image,
    :network,
    :env,
    :host_config,
    :mounts,
    :cmd,
    :docker_socket_path,
    :keep_runners,
    :boot_timeout,
    :runner_hostname_env,
    :parent_hostname,
    # auto-injected by FLAME
    :terminator_sup,
    :log
  ]

  defstruct [
    # TODO: can this be inferred automatically?
    # Application name
    :app,
    # TODO: can we capture this from the Docker API and set to the current image by default?
    # Docker image to use for runners
    :image,
    # Docker network to connect Parent with Runners
    :network,
    # Env vars to set on the runner node (normalised to string keys and values in `init/1`).
    # `PHX_SERVER=false` and `FLAME_PARENT` (with encoded parent info) env vars will be automatically added to this map.
    # `RELEASE_COOKIE`, `ERL_AFLAGS` and `ERL_ZFLAGS` will be copied from the Parent node and passed here too,
    # if not explicitly defined.
    :env,
    # Docker HostConfig map for runner containers (resource limits, binds, etc.).
    :host_config,
    # Docker mount specs for runner containers. They are appended to `HostConfig.Mounts`
    # in the create payload, since the Engine API ignores a top-level `Mounts` key on create.
    :mounts,
    # Docker Cmd override for runner containers (list of strings).
    :cmd,
    # Path to the Docker API Unix socket. This socket needs to be mounted into the Parent's docker container.
    # If not provided, a default value based on the operating system will be used.
    :docker_socket_path,
    # When true, leave exited runner containers for log inspection.
    :keep_runners,
    # How long to wait for the Runner to boot, in milliseconds. Defaults to 30 seconds.
    :boot_timeout,
    # Environment variable used to lookup Runner's node hostname for node's longname. Defaults to "HOSTNAME".
    :runner_hostname_env,
    # Options injected by FLAME itself; declared so validated options always fit the struct.
    :terminator_sup,
    :log,
    ##
    ## Data setup during Runner's `init`
    ##
    # Auto-generated runner container name / runner node basename (part of the node name before '@')
    :runner_node_base,
    # Auto-inferred Parent hostname
    :parent_hostname,
    # Auto-created Parent reference
    :parent_ref,
    ##
    ## Data received on Runner's successful boot
    ##
    # Docker container_id of the Runner
    :runner_container_id,
    # PID of the remote Terminator process
    :remote_terminator_pid,
    # Full Runner node name
    :runner_node_name
  ]

  @type t() :: %__MODULE__{
          app: String.t(),
          image: String.t(),
          network: String.t(),
          env: map(),
          host_config: map() | nil,
          mounts: list() | nil,
          cmd: [String.t()] | nil,
          docker_socket_path: String.t() | nil,
          keep_runners: boolean(),
          boot_timeout: pos_integer(),
          runner_hostname_env: String.t(),
          terminator_sup: term(),
          log: term(),
          runner_node_base: String.t() | nil,
          parent_hostname: String.t(),
          parent_ref: reference() | nil,
          runner_container_id: String.t() | nil,
          remote_terminator_pid: pid() | nil,
          runner_node_name: node() | nil
        }

  @impl true
  @spec init(Keyword.t()) :: {:ok, t()} | {:error, any()}
  def init(opts) do
    [app, parent_hostname] = node() |> to_string() |> String.split("@")

    conf = Application.get_env(:flame, __MODULE__) || []

    # Per-pool `opts` win over application config. Unknown keys yield
    # `{:error, invalid_keys}` from `Keyword.validate/2`.
    with {:ok, opts} <- conf |> Keyword.merge(opts) |> Keyword.validate(@config_keys),
         :ok <- require_non_empty_string(opts, :image),
         :ok <- require_non_empty_string(opts, :network) do
      defaults = %__MODULE__{
        app: app,
        env: %{},
        keep_runners: false,
        boot_timeout: 30_000,
        runner_hostname_env: "HOSTNAME",
        parent_hostname: parent_hostname
      }

      # Every validated key is a struct field, so `struct!/2` yields a well-formed
      # struct (a plain `Map.merge/2` would smuggle in undeclared keys).
      state = struct!(defaults, opts)

      runner_node_base = "#{state.app}-flame-#{rand_id(20)}"
      state = %{state | runner_node_base: runner_node_base}

      parent_ref = make_ref()

      encoded_parent =
        FLAME.Parent.new(parent_ref, self(), __MODULE__, state.runner_node_base, state.runner_hostname_env)
        |> FLAME.Parent.encode()

      env =
        (state.env || %{})
        # String keys ensure user entries and the entries added below can't coexist
        # as `:KEY` and `"KEY"` duplicates in the container env.
        |> Map.new(fn {key, value} -> {to_string(key), to_string(value)} end)
        |> Map.put("FLAME_PARENT", encoded_parent)
        |> Map.put_new("PHX_SERVER", "false")
        |> put_release_cookie()
        |> add_erl_flags()

      state = %{state | env: env, parent_ref: parent_ref}
      {:ok, state}
    end
  end

  @impl true
  @spec remote_boot(t()) :: {:ok, pid(), t()} | {:error, any()}
  def remote_boot(%__MODULE__{parent_ref: parent_ref} = state) do
    with {:ok, _httpc_profile_pid} <- DockerAPI.init(state.docker_socket_path),
         {:ok, _version} <- DockerAPI.version(),
         :ok <- maybe_pull_image(state.image),
         {:ok, runner_container_id} <- DockerAPI.create_container(build_create_body(state)) do
      case DockerAPI.start_container(runner_container_id) do
        :ok ->
          receive do
            {^parent_ref, {:remote_up, remote_terminator_pid}} ->
              remove_runner_on_exit(state, self(), runner_container_id)

              state = %{
                state
                | runner_container_id: runner_container_id,
                  remote_terminator_pid: remote_terminator_pid,
                  runner_node_name: node(remote_terminator_pid)
              }

              {:ok, remote_terminator_pid, state}
          after
            state.boot_timeout ->
              Logger.error("Didn't receive terminator pid from the Runner container after #{state.boot_timeout} ms.")
              remove_failed_runner(state, runner_container_id)
              {:error, :timeout}
          end

        {:error, _} = error ->
          remove_failed_runner(state, runner_container_id)
          error
      end
    end
  end

  @impl true
  @spec remote_spawn_monitor(t(), {atom(), atom(), list()} | (-> any())) :: {:ok, {pid(), reference()}}
  def remote_spawn_monitor(state, func)

  def remote_spawn_monitor(%__MODULE__{} = state, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    {pid, ref} = Node.spawn_monitor(state.runner_node_name, mod, fun, args)
    {:ok, {pid, ref}}
  end

  def remote_spawn_monitor(%__MODULE__{} = state, func) when is_function(func, 0) do
    {pid, ref} = Node.spawn_monitor(state.runner_node_name, func)
    {:ok, {pid, ref}}
  end

  @impl true
  @spec system_shutdown() :: :ok
  def system_shutdown() do
    System.stop()
  end

  @doc false
  # Returns `len` random lowercase hex characters.
  @spec rand_id(pos_integer()) :: binary()
  def rand_id(len) when is_integer(len) and len > 0 do
    # Each random byte encodes to two hex characters, so ceil(len / 2) bytes suffice.
    byte_count = div(len + 1, 2)

    byte_count
    |> :crypto.strong_rand_bytes()
    |> Base.encode16(case: :lower)
    |> binary_part(0, len)
  end

  @doc false
  # Copies the parent's `ERL_AFLAGS` / `ERL_ZFLAGS` into `env` unless already set there.
  @spec add_erl_flags(map()) :: map()
  def add_erl_flags(env) do
    env =
      if flags = System.get_env("ERL_AFLAGS") do
        Map.put_new(env, "ERL_AFLAGS", flags)
      else
        env
      end

    if flags = System.get_env("ERL_ZFLAGS") do
      Map.put_new(env, "ERL_ZFLAGS", flags)
    else
      env
    end
  end

  @doc false
  # Builds the JSON body passed to `DockerAPI.create_container/1`.
  # Optional keys (`Cmd`, `HostConfig`) are only included when configured; `:mounts`
  # is appended to `HostConfig.Mounts` because the Docker Engine create endpoint
  # ignores a top-level `Mounts` key.
  @spec build_create_body(t()) :: map()
  def build_create_body(%__MODULE__{} = state) do
    base = %{
      "Hostname" => state.runner_node_base,
      "name" => state.runner_node_base,
      "Image" => state.image,
      "Env" => Enum.map(state.env, fn {k, v} -> "#{k}=#{v}" end),
      "NetworkingConfig" => %{"EndpointsConfig" => %{state.network => %{}}}
    }

    base
    |> put_if_set("HostConfig", host_config_with_mounts(state.host_config, state.mounts))
    |> put_if_set("Cmd", state.cmd)
  end

  # Puts `value` under `key` unless it is `nil` or `false` (mirrors a truthiness check).
  @spec put_if_set(map(), String.t(), term()) :: map()
  defp put_if_set(map, _key, value) when value in [nil, false], do: map
  defp put_if_set(map, key, value), do: Map.put(map, key, value)

  # Appends `mounts` to the HostConfig `Mounts` list, creating HostConfig if needed.
  @spec host_config_with_mounts(map() | nil, list() | nil) :: map() | nil
  defp host_config_with_mounts(host_config, mounts) when mounts in [nil, false], do: host_config

  defp host_config_with_mounts(host_config, mounts) do
    Map.update(host_config || %{}, "Mounts", mounts, &(&1 ++ mounts))
  end

  # Returns `:ok` when `key` holds a non-empty string, else `{:error, {:missing_config, key}}`.
  @spec require_non_empty_string(Keyword.t(), atom()) :: :ok | {:error, {:missing_config, atom()}}
  defp require_non_empty_string(opts, key) do
    case Keyword.get(opts, key) do
      value when is_binary(value) and value != "" -> :ok
      _ -> {:error, {:missing_config, key}}
    end
  end

  # Forwards the parent's `RELEASE_COOKIE` (if set) so runners can join the cluster,
  # unless the caller already supplied one.
  @spec put_release_cookie(map()) :: map()
  defp put_release_cookie(env) do
    case System.get_env("RELEASE_COOKIE") do
      nil -> env
      cookie -> Map.put_new(env, "RELEASE_COOKIE", cookie)
    end
  end

  # Unless `:keep_runners` is set, spawns a watcher that stops and removes the runner
  # container once `owner` (the process that booted it) exits.
  @spec remove_runner_on_exit(t(), pid(), String.t()) :: pid() | nil
  defp remove_runner_on_exit(%__MODULE__{} = state, owner, container_id) do
    if !state.keep_runners do
      spawn(fn ->
        ref = Process.monitor(owner)

        receive do
          {:DOWN, ^ref, :process, _, _} -> :ok
        end

        result = DockerAPI.stop_and_remove_container(container_id)
        Logger.debug("Removal of Runner via Docker API: #{inspect(result)}.")
      end)
    end
  end

  # Unless `:keep_runners` is set, removes a runner container that failed to boot.
  # Best-effort: the Docker API result is only logged.
  @spec remove_failed_runner(t(), String.t()) :: :ok
  defp remove_failed_runner(%__MODULE__{} = state, container_id) do
    if !state.keep_runners do
      result = DockerAPI.stop_and_remove_container(container_id)
      Logger.debug("Removal of failed Runner via Docker API: #{inspect(result)}.")
    end

    :ok
  end

  @spec maybe_pull_image(String.t()) :: :ok | {:error, any()}
  defp maybe_pull_image(image) do
    if DockerAPI.image_exists?(image) do
      Logger.debug("Image #{image} already present, skipping pull")
      :ok
    else
      Logger.info("Pulling image #{image}")

      case DockerAPI.pull_image(%{"fromImage" => image}) do
        {:ok, _events} -> :ok
        {:error, _reason} = error -> error
      end
    end
  end
end