lib/pkcs11ex/pkcs12.ex

defmodule Pkcs11ex.PKCS12 do
  @moduledoc """
  Read-only loader for PKCS#12 (`.p12` / `.pfx`) bundles.

  **Never returns private key material**, even if the bundle contains one — only
  a `:has_private_key` boolean. This is a deliberate design choice: `pkcs11ex`
  does not sign with software keys (`specs.md` §10 Non-Goals). Use this loader
  for trust-material extraction (`x5c` certs, trust anchors) and pair it with
  `mix pkcs11ex.import_p12` (Phase 2) when you want to provision the key into a
  PKCS#11 token.

  ## Implementation

  v1 shells out to the `openssl pkcs12` CLI (passwords passed via env vars,
  never command line). OTP's `:public_key` doesn't ship PKCS#12 support, and
  rolling our own ASN.1 parser for the format's many encryption variants is a
  significant undertaking. A native Rust replacement via the existing Rustler
  bridge is on the roadmap; the public API is stable across the swap.

  Requires `openssl` on `PATH`.
  """

  alias SignCore.X509

  defmodule Bundle do
    @moduledoc """
    A parsed PKCS#12 bundle.

    `:leaf` is the first cert in the bundle (the end-entity cert in typical
    tax-certificate bundles); `:chain` is the rest in bundle order. Use
    `SignCore.X509.spki_sha256/1` if you need the leaf's pinning hash.

    `:has_private_key` is `true` when the bundle's structure includes a key
    bag (whether or not the password was correct enough to decrypt it). The
    actual key bytes are never part of this struct.

    `:friendly_name` is `nil` in v1; PKCS#12 friendlyName extraction lands
    with the native parser.
    """
    defstruct [:leaf, :chain, :has_private_key, :friendly_name]

    @type t :: %__MODULE__{
            leaf: X509.t(),
            chain: [X509.t()],
            has_private_key: boolean(),
            friendly_name: String.t() | nil
          }
  end

  @type source :: Path.t() | binary()
  @type opts :: [
          password: binary(),
          max_chain: pos_integer()
        ]

  @doc """
  Loads, parses, and returns a `Pkcs11ex.PKCS12.Bundle`.

  `source` is either a filesystem path (string that resolves to a regular file)
  or raw bundle bytes (binary that does not).

  ## Options

    * `:password` — bundle password. Required for encrypted bundles. Pass `nil`
      or omit only for the rare unencrypted bundle.
    * `:max_chain` — hard cap on chain length (default `8`). Bundles with more
      certificates fail with `:p12_chain_too_long`.

  ## Errors

  Returns one of:
    * `:p12_invalid` — bundle bytes are malformed or unparseable.
    * `:p12_password_incorrect` — decryption failed.
    * `:p12_chain_too_long` — chain exceeded `:max_chain`.
    * `{:p12_unsupported_kdf, oid}` — bundle uses a KDF/cipher OID OpenSSL doesn't recognize.
    * `:openssl_not_found` — the `openssl` CLI is not on `PATH`.
  """
  @spec load(source(), opts()) :: {:ok, Bundle.t()} | {:error, term()}
  def load(source, opts \\ [])

  def load(source, opts) when is_binary(source) do
    case openssl_executable() do
      nil ->
        {:error, :openssl_not_found}

      openssl ->
        bytes =
          if File.regular?(source) do
            File.read!(source)
          else
            source
          end

        do_load(bytes, opts, openssl)
    end
  end

  @doc "Same as `load/2` but raises `Pkcs11ex.Error` on failure."
  @spec load!(source(), opts()) :: Bundle.t()
  def load!(source, opts \\ []) do
    case load(source, opts) do
      {:ok, bundle} -> bundle
      {:error, reason} -> raise Pkcs11ex.Error, reason: reason
    end
  end

  # ---------- Internals ----------

  defp do_load(bytes, opts, openssl) do
    password = Keyword.get(opts, :password) || ""
    max_chain = Keyword.get(opts, :max_chain, 8)

    with_tmp(bytes, fn path ->
      with {:ok, pem_certs} <- run_extract_certs(openssl, path, password),
           {:ok, ders} <- decode_pem_certs(pem_certs),
           :ok <- check_chain_length(ders, max_chain),
           {:ok, certs} <- decode_x509s(ders),
           [leaf | chain] <- certs,
           {:ok, has_pk} <- detect_private_key(openssl, path, password) do
        {:ok, %Bundle{leaf: leaf, chain: chain, has_private_key: has_pk, friendly_name: nil}}
      else
        # do_load expects a single result; tuple-pattern in `with` drops through
        # to the else branch as-is, so existing {:error, _} pass through.
        [] -> {:error, :p12_invalid}
        {:error, _} = err -> err
      end
    end)
  end

  defp run_extract_certs(openssl, path, password) do
    args = [
      "pkcs12",
      "-in",
      path,
      "-password",
      "env:PKCS11EX_P12_PWD",
      "-nokeys",
      "-nodes"
    ]

    case System.cmd(openssl, args,
           env: [{"PKCS11EX_P12_PWD", password}],
           stderr_to_stdout: true
         ) do
      {pem, 0} -> {:ok, pem}
      {output, _} -> {:error, classify_openssl_error(output)}
    end
  end

  defp detect_private_key(openssl, path, password) do
    args = [
      "pkcs12",
      "-in",
      path,
      "-password",
      "env:PKCS11EX_P12_PWD",
      "-info",
      "-nokeys",
      "-noout"
    ]

    case System.cmd(openssl, args,
           env: [{"PKCS11EX_P12_PWD", password}],
           stderr_to_stdout: true
         ) do
      {output, 0} ->
        # OpenSSL's -info output mentions "Keybag" or "Shrouded Keybag" when a
        # private key bag is present in the PKCS#12 structure. We don't
        # decrypt the key — we only confirm its presence.
        has_key = String.contains?(output, "Keybag") or String.contains?(output, "Shrouded")

        {:ok, has_key}

      _ ->
        # If -info itself failed, default conservatively to false; the cert
        # extraction must have succeeded for us to reach here, so the bundle
        # is structurally valid.
        {:ok, false}
    end
  end

  defp decode_pem_certs(pem) do
    case :public_key.pem_decode(pem) do
      [] ->
        {:error, :p12_invalid}

      entries ->
        ders = for {:Certificate, der, _} <- entries, do: der

        if ders == [], do: {:error, :p12_invalid}, else: {:ok, ders}
    end
  end

  defp decode_x509s(ders) do
    Enum.reduce_while(ders, {:ok, []}, fn der, {:ok, acc} ->
      case X509.from_der(der) do
        {:ok, cert} -> {:cont, {:ok, [cert | acc]}}
        err -> {:halt, err}
      end
    end)
    |> case do
      {:ok, list} -> {:ok, Enum.reverse(list)}
      err -> err
    end
  end

  defp check_chain_length(ders, max_chain) when length(ders) > max_chain,
    do: {:error, :p12_chain_too_long}

  defp check_chain_length(_, _), do: :ok

  defp classify_openssl_error(output) do
    out_lower = String.downcase(output)

    cond do
      String.contains?(out_lower, "mac verify failure") ->
        :p12_password_incorrect

      String.contains?(out_lower, "wrong password") ->
        :p12_password_incorrect

      String.contains?(out_lower, "invalid password") ->
        :p12_password_incorrect

      String.contains?(out_lower, "unknown algorithm") ->
        unsupported_kdf_from_output(output)

      String.contains?(out_lower, "unsupported algorithm") ->
        unsupported_kdf_from_output(output)

      true ->
        :p12_invalid
    end
  end

  # Match the OID *only* in the contexts openssl uses to report it.
  # Patterns that show up across openssl 1.1 / 3.x:
  #   - "unknown algorithm <oid>"
  #   - "unsupported algorithm <oid>"
  #   - "OID = <oid>"
  #   - "(OID: <oid>)"
  # The naive `\d+(?:\.\d+)+` regex matched the first dot-separated
  # number anywhere — which catches the openssl version string
  # ("OpenSSL 3.2.1") before it sees the actual OID. Anchoring to the
  # error wording avoids that false positive.
  #
  # An OID has at least 2 arcs and arcs are non-negative integers.
  # We require ≥ 3 arcs to filter out short version strings.
  defp unsupported_kdf_from_output(output) do
    patterns = [
      ~r/(?:unknown|unsupported)\s+algorithm[^\d]*?(\d+(?:\.\d+){2,})/i,
      ~r/OID\s*[:=]\s*(\d+(?:\.\d+){2,})/i,
      ~r/\bOID\s+(\d+(?:\.\d+){2,})/i
    ]

    Enum.find_value(patterns, {:p12_unsupported_kdf, :unknown}, fn re ->
      case Regex.run(re, output, capture: :all_but_first) do
        [oid] -> {:p12_unsupported_kdf, oid}
        _ -> nil
      end
    end)
  end

  defp with_tmp(bytes, fun) do
    path =
      Path.join(System.tmp_dir!(), "pkcs11ex_p12_#{System.unique_integer([:positive])}.p12")

    File.write!(path, bytes)

    try do
      fun.(path)
    after
      File.rm(path)
    end
  end

  defp openssl_executable do
    System.get_env("PKCS11EX_OPENSSL") || System.find_executable("openssl")
  end
end