# Part of ftfy for Elixir — an Apache-2.0 port of python-ftfy.
# Original ftfy Copyright 2023 Robyn Speer; this port Copyright 2026 FashionUnited.
defmodule Ftfy do
@moduledoc """
ftfy: fixes text for you.
A port of the Python [ftfy](https://github.com/rspeer/python-ftfy) library for
making text less broken — most importantly, fixing *mojibake* (text that was
decoded in the wrong encoding).
iex> Ftfy.fix_text("✔ No problems")
"✔ No problems"
iex> Ftfy.fix_text("Broken text… it’s flubberific!")
"Broken text… it's flubberific!"
See `Ftfy.TextFixerConfig` for the available options. The top-level functions
accept either a `%Ftfy.TextFixerConfig{}` or a keyword list of overrides.
"""
alias Ftfy.{Badness, Chardata, Codecs, Fixes, TextFixerConfig}
@version "6.3.1"

@doc "Returns the version string that was baked into the module at compile time."
@spec version() :: String.t()
def version do
  @version
end
# Registry of fixer functions, keyed by the name used in plan steps.
# `apply_plan/2` resolves the argument of "transcode"/"apply" steps through
# this map, and `try_fix/3` looks fixers up by the stringified config key.
@fixers %{
  # Run on the re-encoded bytes while an encoding repair is being attempted.
  "restore_byte_a0" => &Fixes.restore_byte_a0/1,
  "replace_lossy_sequences" => &Fixes.replace_lossy_sequences/1,
  # Fallbacks used when no single re-encoding repairs the text.
  "decode_inconsistent_utf8" => &Fixes.decode_inconsistent_utf8/1,
  "fix_c1_controls" => &Fixes.fix_c1_controls/1,
  # Text-level clean-ups applied on every pass of the fix loop.
  "unescape_html" => &Fixes.unescape_html/1,
  "fix_latin_ligatures" => &Fixes.fix_latin_ligatures/1,
  "fix_character_width" => &Fixes.fix_character_width/1,
  "uncurl_quotes" => &Fixes.uncurl_quotes/1,
  "fix_line_breaks" => &Fixes.fix_line_breaks/1,
  "fix_surrogates" => &Fixes.fix_surrogates/1,
  "remove_terminal_escapes" => &Fixes.remove_terminal_escapes/1,
  "remove_control_chars" => &Fixes.remove_control_chars/1
}

@doc "Returns the map from fixer name to the one-argument function implementing it."
@spec fixers() :: %{String.t() => (binary() -> binary())}
def fixers do
  @fixers
end
@doc """
Fix inconsistencies and glitches, such as mojibake, in Unicode text.

The text is cut into segments (one per line, each capped at the config's
`max_decode_length`), every segment is fixed on its own, and the fixed
segments are joined back together. Explanations are discarded.

When `unescape_html` is `"auto"`, the first segment containing `<` switches
HTML unescaping off for that segment and for every segment after it.

The second argument is either a `%Ftfy.TextFixerConfig{}` or a keyword list
of overrides.

## Examples

    iex> Ftfy.fix_text("LOUD NOISES")
    "LOUD NOISES"
"""
@spec fix_text(binary(), TextFixerConfig.t() | keyword()) :: binary()
def fix_text(text, config_or_opts \\ []) do
  initial_config = build_config(%TextFixerConfig{explain: false}, config_or_opts)

  # Fix one segment, threading the (possibly updated) config to the next one.
  fix_segment = fn segment, {output, current} ->
    saw_markup? = current.unescape_html == "auto" and String.contains?(segment, "<")
    current = if saw_markup?, do: %{current | unescape_html: false}, else: current

    {fixed, _explanation} = fix_and_explain(segment, current)
    # Nest as iodata so the output is flattened exactly once at the end.
    {[output, fixed], current}
  end

  {output, _final_config} =
    text
    |> split_segments(initial_config.max_decode_length)
    |> Enum.reduce({[], initial_config}, fix_segment)

  IO.iodata_to_binary(output)
