defmodule Magika do
@moduledoc """
Elixir binding of [Magika](https://github.com/google/magika), Google's
deep-learning file content type detector.

Magika identifies the content type of a file (e.g. `html`, `python`, `pdf`,
`zip`) from its bytes, using a small ONNX model run via
[OnnxRuntime](https://hex.pm/packages/onnxruntime). It is a faithful port of
the reference Python implementation's `standard_v3_3` model and inference
logic.

## Usage

The model is loaded once and hosted by a supervised `Magika.Server` that
starts automatically with the `:magika` application. Call the API without
threading an instance around:

    {:ok, result} = Magika.identify("<!DOCTYPE html>\\n<html>...</html>")
    result.prediction.output.label #=> "html"
    result.prediction.output.mime_type #=> "text/html"
    result.prediction.score #=> 0.99...

    {:ok, result} = Magika.identify_path("/path/to/file.pdf")
    result.prediction.output.label #=> "pdf"

## Prediction mode

The prediction mode controls how strict Magika is before trusting the model's
guess. The hosted server uses `:high_confidence` by default; change it in your
application config:

    config :magika, prediction_mode: :best_guess

The modes:

  * `:high_confidence` (default) — keep the model prediction only when its
    score clears the per-content-type threshold (falling back to the
    medium-confidence threshold otherwise).
  * `:medium_confidence` — keep the model prediction when its score clears the
    generic medium-confidence threshold.
  * `:best_guess` — always return the model prediction regardless of score.

When the score is too low for the chosen mode, the output is generalized to
`txt` (for text content types) or `unknown` (for binary content types).

## Standalone instances (advanced)

You normally don't need this. For one-off scripts or tests you can build an
instance with `new/1` and pass it as the first argument, bypassing the
supervised server. A `Magika.t()` is immutable and safe to reuse:

    magika = Magika.new(prediction_mode: :best_guess)
    {:ok, result} = Magika.identify(magika, content)

A specific named server can also be targeted with the `:server` option:

    {:ok, result} = Magika.identify(content, server: MyApp.Magika)
"""
alias Magika.{Config, Features, Inference, Prediction, Result}
# A Magika instance carries the model handle, the parsed configuration and the
# prediction mode. All three keys are mandatory, so the same list drives both
# `@enforce_keys` and `defstruct`.
@enforce_keys [:model, :config, :prediction_mode]
defstruct @enforce_keys

@typedoc "How strictly a model score is checked before its label is trusted."
@type prediction_mode :: :best_guess | :medium_confidence | :high_confidence

@typedoc "A Magika instance: model, configuration and prediction mode."
@type t :: %__MODULE__{
        prediction_mode: prediction_mode(),
        config: Config.t(),
        model: OnnxRuntime.Model.t()
      }
@doc """
Builds a standalone Magika instance by loading the model and its configuration.

## Options

  * `:prediction_mode` — `:high_confidence` (default), `:medium_confidence`
    or `:best_guess`.
  * `:model_path` — path to a custom `model.onnx`. Defaults to the vendored
    `standard_v3_3` model.
  * `:model_config_path` — path to a custom `config.min.json`.
  * `:content_types_kb_path` — path to a custom `content_types_kb.min.json`.

Raises `ArgumentError` when `:prediction_mode` is not one of the three
supported atoms. The mode is validated before anything is loaded.
"""
@spec new(keyword()) :: t()
def new(opts \\ []) do
  mode = Keyword.get(opts, :prediction_mode, :high_confidence)

  if mode not in [:high_confidence, :medium_confidence, :best_guess] do
    raise ArgumentError, "invalid :prediction_mode #{inspect(mode)}"
  end

  model_file = Keyword.get(opts, :model_path, Config.default_model_path())
  config_file = Keyword.get(opts, :model_config_path, Config.default_model_config_path())
  kb_file = Keyword.get(opts, :content_types_kb_path, Config.default_kb_path())

  # Load the model first, then the configuration, as separate steps.
  model = OnnxRuntime.load(model_file)
  config = Config.load(config_file, kb_file)

  %__MODULE__{model: model, config: config, prediction_mode: mode}
end
@doc """
Detects the content type of in-memory bytes.

Two call shapes are supported:

  * `identify(content, opts)` — uses the instance hosted by a
    `Magika.Server`. The `:server` option selects a specific named server;
    without it the default server name is used.
  * `identify(magika, content)` — uses the given `Magika` instance directly
    and never consults a server.

Always returns `{:ok, result}`: classifying bytes that are already in memory
cannot fail the way a filesystem read can.
"""
@spec identify(binary(), keyword()) :: {:ok, Result.t()}
@spec identify(t(), binary()) :: {:ok, Result.t()}
def identify(content, opts \\ [])

def identify(content, opts) when is_binary(content) and is_list(opts) do
  opts
  |> server_instance()
  |> identify(content)
end

def identify(%__MODULE__{} = magika, content) when is_binary(content) do
  result = %Result{status: :ok, prediction: predict_from_content(magika, content)}
  {:ok, result}
