# lib/allm/providers/gemini.ex

defmodule ALLM.Providers.Gemini do
  @moduledoc """
  Google Gemini provider adapter — Layer B. See spec §6.4, §7.1, §20,
  §32.1 (bundled adapters).

  Phase 16.1 shipped the non-streaming `ALLM.Adapter` callback set
  against the Generative Language API at
  `https://generativelanguage.googleapis.com/v1beta`; streaming
  (`ALLM.StreamAdapter`) followed in Phase 16.2 and tools / vision in
  Phases 16.3/16.4. Image-out lands in Phase 16.5.

  This module implements:

    * `generate/2` — fires `POST /v1beta/models/{model}:generateContent`
      via `Req`, wrapped in `ALLM.Retry.run/3` with the **default** retry
      policy (Decision #16 — Gemini's 429 / 500 / 503 / 504 are already
      covered by spec §6.1's default retryable set; no Gemini-specific
      wrapper is needed).
    * `prepare_request/2` — returns an unfired `%Req.Request{}` with the
      API key injected as `x-goog-api-key` (Decision #2).
    * `translate_options/2` — identity (Decision #18). Gemini's
      camelCase rename and `generationConfig` nesting happen inside
      `to_generation_config/1` at request-build time.

  ## Single translator (Decision #1)

  Gemini exposes one chat endpoint, `generateContent`, that covers both
  text and image generation — image generation is selected by toggling
  `generationConfig.responseModalities`. The request-builder
  (`to_gemini_request_body/2`) is therefore a single function shared
  across the chat adapter and (in Phase 16.5) the image adapter. This
  amortizes the PHASE_10 dual-translator drift class to zero.

  ## Auth header (Decision #2/#3)

  The API key flows on the `x-goog-api-key` request header, not the
  documented `?key=...` query parameter. Both forms are equivalent
  server-side; the header form keeps the API key out of HTTP access
  logs and metrics. The same header is reused for the streaming
  endpoint (Decision #3).

  ## Wire field map (per spec §35.7 + GEMINI_DESIGN.md)

  | Concern | Gemini wire field |
  |---------|------------------|
  | Endpoint host | `https://generativelanguage.googleapis.com/v1beta` |
  | Method (chat non-streaming) | `POST /models/{model}:generateContent` |
  | Auth header | `x-goog-api-key: $key` |
  | Roles | `user`, `model` (`:assistant → "model"`) |
  | System prompt | top-level `systemInstruction.parts[].text` |
  | Generation params | nested under `generationConfig.{maxOutputTokens, temperature, topP, topK, stopSequences, responseMimeType, responseSchema}` |
  | `finish_reason` | `candidates[0].finishReason` (UPPER_SNAKE_CASE; mapping table below) |
  | Prompt-blocked path | `promptFeedback.blockReason` (top-level, no candidates) |
  | Usage location | `usageMetadata.{promptTokenCount, candidatesTokenCount, totalTokenCount}` |
  | Error envelope | `{"error": {"code", "status", "message"}}` |
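
  Putting those rows together, an illustrative minimal request body
  (placeholder values; `to_gemini_request_body/2` composes this shape):

      {
        "systemInstruction": {"parts": [{"text": "Be concise."}]},
        "contents": [
          {"role": "user", "parts": [{"text": "Hi"}]}
        ],
        "generationConfig": {"maxOutputTokens": 256, "temperature": 0.7}
      }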

  ## Finish-reason mapping (Decision #14)

  Gemini's enum has 19 documented values. ALLM's
  `Response.finish_reason` is a closed 6-atom union; the raw string is
  preserved at `Response.raw_finish_reason` for non-canonical rows.

  | Gemini `finishReason` | ALLM `Response.finish_reason` |
  |-----------------------|-------------------------------|
  | `STOP` | `:stop` |
  | `MAX_TOKENS` | `:length` |
  | `SAFETY` | `:content_filter` |
  | `RECITATION` | `:content_filter` |
  | `LANGUAGE` | `:content_filter` |
  | `BLOCKLIST` | `:content_filter` |
  | `PROHIBITED_CONTENT` | `:content_filter` |
  | `SPII` | `:content_filter` |
  | `IMAGE_SAFETY` | `:content_filter` |
  | `IMAGE_PROHIBITED_CONTENT` | `:content_filter` |
  | `IMAGE_RECITATION` | `:content_filter` |
  | `IMAGE_OTHER` | `:other` |
  | `NO_IMAGE` | `:other` |
  | `MALFORMED_FUNCTION_CALL` | `:error` |
  | `UNEXPECTED_TOOL_CALL` | `:error` |
  | `TOO_MANY_TOOL_CALLS` | `:error` |
  | `MISSING_THOUGHT_SIGNATURE` | `:error` |
  | `MALFORMED_RESPONSE` | `:error` |
  | `OTHER` / `FINISH_REASON_UNSPECIFIED` / unknown | `:other` |

  ## Empty-candidates branches (Decisions #9 + #10)

    * `promptFeedback.blockReason` with empty candidates →
      `{:ok, %Response{finish_reason: :content_filter, content: ""}}`.
      The block reason is preserved at
      `metadata.error.reason = "blocked:<BLOCK_REASON>"`.
    * empty candidates with no `promptFeedback.blockReason` →
      `{:error, %AdapterError{reason: :malformed_response}}`.

  ## Usage decoding (Decision #11)

  `usageMetadata.candidatesTokenCount` is canonical;
  `usageMetadata.responseTokenCount` is read as a defensive fallback
  when `candidatesTokenCount` is absent. If both are missing,
  `Usage.output_tokens` is left at `nil` and a single
  `Logger.warning/1` fires for that call.

  ## Error envelope mapping (Decision #15)

  Maps Google's `{error: {code, status, message}}` envelope onto
  `%AdapterError{reason: ...}`:

  | HTTP | Google `status` | `AdapterError.reason` |
  |------|-----------------|----------------------|
  | 400 | `INVALID_ARGUMENT` (no marker) | `:invalid_request` |
  | 400 | `INVALID_ARGUMENT` (`exceeds the maximum number of tokens` or `input token count` substring) | `:context_length_exceeded` |
  | 401 | `UNAUTHENTICATED` | `:authentication_failed` |
  | 403 | `PERMISSION_DENIED` | `:authentication_failed` |
  | 404 | `NOT_FOUND` | `:invalid_request` |
  | 429 | `RESOURCE_EXHAUSTED` | `:rate_limited` |
  | 500 | `INTERNAL` | `:provider_unavailable` |
  | 503 | `UNAVAILABLE` | `:provider_unavailable` |
  | 504 | `DEADLINE_EXCEEDED` | `:provider_unavailable` |
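
  For instance, a 429 envelope

      {"error": {"code": 429, "status": "RESOURCE_EXHAUSTED", "message": "Quota exceeded"}}

  classifies to `%AdapterError{reason: :rate_limited, status: 429}` with
  `metadata.google_status = "RESOURCE_EXHAUSTED"` (see `classify_error/3`).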

  ## Retry policy (Decision #16)

  No Gemini-specific retry-policy wrapper. The default policy at
  `lib/allm/retry.ex` already retries HTTP 429, 500, 502, 503, 504,
  and `:timeout` / `:network_error`. Streaming never retries
  (spec §6.1).

  ## Key resolution

  Keys never appear on the engine. `prepare_request/2` and `generate/2`
  call `ALLM.Keys.fetch!(:gemini, opts)` at request-build time. The
  `:gemini` provider atom is **not** in `ALLM.Keys`'s `@env_var_table`;
  the unknown-provider fallback at
  `lib/allm/keys.ex:189-194` returns `"GEMINI_API_KEY"`.
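
  For example, a runtime key can be installed with `ALLM.Keys.put/2`
  (as the doctests below do) or resolved from that environment
  variable:

      ALLM.Keys.put(:gemini, System.fetch_env!("GEMINI_API_KEY"))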
  """

  @behaviour ALLM.Adapter
  @behaviour ALLM.StreamAdapter

  @base_url "https://generativelanguage.googleapis.com/v1beta"
  @default_stream_timeout 60_000

  alias ALLM.Error.AdapterError
  alias ALLM.Error.StreamError
  alias ALLM.Error.ValidationError
  alias ALLM.Image
  alias ALLM.ImagePart
  alias ALLM.Keys
  alias ALLM.Message
  alias ALLM.Providers.Gemini.Decode
  alias ALLM.Providers.Support.GeminiHeaders
  alias ALLM.Providers.Support.SSE
  alias ALLM.Request
  alias ALLM.Response
  alias ALLM.Retry
  alias ALLM.TextPart
  alias ALLM.Tool
  alias ALLM.ToolCall
  alias ALLM.Usage

  require Logger

  # ---------------------------------------------------------------------------
  # Public API — Adapter callbacks
  # ---------------------------------------------------------------------------

  @impl ALLM.Adapter
  @doc """
  Identity translator (Decision #18). Gemini accepts ALLM's canonical
  `:max_tokens`, `:temperature`, `:top_p`, etc. — the camelCase rename
  and `generationConfig` nesting happen in `to_generation_config/1` at
  request-build time, not here.

  ## Examples

      iex> req = ALLM.Request.new([%ALLM.Message{role: :user, content: "x"}], model: "gemini-2.5-flash")
      iex> ALLM.Providers.Gemini.translate_options([max_tokens: 100, temperature: 0.7], req)
      [max_tokens: 100, temperature: 0.7]
  """
  @spec translate_options(keyword(), Request.t()) :: keyword()
  def translate_options(opts, %Request{}) when is_list(opts), do: opts

  @impl ALLM.Adapter
  @doc """
  Build an unfired `%Req.Request{}` with the resolved API key injected
  as `x-goog-api-key: <key>` (Decision #2).

  Per `ALLM.Keys.fetch!/2`, this function **raises**
  `%ALLM.Error.EngineError{reason: :missing_key}` when no key resolver
  yields a value.

  Honors `opts[:request_timeout]` (forwarded as `Req`'s
  `:receive_timeout`) and `opts[:adapter_opts][:endpoint]` (URL host
  override, primarily for testing).

  ## Examples

      iex> ALLM.Keys.put(:gemini, "AIza-doctest-prep")
      iex> req = ALLM.Request.new([%ALLM.Message{role: :user, content: "hi"}], model: "gemini-2.5-flash")
      iex> {:ok, %Req.Request{} = http} = ALLM.Providers.Gemini.prepare_request(req, [])
      iex> Req.Request.get_header(http, "x-goog-api-key")
      ["AIza-doctest-prep"]
      iex> ALLM.Keys.delete(:gemini)
      :ok
  """
  @spec prepare_request(Request.t(), keyword()) ::
          {:ok, Req.Request.t()} | {:error, AdapterError.t()}
  def prepare_request(%Request{} = request, opts) when is_list(opts) do
    api_key = Keys.fetch!(:gemini, opts)
    body = to_gemini_request_body(request, opts)
    url = endpoint(opts) <> "/models/#{request.model}:generateContent"

    req =
      Req.new(
        method: :post,
        url: url,
        headers: GeminiHeaders.headers(api_key),
        json: body
      )
      |> maybe_apply_req_test_stub(opts)
      |> maybe_apply_request_timeout(opts)

    {:ok, req}
  end

  defp endpoint(opts) do
    case Keyword.get(opts, :adapter_opts, []) |> Keyword.get(:endpoint) do
      nil -> @base_url
      url when is_binary(url) -> url
    end
  end

  defp maybe_apply_req_test_stub(req, opts) do
    case Keyword.get(opts, :adapter_opts, []) |> Keyword.get(:plug) do
      nil -> req
      plug -> Req.merge(req, plug: plug)
    end
  end

  defp maybe_apply_request_timeout(req, opts) do
    case Keyword.get(opts, :request_timeout) do
      nil -> req
      ms when is_integer(ms) and ms > 0 -> Req.merge(req, receive_timeout: ms)
    end
  end

  @impl ALLM.Adapter
  @doc """
  Execute a non-streaming `generateContent` request synchronously.

  Wraps the HTTP call in `ALLM.Retry.run/3` with the spec §6.1 default
  policy (Decision #16). Returns `{:ok, %Response{}}` on 2xx success or
  `{:error, %AdapterError{}}` on every failure shape.

  ## Empty-candidates handling (Decisions #9 + #10)

    * `promptFeedback.blockReason` with empty candidates →
      `{:ok, %Response{finish_reason: :content_filter, content: ""}}`
      (a successful HTTP response is a successful call from the
      adapter's perspective; the content filter is a finish reason).
    * Empty candidates with no `promptFeedback.blockReason` →
      `{:error, %AdapterError{reason: :malformed_response}}`.

  ## Error reasons (Decision #15)

  | HTTP | `AdapterError.reason` |
  |------|----------------------|
  | 400 generic | `:invalid_request` |
  | 400 ctx-window | `:context_length_exceeded` |
  | 401 / 403 | `:authentication_failed` |
  | 404 | `:invalid_request` |
  | 429 | `:rate_limited` |
  | 500 / 503 / 504 | `:provider_unavailable` |
  | network drop | `:network_error` |
  | malformed body | `:malformed_response` |

  ## Examples

      iex> ALLM.Keys.put(:gemini, "AIza-doctest-gen")
      iex> req = ALLM.Request.new([%ALLM.Message{role: :user, content: "x"}], model: "gemini-2.5-flash")
      iex> {:error, %ALLM.Error.AdapterError{reason: :authentication_failed}} =
      ...>   ALLM.Providers.Gemini.generate(req,
      ...>     retry: false,
      ...>     adapter_opts: [plug: fn conn ->
      ...>       conn
      ...>       |> Plug.Conn.put_resp_content_type("application/json")
      ...>       |> Plug.Conn.resp(401, ~s({"error":{"code":401,"status":"UNAUTHENTICATED","message":"bad"}}))
      ...>     end]
      ...>   )
      iex> ALLM.Keys.delete(:gemini)
      :ok
  """
  @spec generate(Request.t(), keyword()) ::
          {:ok, Response.t()} | {:error, AdapterError.t() | ValidationError.t()}
  def generate(%Request{} = request, opts) when is_list(opts) do
    with :ok <- reject_image_in_system_messages(request),
         :ok <- reject_unsupported_image_sources(request) do
      do_generate(request, opts)
    end
  end

  defp do_generate(%Request{} = request, opts) do
    {:ok, %Req.Request{} = http_req} = prepare_request(request, opts)

    retry_policy = Keyword.get(opts, :retry, :default)
    telemetry_meta = build_retry_telemetry_meta(opts)

    retry_policy
    |> Retry.run(telemetry_meta, fn -> run_one_attempt(http_req, opts) end)
    |> unwrap_retry_token()
  end

  defp build_retry_telemetry_meta(opts) do
    base = %{provider: :gemini}

    case Keyword.get(opts, :request_id) do
      nil -> base
      value -> Map.put(base, :request_id, value)
    end
  end

  defp unwrap_retry_token({:ok, _} = ok), do: ok

  defp unwrap_retry_token(
         {:error, %AdapterError{metadata: %{final_error: %AdapterError{} = real}}}
       ),
       do: {:error, real}

  defp unwrap_retry_token({:error, _} = err), do: err

  defp run_one_attempt(http_req, opts) do
    case Req.request(http_req) do
      {:ok, %Req.Response{status: status, body: body}}
      when status in 200..299 ->
        decode_success_body(body, opts)

      {:ok, %Req.Response{status: status, body: body, headers: headers}} ->
        classify_http_error(status, body, headers)

      {:error, %{__struct__: Req.TransportError, reason: :timeout} = cause} ->
        timeout_err = AdapterError.new(:timeout, provider: :gemini, cause: cause)

        {:retry, 0,
         struct(timeout_err,
           metadata: Map.put(timeout_err.metadata, :final_error, timeout_err)
         )}

      {:error, %{__struct__: Jason.DecodeError} = cause} ->
        {:error, malformed_error(cause)}

      {:error, exception} ->
        {:error,
         AdapterError.new(:network_error,
           provider: :gemini,
           message: "transport failure: " <> Exception.message(exception),
           cause: exception
         )}
    end
  end

  defp decode_success_body(body, opts) when is_map(body) do
    {:ok, decode_response(body, opts)}
  rescue
    e in ArgumentError ->
      # Defensive: malformed shape that decode_response/2 cannot handle.
      {:error, malformed_error(e)}
  catch
    :throw, {:malformed_response, cause} ->
      {:error, malformed_error(cause)}
  end

  defp decode_success_body(other, _opts) do
    {:error, malformed_error({:unexpected_body_shape, other})}
  end

  defp malformed_error(cause) do
    AdapterError.new(:malformed_response,
      provider: :gemini,
      message: "Gemini returned a body that could not be decoded",
      cause: cause
    )
  end

  defp classify_http_error(status, body, headers) do
    decoded = decode_error_body(body)
    classified = classify_error(status, decoded, headers)

    if classified.reason in [:rate_limited, :provider_unavailable, :timeout] do
      retry_token =
        struct(classified,
          reason: status,
          metadata: Map.put(classified.metadata, :final_error, classified)
        )

      {:retry, retry_after_ms(headers) || 0, retry_token}
    else
      {:error, classified}
    end
  end

  defp decode_error_body(body) when is_map(body), do: body
  defp decode_error_body(_), do: %{}

  @doc false
  @spec classify_error(non_neg_integer(), map(), Enumerable.t()) :: AdapterError.t()
  def classify_error(status, body, headers) when is_integer(status) and is_map(body) do
    error = Map.get(body, "error", %{})
    google_status = Map.get(error, "status")
    message = Map.get(error, "message", "Gemini HTTP #{status}")

    reason = classify_reason(status, google_status, message)
    retry_after = retry_after_ms(headers)

    AdapterError.new(reason,
      provider: :gemini,
      status: status,
      retry_after_ms: retry_after,
      message: message,
      metadata: %{google_status: google_status}
    )
  end

  defp classify_reason(401, _gs, _msg), do: :authentication_failed
  defp classify_reason(403, _gs, _msg), do: :authentication_failed
  defp classify_reason(404, _gs, _msg), do: :invalid_request
  defp classify_reason(429, _gs, _msg), do: :rate_limited

  defp classify_reason(400, _gs, msg) when is_binary(msg) do
    if context_length_marker?(msg) do
      :context_length_exceeded
    else
      :invalid_request
    end
  end

  defp classify_reason(400, _gs, _msg), do: :invalid_request

  defp classify_reason(status, _gs, _msg) when status in [500, 502, 503, 504],
    do: :provider_unavailable

  defp classify_reason(_status, _gs, _msg), do: :unknown

  defp context_length_marker?(msg) do
    String.contains?(msg, "exceeds the maximum number of tokens") or
      String.contains?(msg, "input token count")
  end

  # Parse `Retry-After` header per RFC 7231 §7.1.3. Decision #16 amendment:
  # `Retry-After` parsing is a separate concern from the retry-policy
  # wrapper choice — even though Gemini uses the default policy, the
  # adapter still honors server-issued back-off hints. Mirrors
  # `lib/allm/providers/anthropic.ex:515-547` and
  # `lib/allm/providers/openai.ex:690-737`.
  defp retry_after_ms(headers) do
    case header_value(headers, "retry-after") do
      nil -> nil
      value -> parse_retry_after(value)
    end
  end

  defp header_value(headers, name) when is_map(headers) do
    case Map.get(headers, name) do
      nil -> nil
      value -> header_value_to_string(value)
    end
  end

  defp header_value(headers, name) when is_list(headers) do
    Enum.find_value(headers, fn
      {k, v} when is_binary(k) ->
        if String.downcase(k) == name, do: header_value_to_string(v), else: nil

      _ ->
        nil
    end)
  end

  defp header_value(_headers, _name), do: nil

  defp header_value_to_string([v | _]) when is_binary(v), do: v
  defp header_value_to_string(v) when is_binary(v), do: v
  defp header_value_to_string(_), do: nil

  defp parse_retry_after(value) when is_binary(value) do
    case Integer.parse(value) do
      {seconds, ""} when seconds >= 0 -> seconds * 1_000
      _ -> nil
    end
  end

  defp parse_retry_after(_), do: nil
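
  # For example:
  #
  #   parse_retry_after("30")   #=> 30_000
  #   parse_retry_after("0")    #=> 0
  #   parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT")  #=> nil
  #
  # The HTTP-date form of `Retry-After` is not parsed here; it decodes
  # to `nil` and the retry policy falls back to its own back-off
  # schedule.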

  # ---------------------------------------------------------------------------
  # Request-body composition (Decision #1 — single translator)
  # ---------------------------------------------------------------------------

  @doc """
  Compose the JSON request body for `generateContent` from a canonical
  `%Request{}`. Pure function; no I/O.

  Performs system-message extraction (hoist into top-level
  `systemInstruction`), role mapping (`:assistant → "model"`), and
  `generationConfig` composition.

  Tools (Phase 16.3) extend this builder via `request.tools` and
  `request.tool_choice`; image-out (Phase 16.5) will extend it via
  `opts` flags without changing the text-only path.

  ## Examples

      iex> req = ALLM.Request.new(
      ...>   [%ALLM.Message{role: :system, content: "Be concise."},
      ...>    %ALLM.Message{role: :user, content: "Hi"}],
      ...>   model: "gemini-2.5-flash", max_tokens: 256
      ...> )
      iex> body = ALLM.Providers.Gemini.to_gemini_request_body(req, [])
      iex> {body["systemInstruction"], length(body["contents"]), body["generationConfig"]["maxOutputTokens"]}
      {%{"parts" => [%{"text" => "Be concise."}]}, 1, 256}
  """
  @spec to_gemini_request_body(Request.t(), keyword()) :: map()
  def to_gemini_request_body(%Request{} = request, opts) when is_list(opts) do
    {system_text, non_system} = extract_system(request.messages)

    base = %{"contents" => to_gemini_contents(non_system)}

    base
    |> maybe_put_system_instruction(system_text)
    |> maybe_put_generation_config(request)
    |> maybe_put_tools(request.tools)
    |> maybe_put_tool_config(request.tool_choice, request.tools)
  end

  defp maybe_put_tools(body, []), do: body

  defp maybe_put_tools(body, [_ | _] = tools) do
    Map.put(body, "tools", [%{"functionDeclarations" => to_gemini_tools(tools)}])
  end

  # tool_choice has no Gemini equivalent until tools are declared. Omit
  # toolConfig when tools is empty so the wire payload stays minimal.
  defp maybe_put_tool_config(body, _choice, []), do: body
  defp maybe_put_tool_config(body, nil, _tools), do: body

  defp maybe_put_tool_config(body, choice, _tools) do
    Map.put(body, "toolConfig", %{"functionCallingConfig" => to_gemini_tool_config(choice)})
  end

  defp maybe_put_system_instruction(body, nil), do: body

  defp maybe_put_system_instruction(body, text) when is_binary(text) do
    Map.put(body, "systemInstruction", %{"parts" => [%{"text" => text}]})
  end

  defp maybe_put_generation_config(body, %Request{} = request) do
    case to_generation_config(request) do
      gc when map_size(gc) == 0 -> body
      gc -> Map.put(body, "generationConfig", gc)
    end
  end

  @doc false
  @spec extract_system([Message.t()]) :: {String.t() | nil, [Message.t()]}
  def extract_system(messages) when is_list(messages) do
    {systems, others} = Enum.split_with(messages, &(&1.role == :system))

    case systems do
      [] -> {nil, others}
      _ -> {Enum.map_join(systems, "\n\n", &stringify_content(&1.content)), others}
    end
  end
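
  # For example, multiple system messages are joined with a blank line
  # and the remaining turns keep their relative order:
  #
  #   extract_system([
  #     %Message{role: :system, content: "A"},
  #     %Message{role: :user, content: "hi"},
  #     %Message{role: :system, content: "B"}
  #   ])
  #   #=> {"A\n\nB", [%Message{role: :user, content: "hi"}]}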

  # Phase 16.4 — `user_content/1` is the user-side content emitter.
  #
  #   - binary  → forwarded verbatim wrapped in a single text part
  #     (v0.2 backward-compat).
  #   - list with any `%ImagePart{}` → translated to inlineData parts
  #     via `to_gemini_content_blocks/1`.
  #   - list with only `%TextPart{}` → flattened to one joined-text part
  #     (Phase 14.4 backward-compat).
  #   - nil → single text part with empty string.
  #
  # Mirrors the byte-identical helper names in `lib/allm/providers/anthropic.ex`
  # (`user_content/1`, `to_anthropic_content_blocks/1`, `part_to_block/1`,
  # `materialize_part/1`, `system_has_image_part?/1`,
  # `reject_image_in_system_messages/1`) and the OpenAI counterparts at
  # `lib/allm/providers/openai.ex:1694-1798` per CLAUDE.md cross-provider
  # helper-name alignment rule.
  defp user_content(c) when is_binary(c) or is_nil(c) do
    [%{"text" => stringify_content(c)}]
  end

  defp user_content(parts) when is_list(parts) do
    if Enum.any?(parts, &match?(%ImagePart{}, &1)) do
      to_gemini_content_blocks(parts)
    else
      [%{"text" => stringify_content(parts)}]
    end
  end

  defp stringify_content(c) when is_binary(c), do: c
  defp stringify_content(nil), do: ""

  # Flatten a text-only content list. Image-bearing lists are routed
  # through `to_gemini_content_blocks/1` by `user_content/1` BEFORE
  # reaching this helper; system / tool-role messages always carry
  # text or text-only lists.
  defp stringify_content(parts) when is_list(parts) do
    Enum.map_join(parts, "\n", &materialize_part/1)
  end

  defp materialize_part(%TextPart{text: t}), do: t
  # Defensive: ImagePart should be filtered upstream for system / tool
  # contexts; render to empty string rather than raising — text-only
  # contexts that accidentally see one degrade gracefully. Mirrors the
  # OpenAI / Anthropic precedent; see `lib/allm/providers/anthropic.ex:780`.
  defp materialize_part(%ImagePart{}), do: ""

  defp materialize_part(other) do
    raise ArgumentError,
          "stringify_content/1 expects a TextPart; got: #{inspect(other)}"
  end

  # Phase 16.4 — system-message ImagePart rejection. System content is
  # text-only on Gemini's `systemInstruction` field; an ImagePart in a
  # system role is a hard reject before any other validation runs.
  # Mirrors `lib/allm/providers/anthropic.ex:793` and
  # `lib/allm/providers/openai.ex:1752`.
  @spec reject_image_in_system_messages(Request.t()) ::
          :ok | {:error, ValidationError.t()}
  defp reject_image_in_system_messages(%Request{messages: messages}) do
    errors =
      messages
      |> Enum.with_index()
      |> Enum.flat_map(fn {%Message{role: role, content: c}, idx} ->
        if role == :system and system_has_image_part?(c) do
          [{[:messages, idx, :content], :image_in_system_message}]
        else
          []
        end
      end)

    case errors do
      [] ->
        :ok

      list ->
        {:error,
         ValidationError.new(:invalid_message, list,
           message: "image content is not supported in system-role messages"
         )}
    end
  end

  defp system_has_image_part?(content) when is_list(content) do
    Enum.any?(content, &match?(%ImagePart{}, &1))
  end

  defp system_has_image_part?(_), do: false

  # Phase 16.4 — Gemini does NOT fetch URL-source images; the Files API
  # is out of scope for v0.3. Walk every ImagePart and reject:
  #
  #   * `{:url, _}` → unsupported_feature with pre-fetch guidance.
  #   * `{:file, _}` with `:mime_type == nil` → unsupported_feature
  #     with the path named so callers know to use `Image.from_binary/2`
  #     or `from_file/1` against an extension-bearing path.
  #
  # Returns `AdapterError`, NOT `ValidationError`, because the rejection
  # is a provider-capability gate (Gemini-specific), not a structural
  # validation error.
  @spec reject_unsupported_image_sources(Request.t()) ::
          :ok | {:error, AdapterError.t()}
  defp reject_unsupported_image_sources(%Request{messages: messages}) do
    Enum.reduce_while(messages, :ok, fn %Message{content: c}, _acc ->
      case check_message_image_sources(c) do
        :ok -> {:cont, :ok}
        {:error, _} = err -> {:halt, err}
      end
    end)
  end

  defp check_message_image_sources(content) when is_list(content) do
    Enum.reduce_while(content, :ok, fn part, _acc ->
      case check_part_source(part) do
        :ok -> {:cont, :ok}
        {:error, _} = err -> {:halt, err}
      end
    end)
  end

  defp check_message_image_sources(_other), do: :ok

  defp check_part_source(%ImagePart{image: %Image{source: {:url, _}}}) do
    {:error,
     AdapterError.new(:unsupported_feature,
       provider: :gemini,
       message:
         "Gemini adapter does not fetch URL-source images; pre-fetch and pass as " <>
           "%Image{source: {:binary, _}, mime_type: _}"
     )}
  end

  defp check_part_source(%ImagePart{image: %Image{source: {:file, path}, mime_type: nil}}) do
    {:error,
     AdapterError.new(:unsupported_feature,
       provider: :gemini,
       message:
         "Gemini adapter cannot infer MIME type for file #{inspect(path)}; " <>
           "use ALLM.Image.from_binary/2 or pass an extension-bearing path"
     )}
  end

  defp check_part_source(_other), do: :ok

  # Phase 16.4 — content-block translator. Routes a `[TextPart | ImagePart]`
  # content list to Gemini's `parts` array. Each ALLM part becomes one
  # Gemini Part. URL and nil-mime-file ImageParts MUST be rejected
  # upstream by `reject_unsupported_image_sources/1`; reaching them here
  # raises so the closed-set assumption remains explicit.
  #
  # `ImagePart.detail` is read but NOT emitted on the wire (Gemini has
  # no equivalent field); a single `Logger.debug/1` per process surfaces
  # the drop the first time a non-default value is observed.
  @spec to_gemini_content_blocks([TextPart.t() | ImagePart.t()]) :: [map()]
  defp to_gemini_content_blocks(parts) when is_list(parts) do
    Enum.map(parts, &part_to_block/1)
  end

  defp part_to_block(%TextPart{text: t}) do
    %{"text" => t}
  end

  defp part_to_block(%ImagePart{image: %Image{source: {:base64, data}, mime_type: mime}, detail: d}) do
    detail_drop_check(d)
    %{"inlineData" => %{"mimeType" => mime, "data" => data}}
  end

  defp part_to_block(%ImagePart{image: %Image{source: {:binary, bytes}, mime_type: mime}, detail: d}) do
    detail_drop_check(d)
    %{"inlineData" => %{"mimeType" => mime, "data" => Base.encode64(bytes)}}
  end

  defp part_to_block(%ImagePart{image: %Image{source: {:file, path}, mime_type: mime}, detail: d})
       when is_binary(mime) do
    detail_drop_check(d)
    bytes = File.read!(path)
    %{"inlineData" => %{"mimeType" => mime, "data" => Base.encode64(bytes)}}
  end

  # Phase 16.4 — Gemini's wire shape has no `detail` field. When a
  # non-default detail is observed, fire a single deferred-form
  # `Logger.debug/1` per process and stash a flag in the process dict so
  # subsequent calls in the same process stay silent. Mirrors Anthropic's
  # precedent at `lib/allm/providers/anthropic.ex:884`. `:auto` is the
  # default and does NOT trigger the warning.
  defp detail_drop_check(:auto), do: :ok
  defp detail_drop_check(nil), do: :ok

  defp detail_drop_check(_detail) do
    warn_detail_dropped_once()
    :ok
  end

  defp warn_detail_dropped_once do
    if !Process.get(:allm_gemini_detail_warned, false) do
      Logger.debug(fn ->
        "ALLM.Providers.Gemini: ImagePart.detail is not supported by Gemini; dropping. " <>
          "This message fires once per process."
      end)

      Process.put(:allm_gemini_detail_warned, true)
    end

    :ok
  end

  @doc false
  @spec to_gemini_contents([Message.t()]) :: [map()]
  def to_gemini_contents(messages) when is_list(messages) do
    name_lookup = build_tool_call_name_lookup(messages)
    Enum.map(messages, &to_gemini_message(&1, name_lookup))
  end

  # Walk the message list and collect a `tool_call_id => tool_name` map from
  # every assistant turn's `metadata.tool_calls`. Used by `:tool`-role
  # rewrite to populate `functionResponse.name`, which ALLM `Message.name`
  # doesn't carry on tool-result messages constructed by the chat
  # orchestrator. Gemini rejects empty `functionResponse.name`.
  defp build_tool_call_name_lookup(messages) do
    Enum.reduce(messages, %{}, fn
      %Message{role: :assistant, metadata: %{tool_calls: calls}}, acc when is_list(calls) ->
        Enum.reduce(calls, acc, fn
          %{id: id, name: n}, a when is_binary(id) and is_binary(n) -> Map.put(a, id, n)
          _, a -> a
        end)

      _, acc ->
        acc
    end)
  end

  defp to_gemini_message(%Message{role: :user, content: c}, _lookup) do
    %{"role" => "user", "parts" => user_content(c)}
  end

  defp to_gemini_message(%Message{role: :assistant, content: c, metadata: meta}, _lookup) do
    # Phase 16.3 — round-trip the model's prior `functionCall` parts back
    # to Gemini when the assistant turn is being re-fed. The chat-loop
    # stashes the tool_calls list under `metadata.tool_calls` per the
    # cross-provider convention (mirrors Anthropic's
    # `to_anthropic_message/1` `:assistant` clause at
    # `lib/allm/providers/anthropic.ex:703-736`).
    case Map.get(meta, :tool_calls) do
      [_ | _] = calls -> assistant_with_tool_calls(c, calls)
      _ -> %{"role" => "model", "parts" => [%{"text" => stringify_content(c)}]}
    end
  end

  defp to_gemini_message(%Message{role: :system, content: c}, _lookup) do
    # Defensive: extract_system/1 should have removed these.
    %{"role" => "user", "parts" => [%{"text" => stringify_content(c)}]}
  end

  # Phase 16.3 — `:tool`-role messages translate to a `user`-role Gemini
  # turn carrying a single `functionResponse` part (Decision #5). The
  # `:tool_call_id` is echoed as `functionResponse.id` when present.
  # `:name` is the function name; absent name surfaces as the empty
  # string and Gemini will reject — the wire-shape is honest about the
  # caller's omission rather than synthesizing a placeholder.
  defp to_gemini_message(%Message{role: :tool} = msg, lookup) do
    %Message{content: c, tool_call_id: tcid, name: name} = msg

    # `Message.name` is `nil` on tool-result messages built by the chat
    # orchestrator; resolve from the prior assistant turn's tool_calls
    # metadata via `lookup` (built in `to_gemini_contents/1`). Empty name
    # is rejected by Gemini, so falling back to "" is a last resort that
    # preserves wire honesty about the caller's omission.
    resolved_name = name || Map.get(lookup, tcid) || ""

    fr =
      %{
        "name" => resolved_name,
        "response" => to_function_response_payload(c)
      }
      |> maybe_put_response_id(tcid)

    %{"role" => "user", "parts" => [%{"functionResponse" => fr}]}
  end

  defp maybe_put_response_id(map, nil), do: map
  defp maybe_put_response_id(map, ""), do: map
  defp maybe_put_response_id(map, id) when is_binary(id), do: Map.put(map, "id", id)

  # Decision #5 prose — `functionResponse.response` is a free-form JSON
  # object. ALLM standardizes binary tool output as
  # `%{"output" => content}` so callers don't need to read provider docs;
  # callers wanting full control pass a map and we passthrough verbatim.
  defp to_function_response_payload(c) when is_binary(c), do: %{"output" => c}
  defp to_function_response_payload(c) when is_map(c), do: c
  defp to_function_response_payload(nil), do: %{"output" => ""}
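
  # Illustrative shapes, one per clause above:
  #
  #     to_function_response_payload("72F")            #=> %{"output" => "72F"}
  #     to_function_response_payload(%{"temp" => 72})  #=> %{"temp" => 72}
  #     to_function_response_payload(nil)              #=> %{"output" => ""}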

  # Render an `:assistant`-role message that carried tool-calls metadata
  # back to the wire as a `model`-role turn whose parts hold one
  # `functionCall` per call, optionally preceded by a text part if the
  # turn also carried text (rare but legal — Gemini's mixed text+call
  # response shape).
  defp assistant_with_tool_calls(c, [_ | _] = calls) do
    text = stringify_content(c)

    text_parts =
      case text do
        "" -> []
        _ -> [%{"text" => text}]
      end

    call_parts = Enum.map(calls, &tool_call_to_function_call_part/1)
    %{"role" => "model", "parts" => text_parts ++ call_parts}
  end

  defp tool_call_to_function_call_part(%ToolCall{
         id: id,
         name: name,
         arguments: args,
         metadata: meta
       }) do
    fc_base = %{"name" => name, "args" => args || %{}}

    fc =
      case id do
        nil -> fc_base
        "" -> fc_base
        bin when is_binary(bin) -> Map.put(fc_base, "id", bin)
      end

    # Echo `thoughtSignature` on the part containing functionCall — Gemini 3
    # rejects subsequent turns missing it. Captured on decode by
    # `Gemini.Decode.decode_function_call/2`.
    part = %{"functionCall" => fc}

    case meta do
      %{"thoughtSignature" => sig} when is_binary(sig) and sig != "" ->
        Map.put(part, "thoughtSignature", sig)

      _ ->
        part
    end
  end
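
  # Illustrative: a %ToolCall{id: "fc_1", name: "get_weather",
  # arguments: %{"city" => "Oslo"}, metadata: %{"thoughtSignature" => "sig"}}
  # (hypothetical values) renders as
  #
  #     %{
  #       "functionCall" => %{
  #         "id" => "fc_1",
  #         "name" => "get_weather",
  #         "args" => %{"city" => "Oslo"}
  #       },
  #       "thoughtSignature" => "sig"
  #     }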

  @doc """
  Translate a list of canonical `%ALLM.Tool{}`s to Gemini's
  `functionDeclarations` shape.

  Gemini's wire-level `tools` is an array of
  `%{"functionDeclarations" => [...]}` wrapper objects, not a flat array
  of declarations; this function returns the flat declaration list and
  the request builder adds the wrapper. Each declaration carries
  `"name"`, `"description"`, and `"parameters"` — the same key name
  OpenAI uses on the tool's `function` sub-map, where Anthropic instead
  uses `input_schema`.

  ## Examples

      iex> tool = ALLM.Tool.new(name: "get_weather", description: "weather", schema: %{"type" => "object"})
      iex> ALLM.Providers.Gemini.to_gemini_tools([tool])
      [%{"name" => "get_weather", "description" => "weather", "parameters" => %{"type" => "object"}}]
  """
  @spec to_gemini_tools([Tool.t()]) :: [map()]
  def to_gemini_tools(tools) when is_list(tools) do
    Enum.map(tools, fn %Tool{name: n, description: d, schema: s} ->
      %{"name" => n, "description" => d, "parameters" => sanitize_schema(s)}
    end)
  end

  # Gemini's `tools[].functionDeclarations[].parameters` and
  # `generationConfig.responseSchema` accept the OpenAPI 3.0 Schema-Object
  # subset, which does NOT include `additionalProperties` (a JSON Schema /
  # OpenAI-strict-mode keyword). Passing it verbatim returns
  # `INVALID_ARGUMENT: Unknown name "additionalProperties"`. Strip it (and
  # `$schema`) recursively so caller schemas authored for OpenAI strict mode
  # are portable across providers. Other unsupported keywords are still
  # rejected by Gemini at request time per design line 197.
  @doc false
  @spec sanitize_schema(any()) :: any()
  def sanitize_schema(schema) when is_map(schema) do
    schema
    |> Map.drop(["additionalProperties", :additionalProperties, "$schema", :"$schema"])
    |> Map.new(fn {k, v} -> {k, sanitize_schema(v)} end)
  end

  def sanitize_schema(list) when is_list(list), do: Enum.map(list, &sanitize_schema/1)
  def sanitize_schema(other), do: other
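
  # Illustrative: a schema authored for OpenAI strict mode sanitizes
  # recursively:
  #
  #     sanitize_schema(%{
  #       "type" => "object",
  #       "additionalProperties" => false,
  #       "properties" => %{
  #         "city" => %{"type" => "string", "additionalProperties" => false}
  #       }
  #     })
  #     #=> %{"type" => "object",
  #     #     "properties" => %{"city" => %{"type" => "string"}}}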

  @doc """
  Translate an ALLM canonical `tool_choice` to Gemini's
  `functionCallingConfig` map.

  | ALLM canonical | Gemini wire |
  |----------------|-------------|
  | `:auto` | `%{"mode" => "AUTO"}` |
  | `:required` | `%{"mode" => "ANY"}` |
  | `:none` | `%{"mode" => "NONE"}` |
  | `{:tool, "name"}` | `%{"mode" => "ANY", "allowedFunctionNames" => ["name"]}` |
  | `"name"` (string) | shorthand for `{:tool, "name"}` |

  Map shapes (`%{"mode" => "AUTO"}`, etc.) are passed through verbatim
  so callers can hand-craft Gemini-specific extensions.

  ## Examples

      iex> ALLM.Providers.Gemini.to_gemini_tool_config(:auto)
      %{"mode" => "AUTO"}

      iex> ALLM.Providers.Gemini.to_gemini_tool_config({:tool, "set_color"})
      %{"mode" => "ANY", "allowedFunctionNames" => ["set_color"]}
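
  Hand-crafted wire maps pass through untouched:

      iex> ALLM.Providers.Gemini.to_gemini_tool_config(%{"mode" => "AUTO"})
      %{"mode" => "AUTO"}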
  """
  @spec to_gemini_tool_config(Request.tool_choice() | {:tool, String.t()}) :: map()
  def to_gemini_tool_config(:auto), do: %{"mode" => "AUTO"}
  def to_gemini_tool_config(:required), do: %{"mode" => "ANY"}
  def to_gemini_tool_config(:none), do: %{"mode" => "NONE"}

  def to_gemini_tool_config({:tool, name}) when is_binary(name) do
    %{"mode" => "ANY", "allowedFunctionNames" => [name]}
  end

  def to_gemini_tool_config(name) when is_binary(name) do
    %{"mode" => "ANY", "allowedFunctionNames" => [name]}
  end

  def to_gemini_tool_config(%{} = wire), do: wire

  @doc false
  @spec to_generation_config(Request.t()) :: map()
  def to_generation_config(%Request{} = request) do
    %{}
    |> maybe_put("maxOutputTokens", request.max_tokens)
    |> maybe_put("temperature", request.temperature)
    |> put_response_format(request.response_format)
    |> Map.merge(stringify_options(request.options))
  end

  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)

  defp put_response_format(map, nil), do: map
  defp put_response_format(map, :text), do: map

  defp put_response_format(map, %{type: :json_object}) do
    Map.put(map, "responseMimeType", "application/json")
  end

  defp put_response_format(map, %{type: :json_schema, schema: schema}) do
    map
    |> Map.put("responseMimeType", "application/json")
    |> Map.put("responseSchema", sanitize_schema(schema))
  end

  defp put_response_format(map, _other), do: map

  # Map ALLM's canonical option keys onto Gemini's camelCase
  # `generationConfig` keys. Unknown atom keys are string-coerced and
  # their values passed through verbatim so callers can attach
  # Gemini-specific fields via `request.options` (e.g., `:thinkingConfig`
  # in a future phase).
  defp stringify_options(options) when is_map(options) do
    Map.new(options, fn {k, v} -> {option_key(k), v} end)
  end

  defp option_key(:top_p), do: "topP"
  defp option_key(:top_k), do: "topK"
  defp option_key(:stop), do: "stopSequences"
  defp option_key(:max_tokens), do: "maxOutputTokens"
  defp option_key(:temperature), do: "temperature"
  defp option_key(:response_mime_type), do: "responseMimeType"
  defp option_key(:response_schema), do: "responseSchema"
  defp option_key(k) when is_atom(k), do: Atom.to_string(k)
  defp option_key(k) when is_binary(k), do: k
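
  # Illustrative: a request with `max_tokens: 256`, `temperature: 0.2`
  # and `options: %{top_p: 0.9, stop: ["\n"]}` builds the generationConfig
  #
  #     %{"maxOutputTokens" => 256, "temperature" => 0.2,
  #       "topP" => 0.9, "stopSequences" => ["\n"]}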

  # ---------------------------------------------------------------------------
  # Response decoding
  # ---------------------------------------------------------------------------

  @doc false
  @spec decode_response(map(), keyword()) :: Response.t()
  def decode_response(%{} = body, _opts) do
    candidates = Map.get(body, "candidates", [])

    case candidates do
      [_ | _] = list -> decode_with_candidates(body, list)
      [] -> decode_empty_candidates(body)
      _other -> throw({:malformed_response, {:non_list_candidates, candidates}})
    end
  end

  # Phase 16.5 — load-bearing refactor: `Gemini.Decode.candidate_parts/1`
  # is the single shared decoder for `Gemini.generate/2` and
  # `Gemini.Images.generate/2`. The chat path ignores `image_parts` (which
  # Gemini-text models never emit anyway) and the image path ignores
  # `tool_calls` (image models do not emit functionCall parts on
  # generateContent + responseModalities=[TEXT, IMAGE]). See
  # `lib/allm/providers/gemini/decode.ex` and GEMINI_DESIGN.md
  # "Cross-function invariants" lines 217-219.
  defp decode_with_candidates(body, [first | _rest]) do
    {text, tool_calls, _image_parts, raw_finish} = Decode.candidate_parts(first)

    {finish_reason, raw_keep} = parse_finish_reason(raw_finish)

    {final_finish, final_raw} =
      promote_finish_reason_for_tool_calls(finish_reason, raw_keep, tool_calls)

    message = %Message{
      role: :assistant,
      content: text || "",
      metadata: tool_calls_metadata(tool_calls)
    }

    %Response{
      id: nil,
      model: Map.get(body, "modelVersion"),
      message: message,
      output_text: text,
      tool_calls: tool_calls,
      finish_reason: final_finish,
      raw_finish_reason: final_raw,
      usage: parse_usage(Map.get(body, "usageMetadata", %{})),
      raw: body,
      metadata: response_metadata(body)
    }
  end

  defp tool_calls_metadata([]), do: %{}
  defp tool_calls_metadata([_ | _] = tool_calls), do: %{tool_calls: tool_calls}

  # Decision #14 finish-reason override: STOP + functionCall parts ->
  # :tool_calls (the override path matches OpenAI Chat Completions and
  # Anthropic — finish reason follows the *content*, not the wire enum).
  # The raw_finish_reason collapses to nil because :tool_calls IS the
  # canonical row for "stopped to invoke a tool"; callers who need the
  # original "STOP" string can read response.raw["candidates"]....
  defp promote_finish_reason_for_tool_calls(:stop, _raw, [_ | _]), do: {:tool_calls, nil}
  defp promote_finish_reason_for_tool_calls(fr, raw, _tool_calls), do: {fr, raw}

  # Phase 16.5 — `walk_parts/1` is preserved as a public test seam that
  # delegates to `Gemini.Decode.candidate_parts/1`. The original 16.1
  # caller (decode_with_candidates/2) now calls `candidate_parts/1`
  # directly; this helper exists so external test files asserting the
  # shared-helper invariant against a `parts` list (rather than a
  # candidate map) keep working.
  @doc false
  @spec walk_parts([map()]) :: {String.t() | nil, [ToolCall.t()]}
  def walk_parts(parts) when is_list(parts) do
    {text, tool_calls, _image_parts, _raw_finish} =
      Decode.candidate_parts(%{"content" => %{"parts" => parts}})

    {text, tool_calls}
  end

  defp decode_empty_candidates(body) do
    case get_in(body, ["promptFeedback", "blockReason"]) do
      reason when is_binary(reason) ->
        # Decision #9 — prompt-blocked. Successful response shape with
        # finish_reason: :content_filter and empty content.
        %Response{
          model: Map.get(body, "modelVersion"),
          message: %Message{role: :assistant, content: "", metadata: %{}},
          output_text: "",
          tool_calls: [],
          finish_reason: :content_filter,
          raw_finish_reason: reason,
          usage: parse_usage(Map.get(body, "usageMetadata", %{})),
          raw: body,
          metadata: %{
            error: %{reason: "blocked:#{reason}"},
            model_version: Map.get(body, "modelVersion")
          }
        }

      _ ->
        # Decision #10 — empty candidates with no block reason is malformed.
        throw({:malformed_response, :empty_candidates_no_block_reason})
    end
  end

  defp response_metadata(body) do
    case Map.get(body, "modelVersion") do
      nil -> %{}
      version -> %{model_version: version}
    end
  end

  @doc """
  Map a Gemini `finishReason` string to ALLM's closed
  `Response.finish_reason` enum, returning `{atom, raw_string_or_nil}`
  per Decision #14.

  `STOP` collapses to `{:stop, nil}` (the canonical "natural completion"
  row); every other row preserves the raw string at index 1 so callers
  can recover provider fidelity from `Response.raw_finish_reason`.

  ## Examples

      iex> ALLM.Providers.Gemini.parse_finish_reason("STOP")
      {:stop, nil}

      iex> ALLM.Providers.Gemini.parse_finish_reason("MAX_TOKENS")
      {:length, "MAX_TOKENS"}

      iex> ALLM.Providers.Gemini.parse_finish_reason("SAFETY")
      {:content_filter, "SAFETY"}

      iex> ALLM.Providers.Gemini.parse_finish_reason("OTHER")
      {:other, "OTHER"}

      iex> ALLM.Providers.Gemini.parse_finish_reason(nil)
      {nil, nil}
  """
  @spec parse_finish_reason(String.t() | nil) ::
          {Response.finish_reason() | nil, String.t() | nil}
  def parse_finish_reason(nil), do: {nil, nil}
  def parse_finish_reason("STOP"), do: {:stop, nil}
  def parse_finish_reason("MAX_TOKENS"), do: {:length, "MAX_TOKENS"}
  def parse_finish_reason("SAFETY"), do: {:content_filter, "SAFETY"}
  def parse_finish_reason("RECITATION"), do: {:content_filter, "RECITATION"}
  def parse_finish_reason("LANGUAGE"), do: {:content_filter, "LANGUAGE"}
  def parse_finish_reason("BLOCKLIST"), do: {:content_filter, "BLOCKLIST"}

  def parse_finish_reason("PROHIBITED_CONTENT"),
    do: {:content_filter, "PROHIBITED_CONTENT"}

  def parse_finish_reason("SPII"), do: {:content_filter, "SPII"}
  def parse_finish_reason("IMAGE_SAFETY"), do: {:content_filter, "IMAGE_SAFETY"}

  def parse_finish_reason("IMAGE_PROHIBITED_CONTENT"),
    do: {:content_filter, "IMAGE_PROHIBITED_CONTENT"}

  def parse_finish_reason("IMAGE_RECITATION"),
    do: {:content_filter, "IMAGE_RECITATION"}

  def parse_finish_reason("IMAGE_OTHER"), do: {:other, "IMAGE_OTHER"}
  def parse_finish_reason("NO_IMAGE"), do: {:other, "NO_IMAGE"}

  def parse_finish_reason("MALFORMED_FUNCTION_CALL"),
    do: {:error, "MALFORMED_FUNCTION_CALL"}

  def parse_finish_reason("UNEXPECTED_TOOL_CALL"),
    do: {:error, "UNEXPECTED_TOOL_CALL"}

  def parse_finish_reason("TOO_MANY_TOOL_CALLS"),
    do: {:error, "TOO_MANY_TOOL_CALLS"}

  def parse_finish_reason("MISSING_THOUGHT_SIGNATURE"),
    do: {:error, "MISSING_THOUGHT_SIGNATURE"}

  def parse_finish_reason("MALFORMED_RESPONSE"),
    do: {:error, "MALFORMED_RESPONSE"}

  def parse_finish_reason(other) when is_binary(other), do: {:other, other}

  @doc false
  @spec parse_usage(map()) :: Usage.t()
  def parse_usage(%{} = usage) do
    input = Map.get(usage, "promptTokenCount")
    output = output_tokens(usage)
    total = Map.get(usage, "totalTokenCount")

    %Usage{
      input_tokens: input,
      output_tokens: output,
      total_tokens: total,
      extra:
        Map.drop(usage, [
          "promptTokenCount",
          "candidatesTokenCount",
          "responseTokenCount",
          "totalTokenCount"
        ])
    }
  end

  def parse_usage(_), do: %Usage{}
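
  # Illustrative: usageMetadata such as
  #
  #     %{"promptTokenCount" => 10, "candidatesTokenCount" => 5,
  #       "totalTokenCount" => 15, "thoughtsTokenCount" => 3}
  #
  # parses to %Usage{input_tokens: 10, output_tokens: 5,
  # total_tokens: 15, extra: %{"thoughtsTokenCount" => 3}};
  # unrecognized counters survive in :extra.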

  defp output_tokens(usage) do
    case Map.get(usage, "candidatesTokenCount") do
      n when is_integer(n) -> n
      _ -> output_tokens_fallback(Map.get(usage, "responseTokenCount"))
    end
  end

  defp output_tokens_fallback(n) when is_integer(n) do
    Logger.warning(fn ->
      "ALLM.Providers.Gemini: usageMetadata.candidatesTokenCount absent; " <>
        "falling back to responseTokenCount per Decision #11."
    end)

    n
  end

  defp output_tokens_fallback(_), do: nil

  # ---------------------------------------------------------------------------
  # Streaming (Phase 16.2 — Decision #1, #11, #12, #13, #14)
  # ---------------------------------------------------------------------------
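
  # Illustrative event order for a short plain-text stream (values are
  # hypothetical):
  #
  #     {:message_started, %{message: %Message{role: :assistant, content: ""}}}
  #     {:text_delta, %{id: nil, delta: "Hello"}}
  #     {:raw_chunk, {:usage, %{input_tokens: 3, output_tokens: 2,
  #                             total_tokens: 5, extra: %{}}}}
  #     {:message_completed,
  #      %{message: %Message{role: :assistant, content: "Hello", metadata: %{}},
  #        finish_reason: :stop, metadata: %{}}}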

  @impl ALLM.StreamAdapter
  @doc """
  Open an SSE stream against `streamGenerateContent?alt=sse`.

  Returns `{:ok, enumerable}` on success — the enumerable is lazy; the
  HTTP request fires on the first reduce. Returns `{:error, %AdapterError{}}`
  or `{:error, %ValidationError{}}` only for synchronous pre-flight
  failures surfaced through the `with`-chain at the call site;
  key-resolution failure instead raises `%EngineError{}` directly, per
  the `Keys.fetch!/2` contract.

  Per CLAUDE.md mid-stream-error invariant, **HTTP-shaped errors observed
  AFTER the consumer starts reducing are folded into a terminating
  `{:error, _}` event** — the call-site tuple stays `{:ok, stream}`. This
  includes 4xx status codes received before the first SSE event (the
  `{:status, code}` Finch frame folds via `handle_finch_payload/2`).

  ## Decision references

    * **Decision #1** — request body byte-equal to `generate/2`'s. Only
      the URL path differs (`:streamGenerateContent?alt=sse` vs
      `:generateContent`).
    * **Decision #3** — `?alt=sse` is the ONLY required query parameter;
      auth still flows via `x-goog-api-key`.
    * **Decision #12** — `usageMetadata` may appear on intermediate
      chunks; the chunk-mapper emits `{:raw_chunk, {:usage, _}}` on
      every appearance and `StreamCollector.apply_event/2` overwrites.
    * **Decision #13** — stream terminates on Finch's `:done` payload,
      not a `data: [DONE]` lookahead. The synthetic `:message_completed`
      event is built from accumulated state.

  ## Options

    * `:stream_timeout` (default 60_000 ms) — receive-loop after-clause
      between chunks.
    * `:finch_module` (default `Finch`) — test injection seam.
    * `:finch_name` (default `ALLM.Finch`).
    * `:finch_stub_ref` — opaque ref forwarded to the Finch shim
      (used only by `ALLM.Test.FinchStub`).
    * `:adapter_opts[:endpoint]` — endpoint override (testing).
  """
  @spec stream(Request.t(), keyword()) ::
          {:ok, Enumerable.t()} | {:error, AdapterError.t() | ValidationError.t()}
  def stream(%Request{} = request, opts) when is_list(opts) do
    with :ok <- reject_image_in_system_messages(request),
         :ok <- reject_unsupported_image_sources(request) do
      do_stream(request, opts)
    end
  end

  defp do_stream(%Request{} = request, opts) do
    api_key = Keys.fetch!(:gemini, opts)
    body = to_gemini_request_body(request, opts)
    json_body = Jason.encode!(body)
    headers = GeminiHeaders.headers(api_key)

    url =
      endpoint(opts) <>
        "/models/#{request.model}:streamGenerateContent?alt=sse"

    finch_request = Finch.build(:post, url, headers, json_body)

    finch_module = Keyword.get(opts, :finch_module, Finch)
    finch_name = Keyword.get(opts, :finch_name, ALLM.Finch)
    finch_extra_opts = Keyword.take(opts, [:finch_stub_ref])
    stream_timeout = Keyword.get(opts, :stream_timeout, @default_stream_timeout)

    enumerable =
      Stream.resource(
        fn ->
          stream_start_fun(finch_request, finch_module, finch_name, finch_extra_opts)
        end,
        fn state -> stream_next_fun(state, stream_timeout) end,
        fn state -> stream_after_fun(state, finch_module) end
      )

    {:ok, enumerable}
  end

  defp stream_start_fun(finch_request, finch_module, finch_name, finch_extra_opts) do
    ref = finch_module.async_request(finch_request, finch_name, finch_extra_opts)
    bookend_msg = %Message{role: :assistant, content: ""}

    %{
      ref: ref,
      finch_module: finch_module,
      sse_acc: SSE.new(),
      buffered: [{:message_started, %{message: bookend_msg}}],
      done: false,
      status: nil,
      message_completed_emitted?: false,
      accumulated_text: "",
      tool_calls_by_id: %{},
      tool_call_order: [],
      finish_reason: nil,
      raw_finish_reason: nil
    }
  end

  # next_fun: drain buffered events first; then pull one Finch payload.
  # On terminal events we set state.done = true so after_fun skips cancel.
  defp stream_next_fun(%{buffered: [event | rest]} = state, _timeout) do
    {[event], %{state | buffered: rest}}
  end

  defp stream_next_fun(%{done: true} = state, _timeout), do: {:halt, state}

  defp stream_next_fun(%{ref: ref} = state, timeout) do
    receive do
      {^ref, payload} -> handle_finch_payload(state, payload)
    after
      timeout ->
        finalize_with_error(state, :timeout, "stream_timeout exceeded between events")
    end
  end

  # ---------------------------------------------------------------------------
  # Finch payload dispatch — one defp per payload class.
  # ---------------------------------------------------------------------------

  defp handle_finch_payload(state, {:status, code}) when code in 200..299 do
    {[], %{state | status: code}}
  end

  defp handle_finch_payload(state, {:status, code}) when is_integer(code) and code >= 400 do
    err = classify_error(code, %{}, [])
    finalize_with_event(state, {:error, err})
  end

  defp handle_finch_payload(state, {:headers, _headers}), do: {[], state}

  defp handle_finch_payload(state, {:data, chunk}) when is_binary(chunk) do
    case decode_sse_chunk(state, chunk) do
      {:ok, events, new_state} -> {events, new_state}
      {:terminal, events, new_state} -> {events, %{new_state | done: true}}
    end
  end

  defp handle_finch_payload(state, :done) do
    state = %{state | done: true}

    if state.message_completed_emitted? do
      {:halt, state}
    else
      events = synthesize_message_completed(state)
      {events, %{state | message_completed_emitted?: true}}
    end
  end

  defp handle_finch_payload(state, {:error, exception}) do
    err =
      AdapterError.new(:network_error,
        provider: :gemini,
        message: "transport failure: " <> Exception.message(exception),
        cause: exception
      )

    finalize_with_event(state, {:error, err})
  end

  defp handle_finch_payload(state, _other), do: {[], state}

  defp finalize_with_error(state, reason, message) do
    err = AdapterError.new(reason, provider: :gemini, message: message)
    finalize_with_event(state, {:error, err})
  end

  defp finalize_with_event(state, event) do
    {[event], %{state | done: true}}
  end

  # Decode one Finch :data chunk through the line-buffered SSE decoder,
  # then map each parsed message through chunk_to_events/2.
  defp decode_sse_chunk(state, chunk) do
    {messages, new_acc} = SSE.decode_chunk(state.sse_acc, chunk)
    state = %{state | sse_acc: new_acc}
    {events, terminal?, new_state} = messages_to_events(messages, state)
    if terminal?, do: {:terminal, events, new_state}, else: {:ok, events, new_state}
  end

  defp messages_to_events(messages, state) do
    Enum.reduce_while(messages, {[], false, state}, fn msg, {events_acc, _term?, st} ->
      case message_to_events(msg, st) do
        {events, false, new_st} -> {:cont, {events_acc ++ events, false, new_st}}
        {events, true, new_st} -> {:halt, {events_acc ++ events, true, new_st}}
      end
    end)
  end

  # Gemini SSE does NOT use the [DONE] sentinel; the SSE decoder still
  # emits :done for any `data: [DONE]` payload (defensive carry-over from
  # the OpenAI-side support module). When it appears, treat it as
  # terminal but emit no events — Decision #13 says termination is on
  # connection close, not on a `[DONE]` lookahead, so message_to_events
  # halts here and synthesize_message_completed fires later via the
  # Finch `:done` payload path.
  defp message_to_events(:done, state), do: {[], true, %{state | done: true}}

  defp message_to_events(%{data: data}, state) when is_binary(data) do
    case Jason.decode(data) do
      {:ok, %{} = decoded} -> chunk_to_events(decoded, state)
      _ -> malformed_event_response(state, data)
    end
  end

  defp malformed_event_response(state, data) do
    err = StreamError.new(:malformed_event, message: "could not parse SSE data: #{inspect(data)}")
    {[{:error, err}], true, %{state | done: true}}
  end

  # ---------------------------------------------------------------------------
  # Chunk-to-event mapping — one defp clause per top-level event class
  # (per CLAUDE.md "SSE chunk mappers" rule). Dispatch on the presence of
  # `promptFeedback.blockReason` (terminal content-filter), then on
  # `candidates` and `usageMetadata` keys. Returns
  # `{events, terminal?, new_state}`.
  # ---------------------------------------------------------------------------

  @doc false
  def chunk_to_events(decoded, state) do
    blocked = get_in(decoded, ["promptFeedback", "blockReason"])

    if is_binary(blocked) do
      handle_prompt_blocked(blocked, state)
    else
      handle_candidates_and_usage(decoded, state)
    end
  end

  # Decision #9 streaming variant — promptFeedback.blockReason on a chunk
  # surfaces as a terminating {:error, %AdapterError{:content_filter}}
  # event. Note: the non-streaming arm at decode_empty_candidates/1
  # treats the same wire-shape as :ok with finish_reason: :content_filter
  # because the HTTP request succeeded; on the streaming wire the same
  # signal arrives as a separate SSE event AFTER the consumer started
  # reducing, so per StreamAdapter Invariant 2 it folds to a terminal
  # error event (mirrors OpenAI Chat Completions content_filter handling
  # at lib/allm/providers/openai.ex:1287-1296). The mid-stream-error
  # fold invariant means the call-site tuple stays {:ok, stream}.
  defp handle_prompt_blocked(reason, state) do
    err =
      AdapterError.new(:content_filter,
        provider: :gemini,
        message: "Gemini blocked the prompt: #{reason}",
        metadata: %{block_reason: reason}
      )

    {[{:error, err}], true, %{state | done: true}}
  end

  # Walk the parsed chunk for candidates and usageMetadata, emitting one
  # event group per source. Updates state with accumulated text /
  # tool-calls / finish_reason. Per Decision #12, usage emits whenever it
  # appears (intermediate or terminal) and the StreamCollector overwrites.
  defp handle_candidates_and_usage(decoded, state) do
    {cand_events, cand_terminal?, state} = handle_candidates(decoded, state)
    {usage_events, state} = handle_usage_metadata(decoded, state)

    events = cand_events ++ usage_events

    if cand_terminal? do
      completion = synthesize_message_completed(state)
      state = %{state | done: true, message_completed_emitted?: true}
      {events ++ completion, true, state}
    else
      {events, false, state}
    end
  end

  defp handle_candidates(decoded, state) do
    case decoded["candidates"] do
      [first | _] when is_map(first) -> handle_candidate(first, state)
      _ -> {[], false, state}
    end
  end

  # Walk the parts of the first candidate. Each part dispatches to a
  # dedicated handler (text vs functionCall). The candidate's finishReason
  # — when present — promotes to state.finish_reason and signals terminal
  # so the next-fun synthesizes :message_completed.
  defp handle_candidate(candidate, state) do
    parts = get_in(candidate, ["content", "parts"]) || []

    {events, state} =
      Enum.reduce(parts, {[], state}, fn part, {ev_acc, st} ->
        {ev, st2} = handle_part(part, st)
        {ev_acc ++ ev, st2}
      end)

    case candidate["finishReason"] do
      nil ->
        {events, false, state}

      raw when is_binary(raw) ->
        {fr, raw_keep} = parse_finish_reason(raw)
        state = %{state | finish_reason: fr, raw_finish_reason: raw_keep}
        {events, true, state}
    end
  end

  # ---------------------------------------------------------------------------
  # Per-part handlers (one defp per part-class)
  # ---------------------------------------------------------------------------

  defp handle_part(%{"text" => text}, state) when is_binary(text) and text != "" do
    events = [{:text_delta, %{id: nil, delta: text}}]
    {events, %{state | accumulated_text: state.accumulated_text <> text}}
  end

  defp handle_part(%{"functionCall" => %{} = fc} = part, state) do
    handle_function_call_part(fc, part, state)
  end

  defp handle_part(_other, state), do: {[], state}

  # functionCall parts arrive in a single SSE event (Decision-table row in
  # GEMINI_DESIGN.md "Tool-call delta-field name (streaming)"). The adapter
  # emits :tool_call_started + :tool_call_completed with zero deltas to
  # match the spec §8 event protocol — downstream collectors (StreamCollector
  # + Session) fold the same shape regardless of provider.
  defp handle_function_call_part(%{} = fc, part, state) do
    name = function_call_name(fc)
    args_map = normalize_function_call_args(Map.get(fc, "args"))
    raw_args = encode_function_call_args(args_map)
    id = synthesize_tool_call_id(fc)
    metadata = thought_signature_metadata(part)

    events = build_function_call_events(id, name, args_map, raw_args, fc, metadata)

    partial = %{
      id: id,
      name: name,
      arguments: args_map,
      raw_arguments: raw_args,
      metadata: metadata
    }

    state = %{
      state
      | tool_calls_by_id: Map.put(state.tool_calls_by_id, id, partial),
        tool_call_order: state.tool_call_order ++ [id]
    }

    {events, state}
  end

  # Gemini 3 emits `thoughtSignature` as a sibling field to `functionCall`
  # on the same part; Gemini requires it to be echoed back on subsequent
  # turns.
  defp thought_signature_metadata(%{"thoughtSignature" => sig}) when is_binary(sig) and sig != "" do
    %{"thoughtSignature" => sig}
  end

  defp thought_signature_metadata(_part), do: %{}

  defp function_call_name(%{"name" => name}) when is_binary(name), do: name
  defp function_call_name(_), do: ""

  defp build_function_call_events(id, name, args_map, raw_args, _fc, metadata)
       when is_binary(id) and id != "" and is_binary(name) and name != "" do
    [
      {:tool_call_started, %{id: id, name: name}},
      {:tool_call_completed,
       %{id: id, name: name, arguments: args_map, raw_arguments: raw_args, metadata: metadata}}
    ]
  end

  defp build_function_call_events(_id, _name, _args_map, _raw_args, fc, _metadata) do
    Logger.debug(fn ->
      "ALLM.Providers.Gemini: dropping malformed functionCall part #{inspect(fc)}"
    end)

    []
  end

  # Gemini's `functionCall.args` is documented as a JSON object (not a
  # string — Decision #4). Real wire shapes have only ever surfaced maps;
  # this defensive clause coerces nil and non-map values to `%{}` so the
  # downstream encoder always sees a clean JSON-encodable map.
  defp normalize_function_call_args(args) when is_map(args), do: args
  defp normalize_function_call_args(_), do: %{}

  defp encode_function_call_args(map) when is_map(map) do
    case Jason.encode(map) do
      {:ok, json} -> json
      _ -> ""
    end
  end

  # Gemini's `functionCall.id` is documented as optional. When absent we
  # synthesize a deterministic id from the name + args hash so collectors
  # don't see an empty-id event. Mirrors Anthropic's id-or-generate
  # behaviour at lib/allm/providers/anthropic.ex:1056-1067.
  defp synthesize_tool_call_id(%{"id" => id}) when is_binary(id) and id != "", do: id

  defp synthesize_tool_call_id(%{"name" => name} = fc) when is_binary(name) do
    args = Map.get(fc, "args") || %{}
    "fc_" <> Integer.to_string(:erlang.phash2({name, args}), 16)
  end

  defp synthesize_tool_call_id(_), do: ""

  # ---------------------------------------------------------------------------
  # usageMetadata handler (Decision #12)
  # ---------------------------------------------------------------------------

  defp handle_usage_metadata(%{"usageMetadata" => %{} = um}, state) do
    usage = parse_usage(um)

    pre_mapped = %{
      input_tokens: usage.input_tokens,
      output_tokens: usage.output_tokens,
      total_tokens: usage.total_tokens,
      extra: usage.extra
    }

    {[{:raw_chunk, {:usage, pre_mapped}}], state}
  end

  defp handle_usage_metadata(_other, state), do: {[], state}

  # ---------------------------------------------------------------------------
  # Synthetic :message_completed on connection close (Decision #13)
  # ---------------------------------------------------------------------------

  defp synthesize_message_completed(state) do
    tool_calls = build_tool_calls_from_state(state)
    finish_reason = promote_finish_reason(state.finish_reason, tool_calls)
    metadata = build_completion_metadata(tool_calls)

    msg = %Message{
      role: :assistant,
      content: state.accumulated_text,
      metadata: metadata
    }

    [{:message_completed, %{message: msg, finish_reason: finish_reason, metadata: metadata}}]
  end

  defp build_completion_metadata([]), do: %{}
  defp build_completion_metadata([_ | _] = tool_calls), do: %{tool_calls: tool_calls}

  defp build_tool_calls_from_state(%{tool_call_order: order, tool_calls_by_id: by_id}) do
    Enum.map(order, fn id ->
      partial = Map.fetch!(by_id, id)

      ToolCall.new(
        id: partial.id,
        name: partial.name,
        arguments: partial.arguments,
        raw_arguments: partial.raw_arguments,
        metadata: Map.get(partial, :metadata, %{})
      )
    end)
  end

  # finish_reason :stop + non-empty tool_calls promotes to :tool_calls
  # (matches Anthropic precedent at lib/allm/providers/anthropic.ex:1879).
  defp promote_finish_reason(:stop, [_ | _]), do: :tool_calls
  defp promote_finish_reason(other, _tool_calls), do: other

  # after_fun: cancel only when state.done == false. Defensive rescue —
  # cancel_async_request/1 may raise if the ref already completed.
  defp stream_after_fun(%{done: true}, _finch_module), do: :ok

  defp stream_after_fun(%{ref: ref, finch_module: finch_module}, _) do
    finch_module.cancel_async_request(ref)
    :ok
  rescue
    _ -> :ok
  end
end