end
@doc """
Fix `text` as one segment and return `{fixed_text, explanation}`.

The explanation is a list of `{action, parameter}` steps in the order they
were applied, or `nil` when the config's `explain` option is false. Plans can
be replayed with `apply_plan/2`.

When `unescape_html` is `"auto"` and the text contains `<`, HTML unescaping
is turned off for this call.
"""
@spec fix_and_explain(binary(), TextFixerConfig.t() | keyword()) ::
        {binary(), [{String.t(), String.t()}] | nil}
def fix_and_explain(text, config_or_opts \\ []) do
  requested = build_config(%TextFixerConfig{}, config_or_opts)

  looks_like_markup? =
    requested.unescape_html == "auto" and String.contains?(text, "<")

  config =
    if looks_like_markup?, do: %{requested | unescape_html: false}, else: requested

  # `nil` (rather than an empty list) signals "do not record steps".
  initial_steps = if config.explain, do: [], else: nil

  fix_loop(text, config, initial_steps)
end
# Runs one full pass of fixes (HTML unescaping, encoding repair, the
# text-level fixers, then normalization) and repeats until a pass leaves the
# text unchanged. Returns `{text, steps}`.
defp fix_loop(text, config, steps) do
  text_fixers = [
    :fix_c1_controls,
    :fix_latin_ligatures,
    :fix_character_width,
    :uncurl_quotes,
    :fix_line_breaks,
    :fix_surrogates,
    :remove_terminal_escapes,
    :remove_control_chars
  ]

  unescaped = try_fix({text, steps}, :unescape_html, config)
  reencoded = apply_encoding_fix(unescaped, config)
  cleaned = try_fixers(reencoded, text_fixers, config)
  {result, recorded} = apply_normalization(cleaned, config)

  if result != text do
    fix_loop(result, config, recorded)
  else
    {result, recorded}
  end
end
# Encoding-repair stage of the fix loop. Does nothing when `fix_encoding` is
# false; skips plan bookkeeping when steps are not being recorded (`nil`);
# otherwise appends the encoding plan to the recorded steps.
defp apply_encoding_fix({text, steps}, config) do
  case {config, steps} do
    {%{fix_encoding: false}, _any} ->
      {text, steps}

    {_config, nil} ->
      {fix_encoding(text, config), nil}

    {_config, recorded} ->
      {repaired, encoding_plan} = fix_encoding_and_explain(text, config)
      {repaired, recorded ++ encoding_plan}
  end
end
# Applies each named fixer in order, threading `{text, steps}` through.
defp try_fixers({_text, _steps} = state, [], _config), do: state

defp try_fixers({_text, _steps} = state, [name | remaining], config) do
  state
  |> try_fix(name, config)
  |> try_fixers(remaining, config)
end

# Applies the fixer called `name` if the config enables it (any truthy value
# counts, so "auto" enables `unescape_html`). A step is recorded only when
# steps are being tracked and the fixer actually changed the text.
defp try_fix({text, steps}, name, config) do
  if Map.get(config, name) do
    fixer_name = Atom.to_string(name)
    fixed = @fixers[fixer_name].(text)

    recorded =
      cond do
        steps == nil -> steps
        fixed == text -> steps
        true -> steps ++ [{"apply", fixer_name}]
      end

    {fixed, recorded}
  else
    {text, steps}
  end
end
# Unicode-normalization stage of the fix loop. A `nil` form disables it; a
# "normalize" step is recorded only when tracking steps and the text changed.
defp apply_normalization({text, steps} = state, %{normalization: form}) do
  if is_nil(form) do
    state
  else
    normalized = normalize(text, form)
    changed? = normalized != text

    case steps do
      nil -> {normalized, nil}
      recorded when changed? -> {normalized, recorded ++ [{"normalize", form}]}
      recorded -> {normalized, recorded}
    end
  end