end
@doc """
Detects the content type of the file located at `path`.

Two call shapes are supported:

  * `identify_path(path, opts)` — uses the instance hosted by a
    `Magika.Server`. The `:server` option selects a specific named server;
    without it the default server name is used.
  * `identify_path(magika, path)` — uses the given `Magika` instance directly
    and never consults a server.

Returns `{:ok, result}` when the path could be classified. Symlinks are
followed; a directory is reported as `directory` and any other non-regular
file as `unknown`.

Returns `{:error, result}` when the path is missing or unreadable. In that
case `result.status` names the failure (`:file_not_found` or
`:permission_error`) and `result.prediction` is `nil`.
"""
@spec identify_path(Path.t(), keyword()) :: {:ok, Result.t()} | {:error, Result.t()}
@spec identify_path(t(), Path.t()) :: {:ok, Result.t()} | {:error, Result.t()}
def identify_path(path, opts \\ [])

def identify_path(path, opts) when is_binary(path) and is_list(opts) do
  opts
  |> server_instance()
  |> identify_path(path)
end

def identify_path(%__MODULE__{} = magika, path) when is_binary(path) do
  with {:ok, prediction} <- classify_path(magika, path) do
    {:ok, %Result{status: :ok, prediction: prediction, path: path}}
  else
    {:error, status} ->
      {:error, %Result{status: status, prediction: nil, path: path}}
  end
end
@doc """
Identifies the content type read from an open binary `IO.device`/file.

Resolves the hosted instance from a `Magika.Server`. Pass `server:` to target
a specific named server (defaults to `Magika.Server`). Alternatively, pass a
`Magika` instance as the first argument to bypass the server entirely.

The device is read from its current position up to end-of-file and the whole
remainder is held in memory before classification. A device that is already
at end-of-file is treated as empty content. The caller is responsible for
opening and closing the device.

Raises `IO.StreamError` when the device reports an error while being read.
"""
@spec identify_stream(IO.device(), keyword()) :: {:ok, Result.t()}
@spec identify_stream(t(), IO.device()) :: {:ok, Result.t()}
def identify_stream(device, opts \\ [])

def identify_stream(device, opts) when is_list(opts) and not is_struct(device, __MODULE__) do
  identify_stream(server_instance(opts), device)
end

def identify_stream(%__MODULE__{} = magika, device) do
  content =
    case IO.binread(device, :eof) do
      :eof ->
        <<>>

      data when is_binary(data) ->
        data

      # `IO.binread/2` may also return an error tuple (for example when the
      # device is not readable). Surface it the way `IO.binstream/2` does
      # instead of failing with an opaque `CaseClauseError`.
      {:error, reason} ->
        raise IO.StreamError, reason: reason
    end

  identify(magika, content)
end
# Looks up the Magika instance held by the server named in `opts[:server]`,
# using the default server name when the option is absent.
defp server_instance(opts) do
  server = Keyword.get(opts, :server, Magika.Server.default_name())
  Magika.Server.instance(server)
end
@doc """
Returns the model name for the given instance.

The value is the constant `"standard_v3_3"` for every instance.
"""
# NOTE(review): an instance built with a custom `:model_path` still reports
# "standard_v3_3" — confirm whether that is intended.
@spec model_name(t()) :: String.t()
def model_name(%__MODULE__{} = _magika) do
  "standard_v3_3"
end
# ── Path corner cases ──────────────────────────────────────────────────────
# Classifies `path` by its file-system type before any bytes are read.
#
# Returns `{:ok, prediction}` or `{:error, status}` where `status` is
# `:file_not_found` or `:permission_error`. Symlinks are followed: the link is
# detected with `File.lstat/1` and its target is then inspected with
# `File.stat/1`, so a dangling link is reported as `:file_not_found`.
defp classify_path(magika, path) do
  case File.lstat(path) do
    {:ok, %File.Stat{type: :symlink}} ->
      # Default behaviour follows the symlink; resolve via File.stat.
      classify_resolved(magika, path)

    stat_result ->
      classify_stat(magika, path, stat_result)
  end
end

# Classifies the target of a symlink (`File.stat/1` dereferences links).
defp classify_resolved(magika, path) do
  classify_stat(magika, path, File.stat(path))
end

# Maps a `File.stat/1` / `File.lstat/1` result onto a prediction or an error
# status.
#
# `:enotdir` (a path component is not a directory) and `:eloop` (symlink
# cycle) both mean the path does not resolve to an existing file, so they are
# reported as `:file_not_found` alongside `:enoent`. Every other failure keeps
# the catch-all `:permission_error` status.
defp classify_stat(_magika, _path, {:error, reason})
     when reason in [:enoent, :enotdir, :eloop] do
  {:error, :file_not_found}
end

defp classify_stat(_magika, _path, {:error, _reason}) do
  {:error, :permission_error}
end

defp classify_stat(magika, _path, {:ok, %File.Stat{type: :directory}}) do
  {:ok, special_prediction(magika, "directory")}
end

