# lib/ftfy.ex

# Part of ftfy for Elixir — an Apache-2.0 port of python-ftfy.
# Original ftfy Copyright 2023 Robyn Speer; this port Copyright 2026 FashionUnited.
defmodule Ftfy do
  @moduledoc """
  ftfy: fixes text for you.

  A port of the Python [ftfy](https://github.com/rspeer/python-ftfy) library for
  making text less broken — most importantly, fixing *mojibake* (text that was
  decoded in the wrong encoding).

      iex> Ftfy.fix_text("✔ No problems")
      "✔ No problems"

      iex> Ftfy.fix_text("Broken text… it’s flubberific!")
      "Broken text… it's flubberific!"

  See `Ftfy.TextFixerConfig` for the available options. The top-level functions
  accept either a `%Ftfy.TextFixerConfig{}` or a keyword list of overrides.
  """

  alias Ftfy.{Badness, Chardata, Codecs, Fixes, TextFixerConfig}

  @version "6.3.1"

  @doc """
  Returns the version string declared in this module's `@version` attribute.
  """
  @spec version() :: String.t()
  def version, do: @version

  # Functions that can be applied by a "transcode"/"apply" plan step, keyed by
  # the name that appears as the step's argument. `try_fix/3` looks fixers up
  # here as well, using the stringified config key.
  @fixers %{
    "unescape_html" => &Fixes.unescape_html/1,
    "remove_terminal_escapes" => &Fixes.remove_terminal_escapes/1,
    "restore_byte_a0" => &Fixes.restore_byte_a0/1,
    "replace_lossy_sequences" => &Fixes.replace_lossy_sequences/1,
    "decode_inconsistent_utf8" => &Fixes.decode_inconsistent_utf8/1,
    "fix_c1_controls" => &Fixes.fix_c1_controls/1,
    "fix_latin_ligatures" => &Fixes.fix_latin_ligatures/1,
    "fix_character_width" => &Fixes.fix_character_width/1,
    "uncurl_quotes" => &Fixes.uncurl_quotes/1,
    "fix_line_breaks" => &Fixes.fix_line_breaks/1,
    "fix_surrogates" => &Fixes.fix_surrogates/1,
    "remove_control_chars" => &Fixes.remove_control_chars/1
  }

  @doc """
  Returns the map from fixer name to the one-argument fixer function.

  These are the names accepted as the argument of a `"transcode"` or `"apply"`
  step in `apply_plan/2`.
  """
  @spec fixers() :: %{optional(String.t()) => (binary() -> binary())}
  def fixers, do: @fixers

  @doc """
  Repairs mojibake and other glitches in Unicode `text`.

  The text is cut into independent segments (normally one per line, with
  overlong lines capped at `max_decode_length`), every segment is fixed on its
  own, and the fixed segments are joined back together. No explanation is
  returned. The second argument is either a `%Ftfy.TextFixerConfig{}` or a
  keyword list of overrides.

      iex> Ftfy.fix_text("LOUD NOISES")
      "LOUD NOISES"
  """
  @spec fix_text(binary(), TextFixerConfig.t() | keyword()) :: binary()
  def fix_text(text, config_or_opts \\ []) do
    config = build_config(%TextFixerConfig{explain: false}, config_or_opts)

    {fixed_segments, _final_config} =
      text
      |> split_segments(config.max_decode_length)
      |> Enum.map_reduce(config, fn segment, current ->
        # The config is threaded through the segments: once one of them
        # contains "<", "auto" HTML unescaping stays off for all later ones.
        saw_markup? = current.unescape_html == "auto" and String.contains?(segment, "<")
        current = if saw_markup?, do: %{current | unescape_html: false}, else: current

        {fixed, _explanation} = fix_and_explain(segment, current)
        {fixed, current}
      end)

    IO.iodata_to_binary(fixed_segments)
  end

  @doc """
  Fixes `text` as one segment and returns `{fixed_text, explanation}`.

  The explanation is a list of `{action, parameter}` steps in the order they
  were applied, suitable for `apply_plan/2`. It is `nil` when the config's
  `explain` setting is false.
  """
  @spec fix_and_explain(binary(), TextFixerConfig.t() | keyword()) ::
          {binary(), [{String.t(), String.t()}] | nil}
  def fix_and_explain(text, config_or_opts \\ []) do
    requested = build_config(%TextFixerConfig{}, config_or_opts)

    # In "auto" mode, HTML unescaping is switched off for text containing "<".
    saw_markup? = requested.unescape_html == "auto" and String.contains?(text, "<")
    config = if saw_markup?, do: %{requested | unescape_html: false}, else: requested

    # `nil` (rather than an empty list) means "do not record steps at all".
    initial_steps = if config.explain, do: [], else: nil
    fix_loop(text, config, initial_steps)
  end

  # Runs one full pass of the enabled fixers in a fixed order, then repeats
  # with the result until a pass leaves the text unchanged.
  defp fix_loop(text, config, steps) do
    text_fixers = [
      :fix_c1_controls,
      :fix_latin_ligatures,
      :fix_character_width,
      :uncurl_quotes,
      :fix_line_breaks,
      :fix_surrogates,
      :remove_terminal_escapes,
      :remove_control_chars
    ]

    pass_result =
      {text, steps}
      |> try_fix(:unescape_html, config)
      |> apply_encoding_fix(config)
      |> try_fixers(text_fixers, config)
      |> apply_normalization(config)

    case pass_result do
      # Nothing changed during this pass, so a fixed point has been reached.
      {^text, _steps} -> pass_result
      {changed, recorded} -> fix_loop(changed, config, recorded)
    end
  end

  # Encoding-repair stage of a fix pass. Does nothing when `fix_encoding` is
  # false; otherwise repairs the text and, when steps are being recorded
  # (`steps` is not nil), appends the encoding plan to them.
  defp apply_encoding_fix({text, steps} = state, config) do
    case {config, steps} do
      {%{fix_encoding: false}, _} ->
        state

      {_, nil} ->
        {fix_encoding(text, config), nil}

      {_, recorded} ->
        {repaired, encoding_plan} = fix_encoding_and_explain(text, config)
        {repaired, recorded ++ encoding_plan}
    end
  end

  # Threads the `{text, steps}` state through `try_fix/3` for every name, in
  # list order.
  defp try_fixers({_text, _steps} = state, names, config) do
    List.foldl(names, state, fn fixer_name, current -> try_fix(current, fixer_name, config) end)
  end

  # Applies the fixer called `name` when the config field of the same name is
  # truthy. An `{"apply", name}` step is recorded only if steps are being
  # collected and the fixer actually changed the text.
  defp try_fix({text, steps} = unchanged, name, config) do
    case Map.get(config, name) do
      disabled when disabled in [nil, false] ->
        unchanged

      _enabled ->
        label = Atom.to_string(name)
        fixed = Map.get(@fixers, label).(text)

        cond do
          is_nil(steps) -> {fixed, nil}
          fixed == text -> {fixed, steps}
          true -> {fixed, steps ++ [{"apply", label}]}
        end
    end
  end

  # Unicode-normalization stage of a fix pass; skipped when the configured
  # form is nil. A `{"normalize", form}` step is recorded only if steps are
  # being collected and normalization changed the text.
  defp apply_normalization({_text, _steps} = state, %{normalization: nil}), do: state

  defp apply_normalization({text, steps}, %{normalization: form}) do
    normalized = normalize(text, form)

    cond do
      is_nil(steps) -> {normalized, nil}
      normalized == text -> {normalized, steps}
      true -> {normalized, steps ++ [{"normalize", form}]}
    end
  end

  @doc """
  Apply just the encoding-fixing steps of ftfy, returning `{fixed, explanation}`.

  The explanation is the list of `{action, parameter}` steps that were taken,
  in order. When the config's `fix_encoding` setting is false the text is
  returned untouched with an empty explanation.

      iex> Ftfy.fix_encoding_and_explain("sÃ³")
      {"só", [{"encode", "latin-1"}, {"decode", "utf-8"}]}
  """
  @spec fix_encoding_and_explain(binary(), TextFixerConfig.t() | keyword()) ::
          {binary(), [{String.t(), String.t()}]}
  def fix_encoding_and_explain(text, config_or_opts \\ []) do
    config = build_config(%TextFixerConfig{}, config_or_opts)

    # Asked to fix the encoding while encoding fixes are disabled: a no-op.
    if config.fix_encoding, do: fix_encoding_loop(text, config, []), else: {text, []}
  end

  # Repeats single encoding-repair steps, accumulating their plans, until a
  # step returns the text unchanged.
  defp fix_encoding_loop(text, config, plan_so_far) do
    {candidate, step_plan} = fix_encoding_one_step(text, config)
    accumulated = plan_so_far ++ step_plan

    case candidate do
      ^text -> {candidate, accumulated}
      changed -> fix_encoding_loop(changed, config, accumulated)
    end
  end

  @doc """
  Runs only the encoding-fixing steps of ftfy and returns the fixed text,
  dropping the explanation.

      iex> Ftfy.fix_encoding("ó")
      "ó"
      iex> Ftfy.fix_encoding("&ATILDE;&SUP3;")
      "&ATILDE;&SUP3;"
  """
  @spec fix_encoding(binary(), TextFixerConfig.t() | keyword()) :: binary()
  def fix_encoding(text, config_or_opts \\ []) do
    %TextFixerConfig{explain: false}
    |> build_config(config_or_opts)
    |> then(&fix_encoding_and_explain(text, &1))
    |> elem(0)
  end

  @doc "Fixes `text` as one single segment and returns only the fixed text."
  @spec fix_text_segment(binary(), TextFixerConfig.t() | keyword()) :: binary()
  def fix_text_segment(text, config_or_opts \\ []) do
    segment_config = build_config(%TextFixerConfig{explain: false}, config_or_opts)

    text
    |> fix_and_explain(segment_config)
    |> elem(0)
  end

  # A single round of encoding repair (port of
  # _fix_encoding_one_step_and_explain). Returns `{text, plan}`. The text comes
  # back untouched with an empty plan when it is empty, when
  # `Chardata.possible_encoding/2` accepts it as "ascii", or when
  # `Badness.is_bad/1` is false for it.
  defp fix_encoding_one_step(text, config) do
    cond do
      text == "" ->
        {"", []}

      Chardata.possible_encoding(text, "ascii") ->
        {text, []}

      not Badness.is_bad(text) ->
        {text, []}

      true ->
        case try_encodings(Chardata.charmap_encodings(), text, config, []) do
          {:exhausted, possible} -> encoding_fallbacks(text, possible, config)
          {:done, fixed, steps} -> {fixed, steps}
        end
    end
  end

  # Walks the candidate single-byte encodings in order. For each one the text
  # can be encoded in, the bytes are (optionally) transcoded and then decoded
  # as UTF-8. The first successful decode ends the search with
  # `{:done, fixed, steps}`. If none succeeds, the result is
  # `{:exhausted, encodings}`, listing (in candidate order) the encodings that
  # could represent the text but did not decode.
  defp try_encodings([], _text, _config, undecodable) do
    {:exhausted, Enum.reverse(undecodable)}
  end

  defp try_encodings([candidate | remaining], text, config, undecodable) do
    if Chardata.possible_encoding(text, candidate) do
      {:ok, raw} = Codecs.encode(text, candidate)
      {raw, a0_steps} = maybe_restore_byte_a0(raw, candidate, config)
      {raw, transcodes} = maybe_replace_lossy(raw, candidate, config, a0_steps)

      # The presence of byte 0xED or 0xC0 selects the "utf-8-variants" decoder.
      lenient? = Enum.any?([0xED, 0xC0], &byte_in?(raw, &1))
      decoder = if lenient?, do: "utf-8-variants", else: "utf-8"

      case Codecs.decode(raw, decoder) do
        {:error, _} ->
          try_encodings(remaining, text, config, [candidate | undecodable])

        {:ok, fixed} ->
          {:done, fixed, [{"encode", candidate} | transcodes] ++ [{"decode", decoder}]}
      end
    else
      try_encodings(remaining, text, config, undecodable)
    end
  end

  # Runs `Fixes.restore_byte_a0/1` over the encoded bytes when the option is
  # on, the encoding is not "macroman", and the bytes match
  # `Chardata.altered_utf8_re/0`. Returns `{bytes, steps}`, where `steps` holds
  # a "transcode" entry only if the bytes actually changed.
  defp maybe_restore_byte_a0(encoded, encoding, config) do
    eligible? =
      config.restore_byte_a0 and encoding != "macroman" and
        Regex.match?(Chardata.altered_utf8_re(), encoded)

    restored = if eligible?, do: Fixes.restore_byte_a0(encoded), else: encoded

    if restored == encoded,
      do: {encoded, []},
      else: {restored, [{"transcode", "restore_byte_a0"}]}
  end

  # Runs `Fixes.replace_lossy_sequences/1` over the encoded bytes when the
  # option is on and the encoding name starts with "sloppy". Returns
  # `{bytes, steps}`, appending a "transcode" entry to the incoming steps only
  # if the bytes actually changed.
  defp maybe_replace_lossy(encoded, encoding, config, transcode_steps) do
    eligible? = config.replace_lossy_sequences and String.starts_with?(encoding, "sloppy")
    replaced = if eligible?, do: Fixes.replace_lossy_sequences(encoded), else: encoded

    if replaced == encoded,
      do: {encoded, transcode_steps},
      else: {replaced, transcode_steps ++ [{"transcode", "replace_lossy_sequences"}]}
  end

  # Used once no candidate encoding decoded cleanly. Tries, in this order, the
  # inconsistent-UTF-8 fix, the Latin-1-as-1252 fix, and the C1-controls fix;
  # the first one that yields `{fixed, steps}` wins. Each fallback is only
  # evaluated if the ones before it answered `:no`. If all decline, the text
  # is returned unchanged with an empty plan.
  defp encoding_fallbacks(text, possible, config) do
    attempts = [
      fn -> inconsistent_utf8_fallback(text, config) end,
      fn -> latin1_fallback(text, possible) end,
      fn -> c1_controls_fallback(text, config) end
    ]

    Enum.find_value(attempts, {text, []}, fn attempt ->
      case attempt.() do
        :no -> nil
        repaired -> repaired
      end
    end)
  end

  # Applies `Fixes.decode_inconsistent_utf8/1` when the option is on and the
  # text matches `Chardata.utf8_detector_re/0`. Answers `:no` when the fix is
  # not attempted or leaves the text unchanged.
  defp inconsistent_utf8_fallback(text, config) do
    applicable? =
      config.decode_inconsistent_utf8 and Regex.match?(Chardata.utf8_detector_re(), text)

    fixed = if applicable?, do: Fixes.decode_inconsistent_utf8(text), else: text

    if fixed == text, do: :no, else: {fixed, [{"apply", "decode_inconsistent_utf8"}]}
  end

  # When "latin-1" is among the encodings that could represent the text,
  # re-reads its Latin-1 bytes as Windows-1252. Answers `:no` if Latin-1 was
  # not possible, if either codec step fails, or if nothing changes.
  defp latin1_fallback(text, possible) do
    with true <- "latin-1" in possible,
         {:ok, latin1_bytes} <- Codecs.encode(text, "latin-1"),
         {:ok, fixed} when fixed != text <- Codecs.decode(latin1_bytes, "windows-1252") do
      {fixed, [{"encode", "latin-1"}, {"decode", "windows-1252"}]}
    else
      _ -> :no
    end
  end

  # Applies `Fixes.fix_c1_controls/1` when the option is on and the text
  # matches `Chardata.c1_control_re/0`; otherwise answers `:no`. Unlike the
  # other fallbacks, the result is returned even if the fixer changed nothing.
  defp c1_controls_fallback(text, config) do
    case config.fix_c1_controls and Regex.match?(Chardata.c1_control_re(), text) do
      true -> {Fixes.fix_c1_controls(text), [{"transcode", "fix_c1_controls"}]}
      false -> :no
    end
  end

  @doc """
  Apply a plan (a list of `{operation, arg}` steps) to fix the encoding of text.

  `operation` is one of:

    * `"encode"` - encode the current string to bytes using the encoding `arg`
    * `"decode"` - decode the current bytes to a string using the encoding `arg`
    * `"transcode"` / `"apply"` - run the fixer named `arg` (see `fixers/0`)
    * `"normalize"` - Unicode-normalize to the form `arg`, as recorded by
      `fix_and_explain/2` when normalization changed the text

  Raises `ArgumentError` for an unknown operation or an unknown fixer name.

      iex> {_text, plan} = Ftfy.fix_and_explain("schön")
      iex> Ftfy.apply_plan("schön", plan)
      "schön"
  """
  @spec apply_plan(binary(), [{String.t(), String.t()}]) :: binary()
  def apply_plan(text, plan), do: Enum.reduce(plan, text, &apply_step/2)

  # Executes a single plan step against the value produced so far.
  defp apply_step({"encode", encoding}, obj) do
    {:ok, bytes} = Codecs.encode(obj, encoding)
    bytes
  end

  defp apply_step({"decode", encoding}, obj) do
    {:ok, string} = Codecs.decode(obj, encoding)
    string
  end

  defp apply_step({operation, name}, obj) when operation in ["transcode", "apply"] do
    case Map.fetch(@fixers, name) do
      {:ok, fixer} -> fixer.(obj)
      :error -> raise ArgumentError, "Unknown function to apply: #{name}"
    end
  end

  # `fix_and_explain/2` records these steps, so plans it produces must be
  # replayable here instead of being rejected as unknown.
  defp apply_step({"normalize", form}, obj), do: normalize(obj, form)

  defp apply_step({other, _arg}, _obj) do
    raise ArgumentError, "Unknown plan step: #{other}"
  end

  @doc """
  Guess a reasonable decoding for some bytes in an unknown encoding, returning
  `{string, encoding_name}`.

  Guesses are made in this order:

    * input starting with a UTF-16 byte order mark is decoded as `"utf-16"`
      (the mark itself is dropped)
    * input that decodes as `"utf-8"` (or `"utf-8-variants"` when byte 0xED or
      0xC0 is present) is returned as such
    * input containing a carriage return but no line feed is decoded as
      `"macroman"`
    * anything else is decoded as `"sloppy-windows-1252"`

  Raises `ArgumentError` when the input starts with a UTF-16 byte order mark
  but the rest is not valid UTF-16.

  This is not accurate and may *create* Unicode problems. It is not the
  recommended way to use ftfy.
  """
  @spec guess_bytes(binary()) :: {binary(), String.t()}
  def guess_bytes(<<0xFE, 0xFF, rest::binary>>), do: {decode_utf16!(rest, :big), "utf-16"}

  def guess_bytes(<<0xFF, 0xFE, rest::binary>>), do: {decode_utf16!(rest, :little), "utf-16"}

  def guess_bytes(bytes) when is_binary(bytes) do
    variant? = byte_in?(bytes, 0xED) or byte_in?(bytes, 0xC0)
    decoding = if variant?, do: "utf-8-variants", else: "utf-8"

    case Codecs.decode(bytes, decoding) do
      {:ok, decoded} ->
        {decoded, decoding}

      {:error, _} ->
        if byte_in?(bytes, 0x0D) and not byte_in?(bytes, 0x0A) do
          {:ok, decoded} = Codecs.decode(bytes, "macroman")
          {decoded, "macroman"}
        else
          {:ok, decoded} = Codecs.decode(bytes, "sloppy-windows-1252")
          {decoded, "sloppy-windows-1252"}
        end
    end
  end

  # `:unicode.characters_to_binary/2` signals malformed or truncated input by
  # returning an `{:error, ...}` / `{:incomplete, ...}` tuple instead of a
  # binary. Raise here so such a tuple is never handed back as decoded text.
  defp decode_utf16!(bytes, endianness) do
    case :unicode.characters_to_binary(bytes, {:utf16, endianness}) do
      decoded when is_binary(decoded) ->
        decoded

      _error_or_incomplete ->
        raise ArgumentError,
              "input starts with a UTF-16 byte order mark but is not valid UTF-16"
    end
  end

  @doc """
  Lazily fixes every line of an enumerable of lines, returning a stream of the
  fixed lines.

  Options:

    * `:encoding` - encoding used to decode each line; when `nil` or absent,
      the encoding is guessed per line with `guess_bytes/1`
    * `:config` - a `%Ftfy.TextFixerConfig{}` or keyword overrides

  Raises `ArgumentError` while the stream is consumed if a line cannot be
  decoded with the given `:encoding`.
  """
  @spec fix_file(Enumerable.t(), keyword()) :: Enumerable.t()
  def fix_file(lines, opts \\ []) do
    # Pick the per-line decoder once, up front, rather than on every line.
    decode_line =
      case Keyword.get(opts, :encoding) do
        nil ->
          fn raw ->
            {guessed, _encoding} = guess_bytes(raw)
            guessed
          end

        enc ->
          fn raw ->
            case Codecs.decode(raw, enc) do
              {:ok, string} -> string
              {:error, _} -> raise ArgumentError, "could not decode input as #{enc}"
            end
          end
      end

    initial_config = build_config(%TextFixerConfig{}, Keyword.get(opts, :config, []))

    Stream.transform(lines, initial_config, fn line, current ->
      decoded = decode_line.(line)

      # The config is the stream accumulator: once a line contains "<", "auto"
      # HTML unescaping stays off for every following line.
      saw_markup? = current.unescape_html == "auto" and String.contains?(decoded, "<")
      current = if saw_markup?, do: %{current | unescape_html: false}, else: current

      {fixed, _explanation} = fix_and_explain(decoded, current)
      {[fixed], current}
    end)
  end

  @doc """
  Print a breakdown of each codepoint in `text` for debugging mysterious
  Unicode: one line per codepoint, showing its number as `U+` followed by at
  least four uppercase hex digits, then the glyph itself.

  Note: unlike the Python original, this prints neither the Unicode category
  nor the Unicode *name* of each character.
  """
  @spec explain_unicode(binary()) :: :ok
  def explain_unicode(text) do
    text
    |> String.to_charlist()
    |> Enum.each(fn cp ->
      # Built with plain string operations on purpose: `:io_lib.format/2`
      # yields chardata (not iodata) for `~ts`, and a fixed field width of 4
      # would turn codepoints above U+FFFF into "****".
      hex = cp |> Integer.to_string(16) |> String.pad_leading(4, "0")
      IO.puts("U+#{hex}  #{<<cp::utf8>>}")
    end)
  end

  # --- helpers -------------------------------------------------------------

  # Normalizes `text` to `form`. ASCII-only text is identical in every
  # normalization form, so it is handed back as-is without calling the
  # normalizer; the ASCII check stops at the first byte >= 128.
  defp normalize(text, form) do
    case ascii?(text) do
      true -> text
      false -> normalize_form(text, form)
    end
  end

  # Dispatches to the Erlang `:unicode` normalizer for the named form. Only
  # "NFC", "NFD", "NFKC" and "NFKD" are accepted; any other form matches no
  # clause.
  defp normalize_form(text, form) when form in ["NFC", "NFD", "NFKC", "NFKD"] do
    case form do
      "NFC" -> :unicode.characters_to_nfc_binary(text)
      "NFD" -> :unicode.characters_to_nfd_binary(text)
      "NFKC" -> :unicode.characters_to_nfkc_binary(text)
      "NFKD" -> :unicode.characters_to_nfkd_binary(text)
    end
  end

  # True when every byte of the binary is below 128 (the empty binary
  # included). Stops at the first byte that is not.
  defp ascii?(binary) do
    case binary do
      <<>> -> true
      <<byte, tail::binary>> when byte < 128 -> ascii?(tail)
      _ -> false
    end
  end

  # True when the single byte `byte` occurs anywhere in `binary`.
  defp byte_in?(binary, byte) do
    match?({_position, _length}, :binary.match(binary, <<byte>>))
  end

  # Resolves the config argument of the public functions: a ready-made
  # `%TextFixerConfig{}` is used as-is (the base is ignored), while a keyword
  # list is applied as overrides on top of `base`.
  defp build_config(_base, config) when is_struct(config, TextFixerConfig), do: config

  defp build_config(base, overrides) when is_list(overrides) do
    overrides
    |> normalize_opts()
    |> then(&struct(base, &1))
  end

  # Accepts the old `fix_entities` keyword as an alias for `unescape_html`: a
  # non-nil `fix_entities` value is moved over to `unescape_html` (replacing
  # any value given for it); a nil or absent one is simply dropped.
  defp normalize_opts(opts) do
    {legacy, remaining} = Keyword.pop(opts, :fix_entities)

    if is_nil(legacy),
      do: remaining,
      else: Keyword.put(remaining, :unescape_html, legacy)
  end

  # Cuts text into segments that each end just after a newline (the newline
  # stays with its segment), and caps every segment at `max` codepoints via
  # `cap/2`. Port of the segmentation loop in fix_text.
  defp split_segments("", _max), do: []

  defp split_segments(text, max) do
    case :binary.match(text, "\n") do
      {newline_at, 1} ->
        line_length = newline_at + 1
        <<line::binary-size(line_length), remainder::binary>> = text
        cap(line, max) ++ split_segments(remainder, max)

      :nomatch ->
        cap(text, max)
    end
  end

  # Breaks one newline-delimited segment into chunks of at most `max`
  # characters. A segment can never have more characters than bytes, so one
  # whose byte size is within `max` is returned whole with no per-character
  # work. Only a larger segment is handed to `chop/7`, which cuts it by
  # codepoint (matching Python's length-based segmentation) in one pass.
  defp cap(segment, max) when byte_size(segment) > max do
    chop(segment, max, segment, 0, 0, 0, [])
  end

  defp cap(segment, _max), do: [segment]

  # Cuts `full` into slices of `max` characters in a single pass.
  #
  #   * first argument - the part of `full` not yet scanned
  #   * `start` - byte offset in `full` where the slice being built begins
  #   * `count` - characters collected for that slice so far
  #   * `pos`   - byte offset in `full` of the scan position
  #   * `acc`   - finished slices, newest first
  #
  # Returns the slices in order; a final slice shorter than `max` is included.
  defp chop(<<>>, _max, full, start, _count, pos, acc) do
    slices = if pos > start, do: [binary_part(full, start, pos - start) | acc], else: acc
    Enum.reverse(slices)
  end

  defp chop(remaining, max, full, start, count, _pos, acc) do
    rest =
      case remaining do
        <<_::utf8, rest::binary>> -> rest
        # Not valid UTF-8 at this position: count the stray byte as one
        # character so the scan keeps going instead of matching no clause.
        <<_byte, rest::binary>> -> rest
      end

    # The scan position is wherever `rest` begins inside `full`.
    pos = byte_size(full) - byte_size(rest)
    count = count + 1

    if count == max do
      chop(rest, max, full, pos, 0, pos, [binary_part(full, start, pos - start) | acc])
    else
      chop(rest, max, full, start, count, pos, acc)
    end
  end
end