lib/amarula/protocol/messages/conversation_sender.ex

defmodule Amarula.Protocol.Messages.ConversationSender do
  @moduledoc """
  Per-recipient send process. One `ConversationSender` exists per recipient JID;
  all sends to that recipient funnel through it and run **one at a time**.
  Different recipients run in parallel under the DynamicSupervisor.

  ## Why a process per recipient — it is a *lock*, not a cache

  The Sender holds **no state of its own** — not even the ratchet. Encrypting a
  message is a `load → advance → store` against the **shared** Signal session in
  Storage (see `encrypt_for_device/3`): load the session record, advance the
  ratchet (pure), write the advanced record back. That read-modify-write is **not
  atomic**. If two sends to the *same* recipient ran concurrently, both could load
  the same record, advance from the same point, and store — a lost update that
  **forks the ratchet** and corrupts the session.

  So the Sender exists to **serialize** that read-modify-write, not to hold it. Its
  mailbox is the lock: cast #2's `load` can't begin until cast #1's `store` has
  finished. One process per recipient gives exactly the right granularity — serial
  *within* a recipient (ratchet-safe), parallel *across* recipients (throughput).
  This is why a bare `Task` per send would be wrong (no per-recipient mutual
  exclusion → forked ratchets) and why a single shared process would be wrong (no
  cross-recipient parallelism). Because it holds nothing, a Sender is cheap to lose
  and respawn, and current credentials are handed to it per send (creds mutate
  after login, so a cached snapshot would encrypt stale).
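
  The mailbox-as-lock idea can be sketched in isolation. This is a minimal
  illustration, not this module's code: `LockSketch` is a hypothetical name, an
  `Agent` holding an integer stands in for Storage, and "advance" is just `+ 1`.

  ```elixir
  defmodule LockSketch do
    use GenServer

    def start_link(store), do: GenServer.start_link(__MODULE__, store)

    @impl true
    def init(store), do: {:ok, store}

    @impl true
    def handle_cast(:send, store) do
      # load, advance (pure), store: not atomic on its own, but only one cast
      # runs at a time in this process, so the steps never interleave.
      n = Agent.get(store, & &1)
      Agent.update(store, fn _ -> n + 1 end)
      {:noreply, store}
    end

    # A call queued behind the casts; it returns once they have all run.
    @impl true
    def handle_call(:drain, _from, store), do: {:reply, :ok, store}
  end

  {:ok, store} = Agent.start_link(fn -> 0 end)
  {:ok, sender} = LockSketch.start_link(store)
  for _ <- 1..100, do: GenServer.cast(sender, :send)
  :ok = GenServer.call(sender, :drain)
  final = Agent.get(store, & &1)
  ```

  Here `final` is 100 because every read-modify-write ran to completion before
  the next began. Running the same three steps from 100 concurrent `Task`s
  could lose updates, which is the forked-ratchet case described above.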

  A send is a linear pipe of steps, each taking the `ctx` and returning
  `{:ok, ctx}` or `{:error, {stage, reason}}`, that block on IQ round-trips
  through `Connection` (the sole websocket owner). Conceptually:

      ctx
      |> resolve_devices()   # device-list cache, else a USync query
      |> ensure_sessions()   # session files, else a prekey-bundle fetch
      |> encrypt()           # per device; plain vs DSM; advance ratchet
      |> relay()             # frame + send the <participants> stanza

  Each step that needs server data calls `Connection.query_iq/2`, which
  blocks until the matching websocket reply arrives. A recoverable step failure
  (IQ timeout, recipient not on WhatsApp) stops the pipe at that stage and is
  reported to `Connection` as `{:send_failed, msg_id, reason}`; an unexpected
  error raises and crashes the process, which `restart: :temporary` leaves dead
  until the next `deliver/2` respawns it.
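
  How `run_pipe/4` threads its stages can be sketched with a plain `with`. The
  module below is a hypothetical reduction with two stand-in stages (no IQ
  traffic, no crypto); only the tagged-tuple shape mirrors the real pipe.

  ```elixir
  defmodule PipeSketch do
    # Each stage returns {:ok, ctx} or {:error, {stage, reason}}; `with` stops
    # at the first failure and the stage tag is dropped for the caller.
    def run(ctx) do
      with {:ok, ctx} <- resolve(ctx),
           {:ok, ctx} <- encrypt(ctx) do
        {:ok, ctx}
      else
        {:error, {_stage, reason}} -> {:error, reason}
      end
    end

    defp resolve(%{devices: []}), do: {:error, {:resolve_devices, :not_on_whatsapp}}
    defp resolve(ctx), do: {:ok, ctx}

    # Stand-in for encryption: copy the devices into :participants.
    defp encrypt(ctx), do: {:ok, Map.put(ctx, :participants, ctx.devices)}
  end

  failed = PipeSketch.run(%{devices: []})
  sent = PipeSketch.run(%{devices: [:d1]})
  ```

  A stage that raises instead of returning a tuple bypasses the `else` branch
  entirely, which is how unexpected errors still crash the process.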

  ## Lifecycle & registry presence

  **Identity.** A sender's identity *is* its `{instance_id, recipient_jid}` pair.
  It is registered in the app-level `Amarula.InstanceRegistry` under that key
  (namespaced by `instance_id` so two connections don't collide on a shared
  recipient) — so at most one sender per recipient per connection exists at a
  time, and `deliver/2` is a find-or-start on that key.

  **Birth (lazy).** Started on the first `deliver/2` to a recipient with no live
  sender: find-or-start via `Registry.lookup` → else `DynamicSupervisor.start_child`.
  The `{:error, {:already_started, pid}}` branch makes the start race-safe even
  though, in practice, only `Connection` calls `deliver` (single process, so no
  concurrent start for the same recipient).

  **Registration.** Automatic, via the `:via` name in `start_link` — the Registry
  registers the pid on start and **auto-unregisters it on death** (it monitors the
  pid). So a dead sender's key vanishes; there are never stale registry entries to
  reap.
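
  The find-or-start and auto-unregister behaviour can be tried on its own. A
  minimal sketch, assuming throwaway names (`SketchRegistry`, `SketchSup`,
  `SketchSender`) and a made-up key; it is not wired to this module.

  ```elixir
  {:ok, _} = Registry.start_link(keys: :unique, name: SketchRegistry)
  {:ok, _} = DynamicSupervisor.start_link(name: SketchSup, strategy: :one_for_one)

  defmodule SketchSender do
    use GenServer, restart: :temporary

    def start_link(key) do
      GenServer.start_link(__MODULE__, key, name: {:via, Registry, {SketchRegistry, key}})
    end

    # Look the key up; start a child only when no live process holds it.
    def find_or_start(key) do
      case Registry.lookup(SketchRegistry, key) do
        [{pid, _}] -> {:ok, pid}
        [] -> start_child(key)
      end
    end

    defp start_child(key) do
      case DynamicSupervisor.start_child(SketchSup, {__MODULE__, key}) do
        {:ok, pid} -> {:ok, pid}
        {:error, {:already_started, pid}} -> {:ok, pid}
        :ignore -> {:error, :ignored}
        {:error, reason} -> {:error, reason}
      end
    end

    @impl true
    def init(key), do: {:ok, key}
  end

  key = {:instance_a, "recipient@example"}
  {:ok, pid1} = SketchSender.find_or_start(key)
  {:ok, pid2} = SketchSender.find_or_start(key)
  :ok = GenServer.stop(pid1)
  after_stop = Registry.lookup(SketchRegistry, key)
  {:ok, pid3} = SketchSender.find_or_start(key)
  ```

  The second call reuses `pid1`; after the stop the key no longer resolves, so
  the third call starts a fresh process under the same key.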

  **Life.** Serializes all sends to its recipient — one pipe at a time, so the
  ratchet's load-modify-store can't interleave (see the lock note above).
  Different recipients run in parallel. Holds no durable state: sessions/keys live
  in Storage, the consumer's `from` is parked in `Connection`. So a sender is cheap
  to lose and cheap to respawn.

  **Death (three ways).**
    1. *Idle.* Each `:send` re-arms an idle timer (`idle_ms`, default 1s,
       overridable via `config[:sender_idle_ms]`); after that long with no further
       send it `{:stop, :normal}`s. It carries no durable state, so lingering buys
       only warm reuse — a quick follow-up to the same recipient skips a respawn +
       a session re-read from Storage. The short default keeps a fan-out to N
       one-shot recipients from leaving a long-lived process tail; a disk-backed
       store may want a larger value to cut re-reads under bursty traffic.
    2. *Crash.* A raise in the pipe (Signal error, USync blowup, bad bundle) kills
       it. `restart: :temporary` ⇒ the supervisor does NOT restart it; the next
       `deliver` respawns a fresh one. In-flight + queued sends are lost.
    3. *Shutdown.* The connection's supervision tree going down takes it too.
    All three auto-unregister the Registry key (the Registry's pid monitor).
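
  The idle stop relies on the GenServer timeout return value. A minimal sketch
  of that mechanism, with a hypothetical `IdleSketch` module and a 50 ms linger
  chosen only to keep the example quick:

  ```elixir
  defmodule IdleSketch do
    use GenServer

    def start(idle_ms), do: GenServer.start(__MODULE__, idle_ms)

    # The third tuple element arms the timeout; a :timeout message arrives if
    # no other message is received within that many milliseconds.
    @impl true
    def init(idle_ms), do: {:ok, idle_ms, idle_ms}

    # Every handled send returns the timeout again, which re-arms it.
    @impl true
    def handle_cast(:send, idle_ms), do: {:noreply, idle_ms, idle_ms}

    @impl true
    def handle_info(:timeout, idle_ms), do: {:stop, :normal, idle_ms}
  end

  {:ok, pid} = IdleSketch.start(50)
  ref = Process.monitor(pid)
  GenServer.cast(pid, :send)

  exit_reason =
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :still_alive
    end
  ```

  With no further casts the process stops with reason `:normal`, the same exit
  a monitoring process can treat as "nothing was in flight".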

  **Rebirth.** The next `deliver/2` to that recipient starts a fresh sender — no
  carried state; it re-reads sessions from Storage.

  **Crash ⇒ parked-send recovery.** Because the consumer's `from` lives in
  `Connection` (not in the dying sender), `Connection` monitors each sender and,
  on its `:DOWN`, fails every parked send for that recipient with
  `{:error, {:sender_crashed, reason}}` — promptly, instead of letting the caller
  hang to the ack-timeout. A `:normal` idle-stop fails nothing (no in-flight
  sends). See `Amarula.Connection` (`ensure_sender_monitor` / the `:DOWN` handler)
  and `docs/plans/SENDER_CRASH_FIX.plan.md`.
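
  The recovery rule can be sketched as a pure function over the parked sends.
  This is an illustration only: `ParkedSketch`, the `msg_id => {from,
  sender_pid}` map shape, and the atom placeholders for `from` are assumptions,
  not the structures `Amarula.Connection` actually keeps.

  ```elixir
  defmodule ParkedSketch do
    # A :normal exit is the idle stop: nothing is in flight, nothing to fail.
    def on_down(parked, _pid, :normal), do: {[], parked}

    # Any other reason fails every send parked on the dead sender.
    def on_down(parked, pid, reason) do
      {dead, alive} = Enum.split_with(parked, fn {_id, {_from, p}} -> p == pid end)
      replies = for {_id, {from, _p}} <- dead, do: {from, {:error, {:sender_crashed, reason}}}
      {replies, Map.new(alive)}
    end
  end

  # A sender that crashes straight away, monitored from the moment it spawns.
  {sender, ref} = spawn_monitor(fn -> exit(:boom) end)
  parked = %{"m1" => {:caller_a, sender}, "m2" => {:caller_b, self()}}

  {replies, remaining} =
    receive do
      {:DOWN, ^ref, :process, ^sender, reason} -> ParkedSketch.on_down(parked, sender, reason)
    end
  ```

  Only the send parked on the crashed sender is failed; the entry for the
  other process stays parked and keeps waiting for its ack.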
  """

  use GenServer, restart: :temporary
  require Logger

  alias Amarula.Protocol.Binary.{JID, Node}
  alias Amarula.Protocol.Crypto.Constants
  alias Amarula.Protocol.Groups.Metadata, as: GroupMetadata
  alias Amarula.Protocol.Messages.{MessageContent, MessageEncoder, Relay}
  alias Amarula.Protocol.Proto

  alias Amarula.Protocol.Signal.{
    DeviceListCache,
    LidMappingFileStore,
    SessionCipher,
    SessionInjector,
    SessionStore
  }

  alias Amarula.Protocol.Signal.Group.{
    GroupCipher,
    GroupSessionBuilder,
    SenderKeyName,
    SenderKeyStore
  }

  alias Amarula.Connection
  alias Amarula.Protocol.USync

  # Idle linger before a drained sender stops. Short (1s) so a fan-out to N
  # one-shot recipients sheds its processes within a second, while still letting a
  # quick follow-up to the same recipient reuse the warm sender (skipping a
  # respawn + a re-read of the session from Storage). The sender holds no durable
  # state, so stopping and respawning is cheap.
  @idle_timeout_ms 1_000

  # --- API ---

  @doc """
  Hand a message to the recipient's sender (start-or-lookup), asynchronously. The
  send runs on the per-recipient process — serialized per recipient, parallel
  across recipients — so the CALLING process (Connection) is not blocked.

  `msg` carries `:msg_id`. The sender does NOT reply to the consumer: it runs the
  pipe and, **only on failure**, reports back to `Connection` (`state.cm`), which
  owns the parked consumer `from`:

    * relay succeeded (frame written) → the sender reports **nothing**. Connection
      already parked the `from` and armed an ack-timeout at dispatch, so it simply
      awaits the server's `<ack>` (which resolves the caller). A "frame went out"
      signal would be inert, so none is sent.
    * pipe failed (not_on_whatsapp / IQ timeout / no encrypted devices / plugin
      halt) → `{:send_failed, msg_id, reason}`. No frame went out, so no `<ack>`
      will ever come; Connection replies to the parked caller with the failure
      immediately (instead of letting it hang to the ack-timeout).

  Returns `{:ok, pid}` — the (started or reused) sender pid, so Connection can
  monitor it and fail the recipient's parked sends if it crashes mid-pipe — or
  `{:error, reason}` if the sender could not be started (e.g. `:max_children`).
  A start failure is a recoverable send failure: Connection maps it to a
  `{:send_failed, msg_id, reason}` for the parked caller rather than crashing.
  """
  @spec deliver(keyword(), map()) :: {:ok, pid()} | {:error, term()}
  def deliver(opts, msg) do
    registry = Keyword.fetch!(opts, :registry)

    with {:ok, pid} <- find_or_start(registry, opts) do
      GenServer.cast(pid, {:send, msg})
      {:ok, pid}
    end
  end

  defp find_or_start(registry, opts) do
    case Registry.lookup(registry, sender_key(opts)) do
      [{pid, _}] -> {:ok, pid}
      [] -> start_child(opts)
    end
  end

  # Sender identity in the (shared, app-level) registry: namespaced by the
  # connection's `instance_id` so two connections sending to the same recipient
  # don't collide on one key.
  defp sender_key(opts) do
    {Keyword.fetch!(opts, :instance_id), Keyword.fetch!(opts, :recipient_jid)}
  end

  # Normalize every DynamicSupervisor.start_child/2 outcome to a tagged tuple.
  # A lost race (:already_started) is success — reuse the live sender. Anything
  # else ({:error, reason} | :ignore | {:error, :max_children}) is a recoverable
  # start failure, surfaced as {:error, reason} so the send fails cleanly instead
  # of raising a CaseClauseError inside Connection.
  defp start_child(opts) do
    spec = {__MODULE__, opts}

    case DynamicSupervisor.start_child(Keyword.fetch!(opts, :supervisor), spec) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
      :ignore -> {:error, :sender_start_ignored}
      {:error, reason} -> {:error, reason}
    end
  end

  def start_link(opts) do
    registry = Keyword.fetch!(opts, :registry)
    GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {registry, sender_key(opts)}})
  end

  # --- GenServer ---

  @impl true
  def init(opts) do
    conn = Keyword.fetch!(opts, :conn)

    state = %{
      recipient_jid: Keyword.fetch!(opts, :recipient_jid),
      cm: Keyword.fetch!(opts, :cm),
      conn: conn,
      creds: Keyword.fetch!(opts, :creds),
      idle_ms: idle_ms(conn)
    }

    {:ok, state, state.idle_ms}
  end

  # Idle linger, overridable per connection via `config[:sender_idle_ms]`
  # (defaults to @idle_timeout_ms). A larger value keeps senders warm longer —
  # useful with a disk-backed session store to avoid re-reading the ratchet on
  # bursty traffic; a smaller one sheds processes faster after a fan-out.
  defp idle_ms(%{config: %{sender_idle_ms: ms}}) when is_integer(ms) and ms >= 0, do: ms
  defp idle_ms(_conn), do: @idle_timeout_ms

  @impl true
  def handle_cast({:send, %{msg_id: msg_id} = msg}, state) do
    # Report ONLY a failure back to Connection, never to the consumer and never
    # on success. Connection holds the parked `from` (under msg_id) and an armed
    # ack-timeout: a relayed frame just awaits the server <ack>, so "frame went
    # out" needs no message. A failure means no frame went out (no ack will come),
    # so Connection must reply to the parked caller with the failure immediately.
    case report(run_send(msg, state), msg_id) do
      :ok -> :noop
      failure -> send(state.cm, failure)
    end

    # Re-arm the idle timer: stay warm `state.idle_ms` for a quick follow-up to
    # this recipient, then stop (`handle_info(:timeout, …)`). While sends keep
    # arriving, each cast resets the timer, so a busy recipient never idles out.
    {:noreply, state, state.idle_ms}
  end

  @impl true
  def handle_info(:timeout, state) do
    Logger.debug("ConversationSender for #{state.recipient_jid} idle — stopping")
    {:stop, :normal, state}
  end

  # Map the pipe result to what Connection needs: nothing on success (`:ok`),
  # a {:send_failed, …} message on failure.
  defp report(:ok, _msg_id), do: :ok
  defp report({:error, reason}, msg_id), do: {:send_failed, msg_id, reason}
  defp report({:halted, reason}, msg_id), do: {:send_failed, msg_id, {:halted, reason}}

  # --- the send pipe ---

  defp run_send(%{msg_id: msg_id} = msg, state) do
    jid = state.recipient_jid
    kind = if JID.jid_group?(jid), do: :group, else: :dm
    Logger.debug("Sending #{msg_id} to #{jid} (#{kind})")

    # Run the send plugin pipeline (before encrypt): steps may transform the
    # message or halt the send. The built-in retry-cache step records it here.
    stanza_attrs = Map.get(msg, :stanza_attrs, %{})

    case run_send_steps(state.conn, msg_id, jid, message_content(msg), stanza_attrs) do
      {:halt, reason} ->
        Logger.debug("Send #{msg_id} to #{jid} halted by a plugin: #{inspect(reason)}")
        {:halted, reason}

      {:cont, %{message: message}} ->
        store_own_lid_mapping(state)
        do_send(state, msg_id, jid, kind, message, stanza_attrs)
    end
  end

  # The plugin send pipeline. ctx carries the message + addressing + the scopes a
  # step needs (e.g. the retry cache). `stanza_attrs` rides along so the retry
  # cache can replay a peer/edit stanza verbatim on a retry-receipt resend.
  # Returns {:cont, ctx} (possibly transformed) or {:halt, reason}.
  defp run_send_steps(conn, msg_id, jid, message, stanza_attrs) do
    ctx = %{
      message: message,
      to: jid,
      profile: conn.profile,
      msg_id: msg_id,
      stanza_attrs: stanza_attrs,
      retry_cache: conn.retry_cache
    }

    Amarula.Plugin.run(conn.send_steps, ctx)
  end

  defp do_send(state, msg_id, jid, kind, message, stanza_attrs) do
    ctx = %{
      cm: state.cm,
      conn: state.conn,
      creds: state.creds,
      kind: kind,
      msg_id: msg_id,
      target_jid: jid,
      message: message,
      # Extra <message> stanza attrs (e.g. category/push_priority for a peer
      # message). Empty for a normal send.
      stanza_attrs: stanza_attrs,
      # The `edit` attr on the <message> stanza, required for delete/edit/pin
      # (Baileys messages-send.ts): "7" delete-for-everyone, "1" edit. nil = none.
      edit_attr: edit_attr(message),
      devices: [],
      participants: [],
      addressing_mode: nil,
      skmsg: nil
    }

    profile = state.conn.profile
    {media?, media_kind, bytes} = media_stats(message)

    # Telemetry span around the whole send. :stop carries duration + byte size +
    # media kind; :exception carries the failing stage. (Privacy: counts/kinds
    # only — no jid/content.)
    Amarula.Telemetry.span(
      [:amarula, :send],
      profile,
      %{kind: kind, media?: media?, media_kind: media_kind},
      fn -> {run_pipe(ctx, msg_id, jid, profile), %{bytes: bytes}} end
    )
  end

  # Each stage returns {:ok, ctx} or {:error, {stage, reason}}; `with` threads the
  # happy path and stops at the first failure. A recoverable failure (e.g. a
  # timed-out IQ, an unreachable recipient) is logged and returned; unexpected
  # errors still crash the (disposable) process.
  defp run_pipe(ctx, msg_id, jid, profile) do
    with {:ok, ctx} <- resolve_devices(ctx),
         {:ok, ctx} <- ensure_sessions(ctx),
         {:ok, ctx} <- encrypt(ctx),
         :ok <- relay(ctx) do
      :ok
    else
      {:error, {:resolve_devices, :not_on_whatsapp} = stage_reason} ->
        Amarula.Telemetry.emit([:amarula, :send, :not_on_whatsapp], profile)
        log_drop(msg_id, jid, stage_reason)

      {:error, {_stage, _reason} = stage_reason} ->
        log_drop(msg_id, jid, stage_reason)
    end
  end

  defp log_drop(msg_id, jid, {stage, reason}) do
    Logger.error("Send #{msg_id} to #{jid} dropped at #{stage}: #{inspect(reason)}")
    {:error, reason}
  end

  # Byte size + media kind of an outgoing message, for telemetry. Media protos
  # carry fileLength; text/other report 0 bytes and media? = false.
  defp media_stats(%Proto.Message{} = message) do
    case MessageContent.classify(message) do
      {:media, kind, m} -> {true, kind, Map.get(m, :fileLength) || 0}
      _ -> {false, nil, 0}
    end
  end

  # A send carries either a ready %Proto.Message{} (:message) or :text shorthand.
  defp message_content(%{message: %Proto.Message{} = m}), do: m
  defp message_content(%{text: text}) when is_binary(text), do: MessageEncoder.text(text)

  # The `edit` stanza attr the server needs to apply delete/edit (Baileys). A
  # plain message has none. Delete-for-everyone (our own) = "7"; MESSAGE_EDIT = "1".
  defp edit_attr(%Proto.Message{protocolMessage: %{type: :REVOKE}}), do: "7"
  defp edit_attr(%Proto.Message{protocolMessage: %{type: :MESSAGE_EDIT}}), do: "1"
  defp edit_attr(_message), do: nil

  # Persist our own PN↔LID mapping so own-device bundle fetches resolve to the
  # LID wire jid (the server won't serve our own PN bundle).
  defp store_own_lid_mapping(%{creds: %{me: %{id: id, lid: lid}}} = state)
       when is_binary(id) and is_binary(lid) do
    LidMappingFileStore.store_mappings(state.conn, [{lid, id}])
  end

  defp store_own_lid_mapping(_state), do: :ok

  # Step 1: resolve the device set to encrypt for. DM = the recipient's (and our
  # own) devices. Group = every participant's (and our own) devices, after a
  # group-metadata fetch. Either way the result is a flat list of device maps.
  defp resolve_devices(%{kind: :group} = ctx) do
    with {:ok, devices, addressing_mode} <- group_devices(ctx) do
      {:ok, %{ctx | devices: devices, addressing_mode: addressing_mode}}
    end
  end

  defp resolve_devices(%{kind: :dm} = ctx) do
    with {:ok, devices} <- user_devices(ctx, [ctx.target_jid | own_id_list(ctx)]) do
      # A peer-category send (PEER_DATA_OPERATION: placeholder resend, on-demand
      # history) is addressed to OURSELVES — every resolved device is our own, so
      # the recipient-devices guard below doesn't apply. Baileys sends these to
      # jidNormalizedUser(me.id) with no such check (sendPeerDataOperationMessage).
      #
      # A self-chat ("Message Yourself") targets our own account — every resolved
      # device is ours, so the recipient-devices guard below would reject it. It's
      # a real conversation though (Baileys allows sendMessage to me.id), so skip
      # the guard, exactly like the peer_send? carve-out.
      #
      # Otherwise: if the recipient resolved to no real devices, the number isn't
      # reachable on WhatsApp (unregistered / wrong number). Fail instead of
      # fabricating a device and producing a "sent" message the server silently
      # drops — Baileys#2635. The recipient must contribute at least one device
      # (our own devices don't count).
      if peer_send?(ctx) or self_send?(ctx) or recipient_devices?(ctx, devices) do
        {:ok, %{ctx | devices: devices}}
      else
        {:error, {:resolve_devices, :not_on_whatsapp}}
      end
    end
  end

  # A peer-category stanza targets our own devices (PEER_DATA_OPERATION).
  defp peer_send?(%{stanza_attrs: %{"category" => "peer"}}), do: true
  defp peer_send?(_), do: false

  # A self-chat: the target normalizes to our own account (id or lid), ignoring
  # device. Our own devices are the legitimate recipients here.
  defp self_send?(%{target_jid: target, creds: %{me: me}}) do
    target_user = JID.jid_normalized_user(target)
    mine = [me[:id], me[:lid]] |> Enum.reject(&is_nil/1) |> Enum.map(&JID.jid_normalized_user/1)
    target_user in mine
  end

  # The device set for a list of user jids: a full cache hit skips USync; on any
  # miss the whole list is re-fetched in one query.
  # {:ok, flat device list} | {:error, {stage, reason}}.
  defp user_devices(ctx, users) do
    case DeviceListCache.get_many(ctx.conn, users) do
      {hits, []} -> {:ok, hits |> Map.values() |> List.flatten()}
      {_hits, _misses} -> usync_devices(ctx, users)
    end
  end

  # Group: fetch metadata → participant jids (+ our own) → their devices.
  # {:ok, devices, addressing_mode} | {:error, {stage, reason}}; the mode picks
  # our sender-key identity.
  defp group_devices(ctx) do
    iq = GroupMetadata.query_iq(ctx.target_jid)

    with {:ok, reply} <- Connection.query_iq(ctx.cm, iq),
         {:ok, meta} <- GroupMetadata.parse(reply) do
      store_participant_lid_mappings(ctx, meta)
      users = Enum.uniq(participant_user_jids(meta) ++ own_id_list(ctx))

      # user_devices already returns {:error, {stage, reason}} on failure.
      with {:ok, devices} <- user_devices(ctx, users) do
        {:ok, devices, meta.addressing_mode}
      end
    else
      {:error, {_stage, _reason}} = err -> err
      error -> {:error, {:group_metadata, error}}
    end
  end

  # Persist PN↔LID pairs the metadata carried, so sessions for PN-USynced devices
  # resolve to the recipient's LID identity (matching how we receive from them).
  # A lid-group participant is {id: lid, phone_number: pn}; a pn-group one is
  # {id: pn, lid: lid} — either way pull the {lid, pn} when both are present.
  defp store_participant_lid_mappings(ctx, %{participants: parts}) do
    pairs =
      for p <- parts,
          {lid, pn} = participant_lid_pn(p),
          is_binary(lid) and is_binary(pn),
          do: {lid, pn}

    unless pairs == [], do: LidMappingFileStore.store_mappings(ctx.conn, pairs)
  end

  defp participant_lid_pn(%{id: id, lid: lid, phone_number: pn}) do
    cond do
      JID.lid_user?(id) -> {id, pn}
      is_binary(lid) -> {lid, id}
      true -> {nil, nil}
    end
  end

  # Participant jids to USync for devices. USync's device directory is keyed by
  # phone number, so for a lid-addressed group we look up each member's PN
  # (participant_pn), not their lid — querying by lid gets no reply (the live
  # symptom was a USync timeout). The PN→LID session addressing is handled later
  # by LidMappingFileStore.signal_address. A pn group already exposes pn ids.
  defp participant_user_jids(%{participants: parts}) do
    Enum.map(parts, fn p -> p.phone_number || p.id end)
  end

  # Our own jid, unless we are the recipient (DM to self).
  defp own_id_list(%{creds: %{me: %{id: id}}, target_jid: id}), do: []
  defp own_id_list(%{creds: %{me: %{id: id}}}), do: [id]

  defp usync_devices(ctx, users) do
    # USync is a user→devices lookup: the <user jid> must be the bare user jid,
    # never device-suffixed (our own me.id carries our companion device, e.g.
    # :29). A device-suffixed jid makes the server drop the whole query with no
    # reply → timeout. Normalize + dedup.
    normalized = users |> Enum.map(&JID.jid_normalized_user/1) |> Enum.uniq()

    query =
      Enum.reduce(normalized, USync.with_context(USync.new(), "message"), fn user, q ->
        USync.with_user(q, %{id: user})
      end)
      |> USync.with_protocol(:devices)
      |> USync.with_protocol(:lid)

    {:ok, iq} = USync.build_iq(query)

    case Connection.query_iq(ctx.cm, iq) do
      {:ok, reply} -> {:ok, parse_devices(ctx, reply)}
      {:error, reason} -> {:error, {:usync, reason}}
    end
  end

  defp parse_devices(ctx, reply) do
    query = USync.new() |> USync.with_protocol(:devices) |> USync.with_protocol(:lid)

    case USync.parse_result(query, reply) do
      %{list: list} ->
        store_lid_mappings(ctx, list)
        me = ctx.creds.me
        # 1:1 send keeps device 0 (the recipient's primary).
        devices = USync.Devices.extract(list, me.id, Map.get(me, :lid), _exclude_zero? = false)
        cache_device_lists(ctx, devices)
        devices

      nil ->
        # No <list> in the reply — nothing resolved. Don't fabricate a device;
        # resolve_devices turns an empty set into a :not_on_whatsapp error.
        []
    end
  end

  # Does the resolved device set include at least one device that is NOT us? A
  # number that isn't on WhatsApp resolves to only our own devices (or none).
  defp recipient_devices?(ctx, devices) do
    me = ctx.creds.me
    mine = [me[:id], me[:lid]] |> Enum.reject(&is_nil/1) |> Enum.map(&JID.jid_normalized_user/1)

    Enum.any?(devices, fn %{jid: jid} ->
      JID.jid_normalized_user(jid) not in mine
    end)
  end

  # Step 2: ensure a Signal session per device, fetching bundles for any missing.
  defp ensure_sessions(%{devices: devices} = ctx) do
    missing = Enum.filter(devices, fn %{jid: jid} -> is_nil(load_session(ctx, jid)) end)

    if missing == [] do
      {:ok, ctx}
    else
      Logger.debug("Fetching bundles for #{length(missing)} device(s)")

      with {:ok, reply} <- fetch_bundles(ctx, Enum.map(missing, & &1.jid)) do
        SessionInjector.inject(reply, ctx.creds, ctx.conn)
        {:ok, ctx}
      end
    end
  end

  defp fetch_bundles(ctx, jids) do
    # Fetch keyed by LID when the user is lid-mapped (Baileys wireJids): the
    # server doesn't answer a PN bundle request for a lid-mapped user.
    wire = jids |> Enum.map(&LidMappingFileStore.wire_jid(ctx.conn, &1)) |> Enum.uniq()
    users = Enum.map(wire, fn jid -> %Node{tag: "user", attrs: %{"jid" => jid}, content: nil} end)

    iq = %Node{
      tag: "iq",
      # A map, matching the attrs of every other %Node{} built in this module.
      attrs: %{"xmlns" => "encrypt", "type" => "get", "to" => Constants.s_whatsapp_net()},
      content: [%Node{tag: "key", attrs: %{}, content: users}]
    }

    case Connection.query_iq(ctx.cm, iq) do
      {:ok, reply} -> {:ok, reply}
      {:error, reason} -> {:error, {:fetch_bundles, reason}}
    end
  end

  # Step 3 (DM): encrypt per device — own companion devices get a DSM-wrapped
  # copy, the recipient's devices the plain message — advancing each ratchet.
  defp encrypt(%{kind: :dm, devices: devices, message: message, target_jid: jid} = ctx) do
    plaintext = MessageEncoder.encode(message)
    dsm_plaintext = MessageEncoder.encode(device_sent_message(message, jid))
    own_user = JID.decode(ctx.creds.me.id).user

    participants =
      devices
      |> Enum.map(fn device ->
        bytes = if own_device?(device, own_user), do: dsm_plaintext, else: plaintext
        encrypt_for_device(ctx, device, bytes)
      end)
      |> Enum.reject(&is_nil/1)

    {:ok, %{ctx | participants: participants}}
  end

  # Step 3 (group): encrypt the message ONCE with our sender key (skmsg), and
  # distribute our sender-key-distribution-message (SKDM) to every participant
  # device as a per-device pkmsg so they can decrypt the skmsg. First send to a
  # group distributes keys to everyone; we redistribute on every send for now.
  defp encrypt(%{kind: :group, message: message} = ctx) do
    sk_store = SenderKeyStore.build(ctx.conn)
    me_id = sender_identity(ctx)
    sender_name = SenderKeyName.from_jids(ctx.target_jid, me_id)

    {:ok, skdm} =
      GroupSessionBuilder.create_sender_key_distribution_message(
        GroupSessionBuilder.new(sk_store),
        sk_store,
        ctx.target_jid,
        me_id
      )

    {:ok, skmsg} =
      GroupCipher.encrypt(sk_store, sender_name, MessageEncoder.encode(message))

    # The SKDM-only message, encrypted per device (the dm path). It carries no
    # text — members read the body from the group skmsg; this pkmsg only delivers
    # our sender key (Baileys senderKeyMsg).
    skdm_plaintext = MessageEncoder.encode(skdm_message(ctx.target_jid, skdm))

    participants =
      ctx.devices
      |> Enum.map(&encrypt_for_device(ctx, &1, skdm_plaintext))
      |> Enum.reject(&is_nil/1)

    {:ok, %{ctx | participants: participants, skmsg: skmsg}}
  end

  # Step 4: build + relay the stanza. DM = a <participants> of per-device <enc>.
  # Group = the skmsg <enc> plus the SKDM <participants>.
  defp relay(%{participants: []} = ctx) do
    {:error, {:relay, {:no_encrypted_devices, ctx.target_jid}}}
  end

  defp relay(%{kind: :group} = ctx) do
    {:ok, stanza} =
      Relay.build_group_stanza(
        ctx.msg_id,
        ctx.target_jid,
        ctx.skmsg,
        ctx.participants,
        ctx.creds.account,
        edit: ctx.edit_attr
      )

    Logger.debug("Relaying group #{ctx.msg_id} (#{length(ctx.participants)} devices)")

    # relay_stanza enqueues the frame on the socket and replies :ok (it can't know
    # delivery — that's what later receipts report). Its :ok is this stage's result.
    Connection.relay_stanza(ctx.cm, stanza)
  end

  defp relay(ctx) do
    {:ok, stanza} =
      Relay.build_multi_device_stanza(
        ctx.msg_id,
        ctx.target_jid,
        ctx.participants,
        ctx.creds.account,
        edit: ctx.edit_attr,
        extra_attrs: ctx.stanza_attrs
      )

    Logger.debug("Relaying #{ctx.msg_id} (#{length(ctx.participants)} device(s))")

    Connection.relay_stanza(ctx.cm, stanza)
  end

  # --- helpers (ported from the former Connection send path) ---

  defp store_lid_mappings(ctx, list) do
    pairs = for %{id: pn} = e <- list, lid = Map.get(e, "lid"), is_binary(lid), do: {lid, pn}

    unless pairs == [] do
      {count, newly} = LidMappingFileStore.store_mappings(ctx.conn, pairs)
      if count > 0, do: Logger.debug("Stored #{count} LID↔PN mapping(s) from USync")

      new_lids = for {lid, _pn} <- newly, do: lid
      Connection.assert_lid_sessions(ctx.cm, new_lids)
      # Surface the newly-learned pairs to the consumer (:lid_mapping_update).
      Connection.notify_lid_mappings(ctx.cm, newly)
    end

    :ok
  end

  defp cache_device_lists(ctx, devices) do
    DeviceListCache.put_many(ctx.conn, Enum.group_by(devices, & &1.user))
  end

  # Returns {device_jid, enc_type, ciphertext}; a failure raises (see below)
  # rather than returning nil. Session keyed by the LID address when a mapping
  # exists; wire <to jid> stays the PN device jid.
  defp encrypt_for_device(ctx, %{jid: device_jid}, plaintext) do
    addr = LidMappingFileStore.signal_address(ctx.conn, device_jid)
    store = SessionStore.build(ctx.creds)
    record = SessionStore.load_session(ctx.conn, addr)

    # SessionCipher.encrypt returns {:ok, ...} or raises (let-it-crash); there is
    # no error tuple to handle.
    {:ok, enc_type, ciphertext, record} = SessionCipher.encrypt(record, plaintext, store)
    SessionStore.store_session(ctx.conn, addr, record)
    {device_jid, enc_type, ciphertext}
  end

  defp load_session(ctx, jid),
    do: SessionStore.load_session(ctx.conn, LidMappingFileStore.signal_address(ctx.conn, jid))

  defp own_device?(%{user: user}, own_user), do: user == own_user

  defp device_sent_message(message, destination_jid) do
    %Proto.Message{
      deviceSentMessage: %Proto.Message.DeviceSentMessage{
        destinationJid: destination_jid,
        message: message
      },
      messageContextInfo: Map.get(message, :messageContextInfo)
    }
  end

  # Our sender identity for the group's sender key: LID for a lid-addressed
  # group, otherwise our PN. Must match how the group addresses us so peers find
  # our sender key under the right name.
  defp sender_identity(%{addressing_mode: :lid, creds: %{me: %{lid: lid}}}) when is_binary(lid),
    do: lid

  defp sender_identity(%{creds: %{me: %{id: id}}}), do: id

  # The per-device message that distributes our sender key — SKDM only, no body
  # (Baileys senderKeyMsg). Members read the actual text from the group skmsg.
  defp skdm_message(group_jid, skdm) do
    %Proto.Message{
      senderKeyDistributionMessage: %Proto.Message.SenderKeyDistributionMessage{
        groupId: group_jid,
        axolotlSenderKeyDistributionMessage: skdm.serialized
      }
    }
  end
end