defp classify_stat(magika, path, {:ok, %File.Stat{type: :regular}}) do
  read_and_classify(magika, path)
end

# Devices, sockets, FIFOs and anything else that is neither a regular file nor
# a directory.
defp classify_stat(magika, _path, {:ok, _other}) do
  {:ok, special_prediction(magika, "unknown")}
end
# Reads the whole regular file at `path` and classifies its bytes. Any read
# failure is reported as `:permission_error`.
defp read_and_classify(magika, path) do
  with {:ok, bytes} <- File.read(path) do
    {:ok, predict_from_content(magika, bytes)}
  else
    {:error, _reason} -> {:error, :permission_error}
  end
end
# ── Content classification ──────────────────────────────────────────────────
# Picks the classification strategy for `content`:
#
#   * no bytes at all           -> the special `empty` prediction;
#   * fewer bytes than the model needs, or too few meaningful bytes once the
#     leading features are extracted -> the few-bytes heuristic;
#   * otherwise                 -> the deep-learning model.
defp predict_from_content(%__MODULE__{config: config} = magika, content) do
  case byte_size(content) do
    0 ->
      special_prediction(magika, "empty")

    size ->
      # `or` short-circuits, so features are only extracted for inputs that
      # are at least `min_file_size_for_dl` bytes long.
      if size < config.min_file_size_for_dl or padding_at_threshold?(content, config) do
        few_bytes_prediction(magika, content)
      else
        dl_prediction(magika, content)
      end
  end
end

# True when the n-th leading token (n = min_file_size_for_dl) is the padding
# token, i.e. after stripping whitespace there are not enough meaningful bytes
# for a reliable DL prediction.
defp padding_at_threshold?(content, config) do
  nth_token =
    content
    |> Features.extract_beg(config)
    |> Enum.at(config.min_file_size_for_dl - 1)

  nth_token == config.padding_token
end
# Runs the model on `content` and builds the prediction: `dl` is the raw model
# label, `output` is the label after `resolve_output/3` has applied the
# overwrite map and confidence thresholds.
defp dl_prediction(%__MODULE__{model: model, config: config} = magika, content) do
  {dl_label, score} =
    content
    |> Features.extract(config)
    |> then(&Inference.predict(model, &1, config.target_labels_space))

  {output_label, reason} = resolve_output(magika, dl_label, score)

  dl_info = Config.content_type_info(config, dl_label)
  output_info = Config.content_type_info(config, output_label)

  %Prediction{dl: dl_info, output: output_info, score: score, overwrite_reason: reason}
end
# Turns the raw model label and score into the label that is reported,
# together with the reason it differs from the model label (`:none` when it
# does not). Mirrors `_get_output_label_from_dl_label_and_score` in the
# reference.
defp resolve_output(%__MODULE__{config: config} = magika, dl_label, score) do
  # The model config may redirect some labels to another one.
  overwritten = Map.get(config.overwrite_map, dl_label, dl_label)

  if confident_enough?(magika, dl_label, score) do
    {overwritten, reason_unless_same(dl_label, overwritten, :overwrite_map)}
  else
    # Not confident enough: generalize to txt or unknown, depending on whether
    # the (overwritten) content type is textual.
    info = Config.content_type_info(config, overwritten)
    fallback = if info.is_text, do: "txt", else: "unknown"

    {fallback, reason_unless_same(dl_label, fallback, :low_confidence)}
  end
end

# Decides whether the score is good enough for the instance's prediction mode.
# Thresholds are looked up by the raw model label, not the overwritten one.
defp confident_enough?(%__MODULE__{config: config, prediction_mode: mode}, dl_label, score) do
  case mode do
    :best_guess ->
      true

    :medium_confidence ->
      score >= config.medium_confidence_threshold

    :high_confidence ->
      score >= Map.get(config.thresholds, dl_label, config.medium_confidence_threshold)
  end
end

# The overwrite reason is `:none` whenever the final label still equals the
# model label; otherwise it is the supplied `reason`.
defp reason_unless_same(dl_label, final_label, _reason) when dl_label == final_label, do: :none
defp reason_unless_same(_dl_label, _final_label, reason), do: reason
# Heuristic for inputs too small (or too whitespace-heavy) for the model:
# bytes that form valid UTF-8 are reported as txt, everything else as unknown.
defp few_bytes_prediction(%__MODULE__{config: config}, content) do
  if String.valid?(content) do
    undefined_special(config, "txt")
  else
    undefined_special(config, "unknown")
  end
end
# Builds a model-free prediction for `output_label` from the instance's config.
defp special_prediction(%__MODULE__{} = magika, output_label) do
  undefined_special(magika.config, output_label)
end
# Prediction for inputs that never reach the model: the DL side is the special
# `undefined` content type, the score is fixed at 1.0 and no overwrite is
# recorded.
defp undefined_special(config, output_label) do
  undefined_info = Config.content_type_info(config, "undefined")
  output_info = Config.content_type_info(config, output_label)

  %Prediction{dl: undefined_info, output: output_info, score: 1.0, overwrite_reason: :none}
end
end