lib/sat/csf/parser.ex

defmodule Sat.Csf.Parser do
  @moduledoc """
  Parser de Constancia de Situación Fiscal (CSF) que toma el resultado
  estructurado de `Pdf.Reader.read/2` y produce un `%Sat.Csf.Document{}`.

  El parser asume que el PDF se leyó con `dictionary: :es` (necesario para
  separar palabras pegadas como `iniciode → inicio de`). `Sat.Csf.from_file/2`
  y `from_binary/2` lo configuran por defecto.

  ## Estrategia

  - Identificación y Domicilio: se extraen pares `Label: valor` con un regex
    que conoce todos los labels esperados, manejando líneas con dos columnas
    (ej. `Código Postal: 77728 Tipo de Vialidad: AVENIDA (AV.)`).

  - Actividades y Regímenes: regex sobre el texto de cada fila, anclado por el
    formato de fecha `dd/mm/yyyy` al final.

  - Obligaciones: usa las posiciones X de los tokens para separar las cuatro
    columnas (descripción, vencimiento, fecha inicio, fecha fin). Una fila se
    extiende a múltiples líneas cuando la descripción se desborda; la presencia
    de `dd/mm/yyyy` en la columna de fecha inicio marca el comienzo de cada
    obligación.
  """

  alias Pdf.Reader.Result, as: PdfResult
  alias Sat.Csf.{
    ActividadEconomica,
    Document,
    Domicilio,
    Identificacion,
    Obligacion,
    Regimen
  }

  @section_markers [
    {:identificacion, "Datos de Identificación del Contribuyente"},
    {:domicilio, "Datos del domicilio registrado"},
    {:actividades, "Actividades Económicas"},
    {:regimenes_singular, "Régimen Fecha"},
    {:regimenes, "Regímenes"},
    {:obligaciones, "Obligaciones"},
    {:fin, "Sus datos personales son incorporados"}
  ]

  @identificacion_labels [
    {:nombre_comercial, "Nombre Comercial"},
    {:nombre, "Nombre (s)"},
    {:rfc, "RFC"},
    {:curp, "CURP"},
    {:primer_apellido, "Primer Apellido"},
    {:segundo_apellido, "Segundo Apellido"},
    {:fecha_inicio_operaciones, "Fecha inicio de operaciones"},
    {:estatus_padron, "Estatus en el padrón"},
    {:fecha_ultimo_cambio_estado, "Fecha de último cambio de estado"}
  ]

  @domicilio_labels [
    {:municipio_demarcacion_territorial, "Nombre del Municipio o Demarcación Territorial"},
    {:entidad_federativa, "Nombre de la Entidad Federativa"},
    {:nombre_vialidad, "Nombre de Vialidad"},
    {:nombre_vialidad, "Nombre de la Vialidad"},
    {:colonia, "Nombre de la Colonia"},
    {:localidad, "Nombre de la Localidad"},
    {:numero_exterior, "Número Exterior"},
    {:numero_interior, "Número Interior"},
    {:tipo_vialidad, "Tipo de Vialidad"},
    {:codigo_postal, "Código Postal"},
    {:entre_calle, "Entre Calle"},
    {:y_calle, "Y Calle"},
    {:y_calle, "YCalle"}
  ]

  @date_re ~r/^\d{2}\/\d{2}\/\d{4}$/

  @doc """
  Parsea un `%Pdf.Reader.Result{}` y devuelve `{:ok, %Sat.Csf.Document{}}`.

  Retorna `{:error, :not_a_csf}` si no detecta los marcadores de sección
  esperados (sirve como guard para PDFs que no son CSF).
  """
  @spec parse(PdfResult.t()) :: {:ok, Document.t()} | {:error, :not_a_csf}
  def parse(%PdfResult{pages: pages}) do
    lines =
      pages
      |> Enum.flat_map(& &1.lines)
      |> Enum.map(&normalize_line/1)

    section_indexes = find_section_indexes(lines)

    if Map.has_key?(section_indexes, :identificacion) do
      {:ok,
       %Document{
         identificacion: parse_identificacion(slice(lines, section_indexes, :identificacion, :domicilio)),
         domicilio: parse_domicilio(slice(lines, section_indexes, :domicilio, :actividades)),
         actividades_economicas:
           parse_actividades(slice(lines, section_indexes, :actividades, :regimenes)),
         regimenes:
           parse_regimenes(slice(lines, section_indexes, :regimenes, :obligaciones)),
         obligaciones:
           parse_obligaciones(slice(lines, section_indexes, :obligaciones, :fin))
       }}
    else
      {:error, :not_a_csf}
    end
  end

  # ── Section splitting ────────────────────────────────────────

  defp normalize_line(line) do
    %{line | text: collapse_spaces(line.text)}
  end

  defp collapse_spaces(text), do: text |> String.replace(~r/\s+/u, " ") |> String.trim()

  defp find_section_indexes(lines) do
    lines
    |> Enum.with_index()
    |> Enum.reduce(%{}, fn {line, idx}, acc ->
      Enum.reduce(@section_markers, acc, fn {key, marker}, inner ->
        if not Map.has_key?(inner, key) and String.contains?(line.text, marker) do
          Map.put(inner, key, idx)
        else
          inner
        end
      end)
    end)
    |> normalize_regimenes_index()
  end

  # The section header is "Regímenes:" on its own line — but the table header
  # the next line is "Régimen Fecha Inicio Fecha Fin". If we picked up the
  # singular form first (because "Régimen" appears earlier in regimen rows),
  # discard it.
  defp normalize_regimenes_index(%{regimenes: _} = idx), do: Map.delete(idx, :regimenes_singular)
  defp normalize_regimenes_index(idx), do: Map.delete(idx, :regimenes_singular)

  defp slice(lines, indexes, from_key, to_key) do
    case {Map.get(indexes, from_key), Map.get(indexes, to_key)} do
      {nil, _} -> []
      {from, nil} -> Enum.drop(lines, from + 1)
      {from, to} when to > from -> Enum.slice(lines, (from + 1)..(to - 1))
      _ -> []
    end
  end

  # ── Identificación ───────────────────────────────────────────

  defp parse_identificacion(lines) do
    pairs = extract_label_pairs(lines, @identificacion_labels)
    struct(Identificacion, atomize_pairs(pairs, @identificacion_labels))
  end

  # ── Domicilio ────────────────────────────────────────────────

  defp parse_domicilio(lines) do
    pairs = extract_label_pairs(lines, @domicilio_labels)
    struct(Domicilio, atomize_pairs(pairs, @domicilio_labels))
  end

  # ── Actividades Económicas ───────────────────────────────────

  @actividad_re ~r/^(?<orden>\d+)\s+(?<actividad>.+?)\s+(?<porcentaje>\d{1,3})\s+(?<fecha_inicio>\d{2}\/\d{2}\/\d{4})(?:\s+(?<fecha_fin>\d{2}\/\d{2}\/\d{4}))?\s*$/u

  defp parse_actividades(lines) do
    lines
    |> drop_until_data_row()
    |> Enum.flat_map(fn line ->
      case Regex.named_captures(@actividad_re, line.text) do
        nil ->
          []

        caps ->
          [
            %ActividadEconomica{
              orden: String.to_integer(caps["orden"]),
              actividad_economica: String.trim(caps["actividad"]),
              porcentaje: String.to_integer(caps["porcentaje"]),
              fecha_inicio: presence(caps["fecha_inicio"]),
              fecha_fin: presence(caps["fecha_fin"])
            }
          ]
      end
    end)
  end

  defp drop_until_data_row(lines) do
    Enum.drop_while(lines, fn line -> not String.match?(line.text, ~r/^\d+\s+/) end)
  end

  # ── Régimenes ────────────────────────────────────────────────

  @regimen_re ~r/^(?<regimen>.+?)\s+(?<fecha_inicio>\d{2}\/\d{2}\/\d{4})(?:\s+(?<fecha_fin>\d{2}\/\d{2}\/\d{4}))?\s*$/u

  defp parse_regimenes(lines) do
    lines
    |> Enum.flat_map(fn line ->
      case Regex.named_captures(@regimen_re, line.text) do
        nil ->
          []

        caps ->
          regimen_str = String.trim(caps["regimen"])

          [
            %Regimen{
              regimen: regimen_str,
              codigo: lookup_regimen_codigo(regimen_str),
              fecha_inicio: presence(caps["fecha_inicio"]),
              fecha_fin: presence(caps["fecha_fin"])
            }
          ]
      end
    end)
  end

  defp lookup_regimen_codigo(label) do
    target = simplify_regimen(label)

    Sat.Catalogos.RegimenFiscal.list()
    |> Enum.find(&(simplify_regimen(&1.label) == target))
    |> case do
      %{value: code} -> code
      _ -> nil
    end
  end

  defp simplify_regimen(label) do
    label
    |> strip_accents()
    |> String.downcase()
    |> String.replace(~r/^regimen\s+(?:de\s+|del\s+)?/u, "")
    |> String.replace(~r/\s+/u, " ")
    |> String.trim()
  end

  defp strip_accents(s) do
    s
    |> String.normalize(:nfd)
    |> String.replace(~r/\p{Mn}/u, "")
  end

  # ── Obligaciones ─────────────────────────────────────────────
  #
  # The obligation table has 4 columns. Rows wrap across multiple physical
  # lines: the first line carries the start date in the fecha_inicio column,
  # subsequent lines extend the descripción/vencimiento text.
  #
  # We anchor columns from the table header line, then bin each row's tokens
  # into the column whose anchor X is the largest one ≤ the token's X.

  defp parse_obligaciones(lines) do
    case find_obligaciones_header(lines) do
      nil ->
        []

      {header_idx, header} ->
        data_lines = Enum.drop(lines, header_idx + 1)

        data_lines
        |> column_boundaries(header)
        |> case do
          [] -> []
          boundaries -> bin_rows_by_columns(data_lines, boundaries)
        end
        |> group_into_obligaciones()
    end
  end

  defp find_obligaciones_header(lines) do
    Enum.find_index(lines, fn line ->
      text = line.text

      String.contains?(text, "Descripción") and String.contains?(text, "Vencimiento") and
        String.contains?(text, "Fecha")
    end)
    |> case do
      nil -> nil
      idx -> {idx, Enum.at(lines, idx)}
    end
  end

  # The SAT obligation header centres each column label inside its column, while
  # row text is left-aligned. Naive midpoints between header anchors place the
  # col2→col3 boundary too far left (text in col2 extends almost up to col3 in
  # rows, while the header anchors are only ~150pt apart). To compensate, we
  # refine the col2→col3 boundary using the actual X of dd/mm/yyyy tokens —
  # those are the leftmost tokens of column 3 in real data. col1→col2 and
  # col3→col4 use header midpoints (no symmetric data anchor available).
  defp column_boundaries(data_lines, header_line) do
    anchors = cluster_header_xs(header_line.tokens)

    midpoints =
      anchors
      |> Enum.chunk_every(2, 1, :discard)
      |> Enum.map(fn [a, b] -> (a + b) / 2.0 end)

    date_xs =
      data_lines
      |> Enum.flat_map(& &1.tokens)
      |> Enum.filter(&Regex.match?(@date_re, &1.text))
      |> Enum.map(& &1.x)

    case {midpoints, date_xs} do
      {[m12, _m23, m34], [_ | _]} ->
        [m12, Enum.min(date_xs) - 5.0, m34]

      {[m12, _m23], [_ | _]} ->
        [m12, Enum.min(date_xs) - 5.0]

      {ms, _} ->
        ms
    end
  end

  # Tokens in the header that share the same column have very close X (often
  # identical, since the producer emits one label per token at a single Tm
  # position). Walk the unique sorted Xs and start a new cluster whenever a
  # gap exceeds `cluster_gap_threshold/0`.
  defp cluster_header_xs(tokens) do
    tokens
    |> Enum.map(& &1.x)
    |> Enum.uniq()
    |> Enum.sort()
    |> Enum.reduce([], fn x, acc ->
      case acc do
        [] ->
          [x]

        [last | _] when x - last < 30 ->
          acc

        _ ->
          [x | acc]
      end
    end)
    |> Enum.reverse()
  end

  # For each line, place each token into its column bin and return the per-row
  # column texts (always 4 strings, missing columns are "").
  defp bin_rows_by_columns(lines, midpoints) do
    n_columns = length(midpoints) + 1
    empty = List.duplicate([], n_columns)

    Enum.map(lines, fn line ->
      buckets =
        Enum.reduce(line.tokens, empty, fn token, acc ->
          col = column_for_x_midpoints(token.x, midpoints)
          List.update_at(acc, col, &[token | &1])
        end)

      Enum.map(buckets, fn tokens ->
        tokens
        |> Enum.reverse()
        |> Enum.map(& &1.text)
        |> Enum.join(" ")
        |> collapse_spaces()
      end)
    end)
  end

  defp column_for_x_midpoints(x, midpoints) do
    Enum.reduce_while(Enum.with_index(midpoints), length(midpoints), fn {m, idx}, _acc ->
      if x < m, do: {:halt, idx}, else: {:cont, idx + 1}
    end)
  end

  # Walk the binned rows; a row whose third column matches dd/mm/yyyy starts a
  # new obligation. Other rows extend the previous obligation's text.
  defp group_into_obligaciones(rows) do
    {acc, current} = Enum.reduce(rows, {[], nil}, &reduce_obligacion_row/2)

    [current | acc]
    |> Enum.reject(&is_nil/1)
    |> Enum.reverse()
    |> Enum.map(&collapse_obligacion/1)
  end

  defp reduce_obligacion_row([obl, venc, fi, ff], {acc, current}) do
    cond do
      Regex.match?(@date_re, fi) ->
        acc = if current, do: [current | acc], else: acc

        {acc,
         %Obligacion{
           descripcion_obligacion: obl,
           descripcion_vencimiento: venc,
           fecha_inicio: fi,
           fecha_fin: presence(ff)
         }}

      not is_nil(current) ->
        {acc, append_to_current(current, [obl, venc, fi, ff])}

      true ->
        {acc, current}
    end
  end

  defp reduce_obligacion_row(_, state), do: state

  defp append_to_current(%Obligacion{} = ob, [obl, venc, _fi, _ff]) do
    %Obligacion{
      ob
      | descripcion_obligacion: join_nonempty(ob.descripcion_obligacion, obl),
        descripcion_vencimiento: join_nonempty(ob.descripcion_vencimiento, venc)
    }
  end

  defp join_nonempty(a, b) do
    [a, b]
    |> Enum.reject(&(&1 in [nil, ""]))
    |> Enum.join(" ")
    |> collapse_spaces()
  end

  defp collapse_obligacion(%Obligacion{} = ob) do
    %Obligacion{
      ob
      | descripcion_obligacion: collapse_spaces(ob.descripcion_obligacion || ""),
        descripcion_vencimiento: collapse_spaces(ob.descripcion_vencimiento || "")
    }
  end

  # ── Helpers shared by Identificación + Domicilio ─────────────

  # Build a label → value map by scanning the joined section text.
  # Labels are sorted longest-first so that "Nombre Comercial" wins over
  # "Nombre (s)" and "Nombre del Municipio…" wins over "Nombre de la…".
  defp extract_label_pairs(lines, label_specs) do
    text = lines |> Enum.map(& &1.text) |> Enum.join(" ")

    labels =
      label_specs
      |> Enum.map(fn {_field, label} -> label end)
      |> Enum.uniq()
      |> Enum.sort_by(&(-String.length(&1)))

    alt = labels |> Enum.map(&Regex.escape/1) |> Enum.join("|")
    re = Regex.compile!("(#{alt})\\s*:\\s*(.+?)(?=\\s+(?:#{alt})\\s*:|$)", "u")

    Regex.scan(re, text)
    |> Enum.map(fn [_, label, value] -> {label, String.trim(value)} end)
    |> Enum.into(%{})
  end

  defp atomize_pairs(label_to_value, label_specs) do
    Enum.reduce(label_specs, %{}, fn {field, label}, acc ->
      case Map.get(label_to_value, label) do
        nil -> acc
        value -> Map.put_new(acc, field, value)
      end
    end)
  end

  defp presence(nil), do: nil
  defp presence(""), do: nil
  defp presence(other), do: other
end