end
@doc """
Run only the encoding-repair steps and return `{fixed, explanation}`.

The explanation lists every step that was applied, in order. When the
config's `fix_encoding` option is off, the text is returned untouched with an
empty explanation.

## Examples

    iex> Ftfy.fix_encoding_and_explain("sÃ³")
    {"só", [{"encode", "latin-1"}, {"decode", "utf-8"}]}
"""
@spec fix_encoding_and_explain(binary(), TextFixerConfig.t() | keyword()) ::
        {binary(), [{String.t(), String.t()}]}
def fix_encoding_and_explain(text, config_or_opts \\ []) do
  config = build_config(%TextFixerConfig{}, config_or_opts)
  repair_enabled? = config.fix_encoding

  if repair_enabled?, do: fix_encoding_loop(text, config, []), else: {text, []}
end
# Repeats single encoding-repair steps until one leaves the text unchanged,
# accumulating the plan of every step along the way.
defp fix_encoding_loop(text, config, plan_so_far) do
  {stepped, step_plan} = fix_encoding_one_step(text, config)
  accumulated = plan_so_far ++ step_plan

  if stepped != text do
    fix_encoding_loop(stepped, config, accumulated)
  else
    {stepped, accumulated}
  end
end
@doc """
Run only the encoding-repair steps and return the fixed text.

Same as `fix_encoding_and_explain/2` with the explanation dropped.

## Examples

    iex> Ftfy.fix_encoding("Ã³")
    "ó"

    iex> Ftfy.fix_encoding("&ATILDE;&SUP3;")
    "&ATILDE;&SUP3;"
"""
@spec fix_encoding(binary(), TextFixerConfig.t() | keyword()) :: binary()
def fix_encoding(text, config_or_opts \\ []) do
  config = build_config(%TextFixerConfig{explain: false}, config_or_opts)

  text
  |> fix_encoding_and_explain(config)
  |> elem(0)
end
@doc """
Fix `text` as a single segment and return only the fixed text.

Same as `fix_and_explain/2` with the explanation dropped.
"""
@spec fix_text_segment(binary(), TextFixerConfig.t() | keyword()) :: binary()
def fix_text_segment(text, config_or_opts \\ []) do
  config = build_config(%TextFixerConfig{explain: false}, config_or_opts)

  text
  |> fix_and_explain(config)
  |> elem(0)
end
# A single encoding-repair step (port of _fix_encoding_one_step_and_explain).
# Empty text, pure-ASCII text, and text that `Badness.is_bad/1` does not flag
# are returned unchanged with an empty plan. Otherwise each candidate charmap
# encoding is tried, and the fallbacks run if none of them decodes cleanly.
defp fix_encoding_one_step("", _config), do: {"", []}

defp fix_encoding_one_step(text, config) do
  needs_repair? = not Chardata.possible_encoding(text, "ascii") and Badness.is_bad(text)

  if needs_repair? do
    case try_encodings(Chardata.charmap_encodings(), text, config, []) do
      {:exhausted, possible} -> encoding_fallbacks(text, possible, config)
      {:done, fixed, steps} -> {fixed, steps}
    end
  else
    {text, []}
  end
end
# Walks the candidate single-byte encodings. For each one the text fits, the
# text is re-encoded and decoded as UTF-8 (or "utf-8-variants" when the bytes
# contain 0xED or 0xC0). The first successful decode wins and yields
# `{:done, fixed, steps}`. Encodings that fit but fail to decode are
# remembered and returned, in the order tried, as `{:exhausted, possible}`.
defp try_encodings([], _text, _config, possible), do: {:exhausted, Enum.reverse(possible)}

