Skip to main content

lib/sat/cfdi/descarga/masiva/pipeline.ex

defmodule Sat.Cfdi.Descarga.Masiva.Pipeline do
  @moduledoc """
  Orquestador del flujo completo de Descarga Masiva.

  Encadena los 4 servicios (autenticacion -> solicitud -> verificacion ->
  paquete) y entrega un stream de XMLs / metadatos al consumidor, ocultando
  el ZIP por debajo.

  > Nota: en produccion con Oban cada paso se ejecuta como worker independiente.
  > Usar este modulo solo para scripts, CLI o volúmenes chicos.

  ## Ejemplo

      {:ok, cred} = Sat.Certificados.Credential.create("fiel.cer", "fiel.key", "pwd")

      params = %SolicitudParams{
        rfc_solicitante: "AAA010101AAA",
        fecha_inicial: ~U[2025-01-01 00:00:00Z],
        fecha_final:   ~U[2025-01-31 23:59:59Z],
        tipo_solicitud: :cfdi
      }

      Pipeline.stream_xml(params, credential: cred)
      |> Stream.each(fn
        {:ok, {filename, xml}} -> IO.puts(filename)
        {:error, reason} -> IO.warn(inspect(reason))
      end)
      |> Stream.run()

  Polling y retries son configurables via opciones.
  """

  alias Sat.Cfdi.Descarga.Masiva.{Autenticacion, Paquete, Solicitud, Verificacion}
  alias Sat.Cfdi.Descarga.Masiva.Paquete.Reader
  alias Sat.Cfdi.Descarga.Masiva.Types.{SolicitudParams, Token}

  @doc """
  Ejecuta el flujo completo y devuelve un `Stream` lazy donde cada elemento
  es `{:ok, {filename, xml}}` o `{:error, reason}`.

  Opciones:
    * `:credential` (requerido) — FIEL del solicitante
    * `:poll_interval_ms` — intervalo de polling para verificacion
    * `:max_attempts` — intentos maximos de polling
    * `:timeout` — HTTP timeout
  """
  @spec stream_xml(SolicitudParams.t(), keyword()) :: Enumerable.t()
  def stream_xml(%SolicitudParams{} = params, opts) do
    Stream.resource(
      fn -> begin_pipeline(params, opts) end,
      fn state -> next_xml(state, opts) end,
      fn _state -> :ok end
    )
  end

  @doc """
  Variante sincrona que materializa todos los CFDIs en una lista.
  Solo recomendable para volumenes chicos (< 10,000).
  """
  @spec listar_xml(SolicitudParams.t(), keyword()) ::
          {:ok, [{String.t(), String.t()}]} | {:error, term()}
  def listar_xml(%SolicitudParams{} = params, opts) do
    case run_pipeline(params, opts) do
      {:ok, paquetes} ->
        xmls =
          paquetes
          |> Enum.flat_map(fn paquete ->
            case Reader.stream_cfdis(paquete) do
              {:ok, stream} -> Enum.to_list(stream)
              {:error, _} -> []
            end
          end)

        {:ok, xmls}

      {:error, _} = e ->
        e
    end
  end

  @doc """
  Lista metadata de todos los paquetes (cuando el `tipo_solicitud` es
  `:metadata`). Devuelve una lista de mapas.
  """
  @spec listar_metadata(SolicitudParams.t(), keyword()) ::
          {:ok, [map()]} | {:error, term()}
  def listar_metadata(%SolicitudParams{} = params, opts) do
    params = %{params | tipo_solicitud: :metadata}

    case run_pipeline(params, opts) do
      {:ok, paquetes} ->
        rows =
          paquetes
          |> Enum.flat_map(fn paquete ->
            case Reader.parse_metadata(paquete) do
              {:ok, list} -> list
              {:error, _} -> []
            end
          end)

        {:ok, rows}

      {:error, _} = e ->
        e
    end
  end

  defp begin_pipeline(params, opts) do
    case run_pipeline(params, opts) do
      {:ok, paquetes} -> {:paquetes, paquetes, []}
      {:error, reason} -> {:error_state, reason}
    end
  end

  defp next_xml({:error_state, reason} = state, _opts), do: {[{:error, reason}], state}

  defp next_xml({:paquetes, [], []}, _opts), do: {:halt, :done}

  defp next_xml({:paquetes, [paquete | rest], []}, _opts) do
    case Reader.stream_cfdis(paquete) do
      {:ok, stream} ->
        items = Enum.to_list(stream) |> Enum.map(&{:ok, &1})
        next_xml({:paquetes, rest, items}, [])

      {:error, reason} ->
        {[{:error, reason}], {:paquetes, rest, []}}
    end
  end

  defp next_xml({:paquetes, paquetes, [head | tail]}, _opts) do
    {[head], {:paquetes, paquetes, tail}}
  end

  defp run_pipeline(%SolicitudParams{} = params, opts) do
    with {:ok, %Token{} = token} <- Autenticacion.autenticar(opts),
         {:ok, %{cod_estatus: cod, id_solicitud: id_sol}} when cod in ["5000", "0"] <-
           Solicitud.solicitar(token, params, opts),
         id_sol when is_binary(id_sol) and id_sol != "" <- id_sol,
         {:ok, %{ids_paquetes: ids}} <-
           Verificacion.esperar_terminada(token, id_sol, opts) do
      paquetes =
        ids
        |> Enum.map(fn id ->
          case Paquete.descargar(token, id, opts) do
            {:ok, p} -> p
            {:error, _} -> nil
          end
        end)
        |> Enum.reject(&is_nil/1)

      {:ok, paquetes}
    else
      nil -> {:error, {:solicitud_sin_id, "el SAT no devolvio IdSolicitud"}}

      {:ok, %{cod_estatus: cod, mensaje: msg}} ->
        {:error, {:solicitud_rechazada, cod, msg}}

      {:error, _} = e ->
        e
    end
  end
end