Skip to main content

lib/npm/security/compromised/osv.ex

defmodule NPM.Security.Compromised.OSV do
  @moduledoc """
  Optional OSV.dev lookup for malicious npm package advisories.

  The OpenSSF malicious-packages dataset is published in OSV format and is
  ingested by OSV.dev. Calls are opt-in so normal installs remain offline and
  deterministic unless a caller explicitly enables online checks.
  """

  @endpoint "https://api.osv.dev/v1/query"
  @batch_endpoint "https://api.osv.dev/v1/querybatch"

  @doc "Build the OSV package-version query body for an npm package."
  @spec query_body(String.t(), String.t()) :: map()
  def query_body(package, version) do
    %{"package" => %{"name" => package, "ecosystem" => "npm"}, "version" => version}
  end

  @doc "Build the OSV batch query body for npm package versions."
  @spec batch_body([{String.t(), String.t()}]) :: map()
  def batch_body(packages) do
    %{"queries" => Enum.map(packages, fn {package, version} -> query_body(package, version) end)}
  end

  @doc "Query OSV.dev for one npm package version."
  @spec query_package(String.t(), String.t(), keyword()) :: {:ok, [map()]} | {:error, term()}
  def query_package(package, version, opts \\ []) do
    endpoint = Keyword.get(opts, :endpoint, @endpoint)

    request = [
      json: query_body(package, version),
      receive_timeout: Keyword.get(opts, :timeout, 10_000)
    ]

    case Req.post(endpoint, request) do
      {:ok, %{status: status, body: %{"vulns" => vulns}}} when status in 200..299 ->
        {:ok, Enum.filter(vulns, &malicious_advisory?/1)}

      {:ok, %{status: status, body: body}} ->
        {:error, {:http_error, status, body}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc "Query OSV.dev for multiple npm package versions."
  @spec query_packages([{String.t(), String.t()}], keyword()) ::
          {:ok, %{String.t() => [map()]}} | {:error, term()}
  def query_packages(packages, opts \\ []) do
    endpoint = Keyword.get(opts, :batch_endpoint, @batch_endpoint)
    unique_packages = Enum.uniq(packages)

    request = [
      json: batch_body(unique_packages),
      receive_timeout: Keyword.get(opts, :timeout, 30_000)
    ]

    case Req.post(endpoint, request) do
      {:ok, %{status: status, body: %{"results" => results}}} when status in 200..299 ->
        {:ok, results_to_map(unique_packages, results)}

      {:ok, %{status: status, body: body}} ->
        {:error, {:http_error, status, body}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc "Return whether an OSV advisory is a malicious-package report."
  @spec malicious_advisory?(map()) :: boolean()
  def malicious_advisory?(%{"id" => "MAL-" <> _}), do: true

  def malicious_advisory?(%{"database_specific" => %{"malicious-packages-origins" => origins}})
      when is_list(origins) do
    origins != []
  end

  def malicious_advisory?(%{"summary" => summary}) when is_binary(summary) do
    summary |> String.downcase() |> String.contains?("malicious")
  end

  def malicious_advisory?(_), do: false

  defp results_to_map(packages, results) do
    packages
    |> Enum.zip(results)
    |> Map.new(fn {{package, _version}, result} ->
      advisories = result |> Map.get("vulns", []) |> Enum.filter(&malicious_advisory?/1)
      {package, advisories}
    end)
  end
end