defp try_encodings([encoding | remaining], text, config, possible) do
  if Chardata.possible_encoding(text, encoding) do
    {:ok, raw_bytes} = Codecs.encode(text, encoding)
    {restored, a0_steps} = maybe_restore_byte_a0(raw_bytes, encoding, config)
    {bytes, transcode_steps} = maybe_replace_lossy(restored, encoding, config, a0_steps)

    decoding =
      case byte_in?(bytes, 0xED) or byte_in?(bytes, 0xC0) do
        true -> "utf-8-variants"
        false -> "utf-8"
      end

    case Codecs.decode(bytes, decoding) do
      {:error, _reason} ->
        try_encodings(remaining, text, config, [encoding | possible])

      {:ok, fixed} ->
        plan = [{"encode", encoding} | transcode_steps] ++ [{"decode", decoding}]
        {:done, fixed, plan}
    end
  else
    try_encodings(remaining, text, config, possible)
  end
end
# Optionally runs `Fixes.restore_byte_a0/1` on the re-encoded bytes. It only
# applies when the option is on, the encoding is not "macroman", and the bytes
# match `Chardata.altered_utf8_re/0`. Returns `{bytes, transcode_steps}`, with
# a step recorded only if the bytes actually changed.
defp maybe_restore_byte_a0(encoded, encoding, config) do
  applicable? =
    config.restore_byte_a0 and encoding != "macroman" and
      Regex.match?(Chardata.altered_utf8_re(), encoded)

  restored = if applicable?, do: Fixes.restore_byte_a0(encoded), else: encoded

  if restored == encoded do
    {encoded, []}
  else
    {restored, [{"transcode", "restore_byte_a0"}]}
  end
end
# Optionally runs `Fixes.replace_lossy_sequences/1` on the re-encoded bytes.
# It only applies when the option is on and the encoding name starts with
# "sloppy". A step is appended to `transcode_steps` only if the bytes changed.
defp maybe_replace_lossy(encoded, encoding, config, transcode_steps) do
  applicable? =
    config.replace_lossy_sequences and String.starts_with?(encoding, "sloppy")

  replaced = if applicable?, do: Fixes.replace_lossy_sequences(encoded), else: encoded

  if replaced == encoded do
    {encoded, transcode_steps}
  else
    {replaced, transcode_steps ++ [{"transcode", "replace_lossy_sequences"}]}
  end
end
# Fallbacks tried once no candidate encoding decoded cleanly. They run in
# this order: the inconsistent-UTF-8 fix, the Latin-1-as-Windows-1252 fix,
# then the C1-controls fix. Each fallback answers `:no` to pass; the first
# other answer is returned. If all pass, the text is returned unchanged.
defp encoding_fallbacks(text, possible, config) do
  case inconsistent_utf8_fallback(text, config) do
    :no ->
      case latin1_fallback(text, possible) do
        :no ->
          case c1_controls_fallback(text, config) do
            :no -> {text, []}
            c1_result -> c1_result
          end

        latin1_result ->
          latin1_result
      end

    utf8_result ->
      utf8_result
  end
end
# Applies `Fixes.decode_inconsistent_utf8/1` when the option is enabled and
# the text matches `Chardata.utf8_detector_re/0`. Answers `:no` when it does
# not apply or changes nothing.
defp inconsistent_utf8_fallback(text, config) do
  with true <-
         config.decode_inconsistent_utf8 and
           Regex.match?(Chardata.utf8_detector_re(), text),
       fixed when fixed != text <- Fixes.decode_inconsistent_utf8(text) do
    {fixed, [{"apply", "decode_inconsistent_utf8"}]}
  else
    _not_applicable -> :no
  end
end
# If "latin-1" was among the encodings that fit the text, re-reads the
# Latin-1 bytes as Windows-1252. Answers `:no` when Latin-1 was not a
# candidate, when either codec call fails, or when the result equals the input.
defp latin1_fallback(text, possible) do
  cond do
    "latin-1" not in possible ->
      :no

    true ->
      case Codecs.encode(text, "latin-1") do
        {:ok, bytes} ->
          case Codecs.decode(bytes, "windows-1252") do
            {:ok, fixed} when fixed != text ->
              {fixed, [{"encode", "latin-1"}, {"decode", "windows-1252"}]}

            _unchanged_or_error ->
              :no
          end

        _encode_error ->
          :no
      end
  end
end
# Last-resort fallback: when enabled and the text matches
# `Chardata.c1_control_re/0`, applies `Fixes.fix_c1_controls/1`. The step is
# reported whether or not the fixer changed the text.
defp c1_controls_fallback(text, config) do
  applicable? = config.fix_c1_controls and Regex.match?(Chardata.c1_control_re(), text)

  case applicable? do
    true -> {Fixes.fix_c1_controls(text), [{"transcode", "fix_c1_controls"}]}
    false -> :no
  end
end
@doc """
Replay a plan (a list of `{operation, arg}` steps) against `text`.

Steps are applied in order, each receiving the previous step's result.
`operation` is one of:

  * `"encode"` - encode the current value with the codec named `arg`
  * `"decode"` - decode the current value with the codec named `arg`
  * `"transcode"` / `"apply"` - call the fixer registered under `arg` in
    `fixers/0`
  * `"normalize"` - apply the Unicode normalization form `arg` (`"NFC"`,
    `"NFD"`, `"NFKC"` or `"NFKD"`). `fix_and_explain/2` emits this step when
    normalization changed the text, so its plans can be replayed as-is.

Raises `ArgumentError` for an unknown operation, fixer name, or
normalization form. An `"encode"` or `"decode"` step that the codec rejects
fails with a `MatchError`.

## Examples

    iex> {_text, plan} = Ftfy.fix_and_explain("schÃ¶n")
    iex> Ftfy.apply_plan("schÃ¶n", plan)
    "schön"
"""
@spec apply_plan(binary(), [{String.t(), String.t()}]) :: binary()
def apply_plan(text, plan) do
  Enum.reduce(plan, text, fn {operation, arg}, obj ->
    case operation do
      "encode" ->
        {:ok, bytes} = Codecs.encode(obj, arg)
        bytes

      "decode" ->
        {:ok, string} = Codecs.decode(obj, arg)
        string

      op when op in ["transcode", "apply"] ->
        case Map.fetch(@fixers, arg) do
          {:ok, fixer} -> fixer.(obj)
          :error -> raise ArgumentError, "Unknown function to apply: #{arg}"
        end

      # Emitted by the normalization stage of fix_and_explain/2; without this
      # clause such plans could not be replayed.
      "normalize" when arg in ["NFC", "NFD", "NFKC", "NFKD"] ->
        normalize(obj, arg)

      "normalize" ->
        raise ArgumentError, "Unknown normalization form: #{inspect(arg)}"

      other ->
        raise ArgumentError, "Unknown plan step: #{other}"
    end
  end)
end
@doc """
Guess a reasonable decoding for bytes in an unknown encoding, returning
`{string, encoding_name}`.

The guesses, in order:

  1. A UTF-16 byte-order mark (`FE FF` or `FF FE`) means UTF-16; the mark is
     stripped and the rest is decoded with the matching endianness.
  2. UTF-8 (or `"utf-8-variants"` when the bytes contain `0xED` or `0xC0`),
     if the bytes decode cleanly.
  3. `"macroman"` when the bytes contain a carriage return but no line feed.
  4. `"sloppy-windows-1252"` otherwise.

Raises `ArgumentError` when the bytes start with a UTF-16 byte-order mark but
the remainder is not valid UTF-16 (for example, an odd number of bytes).

This is not accurate and may *create* Unicode problems. It is not the
recommended way to use ftfy.
"""
@spec guess_bytes(binary()) :: {binary(), String.t()}
def guess_bytes(<<0xFE, 0xFF, rest::binary>>), do: {decode_utf16!(rest, :big), "utf-16"}
def guess_bytes(<<0xFF, 0xFE, rest::binary>>), do: {decode_utf16!(rest, :little), "utf-16"}

def guess_bytes(bytes) when is_binary(bytes) do
  variant? = byte_in?(bytes, 0xED) or byte_in?(bytes, 0xC0)
  decoding = if variant?, do: "utf-8-variants", else: "utf-8"

  case Codecs.decode(bytes, decoding) do
    {:ok, decoded} ->
      {decoded, decoding}

    {:error, _} ->
      if byte_in?(bytes, 0x0D) and not byte_in?(bytes, 0x0A) do
        {:ok, decoded} = Codecs.decode(bytes, "macroman")
        {decoded, "macroman"}
      else
        {:ok, decoded} = Codecs.decode(bytes, "sloppy-windows-1252")
        {decoded, "sloppy-windows-1252"}
      end
  end
end

# Decodes a BOM-less UTF-16 payload. `:unicode.characters_to_binary/2` reports
# malformed input with `{:error, _, _}` / `{:incomplete, _, _}` tuples instead
# of raising; turn those into an exception so callers never receive a tuple
# where the spec promises a binary.
defp decode_utf16!(payload, endianness) do
  case :unicode.characters_to_binary(payload, {:utf16, endianness}) do
    decoded when is_binary(decoded) -> decoded
    _error_or_incomplete -> raise ArgumentError, "could not decode input as utf-16"
  end
end
@doc """
Fix text found in a stream of lines, returning a lazy stream of fixed lines.

Options:

  * `:encoding` - name of the encoding used to decode every line. When it is
    `nil` or absent, each line is decoded with `guess_bytes/1`.
  * `:config` - a `%Ftfy.TextFixerConfig{}` or keyword overrides.

Raises `ArgumentError` when a line cannot be decoded with the given
`:encoding`. When `unescape_html` is `"auto"`, the first decoded line
containing `<` turns HTML unescaping off for that line and all later ones.
"""
@spec fix_file(Enumerable.t(), keyword()) :: Enumerable.t()
def fix_file(lines, opts \\ []) do
  encoding = Keyword.get(opts, :encoding)
  initial_config = build_config(%TextFixerConfig{}, Keyword.get(opts, :config, []))

  decode_line = fn line ->
    if is_nil(encoding) do
      line |> guess_bytes() |> elem(0)
    else
      case Codecs.decode(line, encoding) do
        {:ok, string} -> string
        {:error, _} -> raise ArgumentError, "could not decode input as #{encoding}"
      end
    end
  end

  Stream.transform(lines, initial_config, fn line, current ->
    decoded = decode_line.(line)

    saw_markup? = current.unescape_html == "auto" and String.contains?(decoded, "<")
    current = if saw_markup?, do: %{current | unescape_html: false}, else: current

    {fixed, _explanation} = fix_and_explain(decoded, current)
    {[fixed], current}
  end)
end
@doc """
Print a breakdown of each codepoint in `text`, for debugging mysterious
Unicode.

One line is written per codepoint: `U+` followed by the codepoint number as
at least four uppercase hexadecimal digits, a space, and the character
itself. Returns `:ok`.

Note: unlike the Python original, this does not print the Unicode *name* of
each character, since the BEAM does not ship the Unicode names database.
"""
@spec explain_unicode(binary()) :: :ok
def explain_unicode(text) do
  text
  |> String.to_charlist()
  |> Enum.each(fn codepoint ->
    # `~ts` makes :io_lib.format return chardata that holds raw code points.
    # That is not iodata: IO.iodata_to_binary/1 raises for code points above
    # 255 and emits Latin-1 bytes for 128..255. IO.puts/1 accepts chardata
    # directly and encodes it as UTF-8.
    IO.puts(:io_lib.format("U+~4.16.0B ~ts", [codepoint, <<codepoint::utf8>>]))
  end)
end
# --- helpers -------------------------------------------------------------

# Normalizes `text` to the given Unicode normalization form. Pure-ASCII text
# is identical in every normal form, so it is handed back untouched and the
# allocating normalization call is skipped.
defp normalize(text, form) do
  case ascii?(text) do
    true -> text
    false -> normalize_form(text, form)
  end
end

# Dispatches to the Erlang normalizer for the named form; any other form name
# matches no clause.
defp normalize_form(string, "NFC") do
  :unicode.characters_to_nfc_binary(string)
end

defp normalize_form(string, "NFD") do
  :unicode.characters_to_nfd_binary(string)
end

defp normalize_form(string, "NFKC") do
  :unicode.characters_to_nfkc_binary(string)
end

defp normalize_form(string, "NFKD") do
  :unicode.characters_to_nfkd_binary(string)
end

# True when every byte is below 128. Stops at the first byte that is not.
defp ascii?(<<>>), do: true
defp ascii?(<<byte, tail::binary>>) when byte <= 0x7F, do: ascii?(tail)
defp ascii?(_other), do: false
# True when the single byte `byte` occurs anywhere in `binary`.
defp byte_in?(binary, byte) do
  case :binary.match(binary, <<byte>>) do
    :nomatch -> false
    {_offset, _length} -> true
  end
end
# Resolves the caller's second argument into a config struct. A ready-made
# `%TextFixerConfig{}` is used as-is and `base` is ignored. A keyword list is
# layered over `base` with `struct/2`, which drops keys the struct lacks.
defp build_config(_base, %TextFixerConfig{} = ready_made), do: ready_made

defp build_config(base, overrides) when is_list(overrides) do
  fields = normalize_opts(overrides)
  struct(base, fields)
end
# Translates the legacy `fix_entities` option into `unescape_html`. When the
# legacy key is absent (or `nil`) the options pass through unchanged;
# otherwise its value replaces any `unescape_html` entry.
defp normalize_opts(opts) do
  {legacy_value, remaining} = Keyword.pop(opts, :fix_entities)

  if is_nil(legacy_value) do
    remaining
  else
    Keyword.put(remaining, :unescape_html, legacy_value)
  end
end
# Cuts text into segments: one per line, with the newline kept at the end of
# its line, and each line further capped at `max` codepoints. Port of the
# segmentation loop in fix_text. Empty text yields no segments.
defp split_segments("", _max), do: []

defp split_segments(text, max) do
  text
  |> :binary.split("\n")
  |> case do
    [only] -> cap(only, max)
    [line, remainder] -> cap(line <> "\n", max) ++ split_segments(remainder, max)
  end
end

# Splits one newline-delimited segment into chunks of at most `max`
# codepoints. A segment's byte size is never smaller than its codepoint
# count, so a segment that fits by bytes is returned whole with no
# per-character work. Only an oversized segment is walked, once, slicing
# chunks out of the original binary by byte offset.
defp cap(segment, max) when byte_size(segment) <= max, do: [segment]
defp cap(segment, max), do: chop(segment, max, segment, 0, 0, 0, [])

# Arguments: remaining input, cap, the full segment, byte offset where the
# current chunk starts, codepoints seen in the current chunk, byte offset
# consumed so far, and finished chunks in reverse order.
defp chop(<<>>, _max, full, start, _count, pos, acc) do
  chunks =
    case pos - start do
      size when size > 0 -> [binary_part(full, start, size) | acc]
      _nothing_pending -> acc
    end

  Enum.reverse(chunks)
end

defp chop(<<_::utf8, rest::binary>>, max, full, start, count, _pos, acc) do
  # Byte offset just past the codepoint that was consumed by the match.
  offset = byte_size(full) - byte_size(rest)
  seen = count + 1

  if seen == max do
    chunk = binary_part(full, start, offset - start)
    chop(rest, max, full, offset, 0, offset, [chunk | acc])
  else
    chop(rest, max, full, start, seen, offset, acc)
  end
end
end