Skip to main content

lib/npm/security/compromised.ex

defmodule NPM.Security.Compromised do
  @moduledoc """
  Checks lockfiles against known malicious or compromised npm package reports.

  Local reports use the OSV schema used by the OpenSSF malicious-packages
  repository and OSV.dev `MAL-*` advisories. This keeps the default check
  offline and reproducible while allowing callers to supply an updated database
  path through configuration.
  """

  alias NPM.Config
  alias NPM.JSON
  alias NPM.Security.Compromised.OSV

  @type source :: :local | :osv
  @type finding :: %{
          package: String.t(),
          version: String.t(),
          source: source(),
          advisory: map()
        }

  @doc """
  Scans `lockfile` for packages reported as compromised.

  Every source listed under the `:sources` option (falling back to
  `NPM.Config.compromised_sources/0`) is consulted in turn. The combined
  findings are de-duplicated and returned in a stable order.
  """
  @spec check(NPM.Lockfile.t(), keyword()) :: [finding()]
  def check(lockfile, opts \\ []) when is_map(lockfile) do
    opts
    |> Keyword.get(:sources, Config.compromised_sources())
    |> Enum.flat_map(fn source -> check_source(lockfile, source, opts) end)
    |> sort_findings()
  end

  @doc """
  Checks a single package version.

  The `name`/`version` pair is wrapped in a one-entry lockfile map and handed
  to `check/2`, so the same options apply.
  """
  @spec check_package(String.t(), String.t(), keyword()) :: [finding()]
  def check_package(name, version, opts \\ []) do
    single_entry_lockfile = %{name => %{version: version}}
    check(single_entry_lockfile, opts)
  end

  @doc """
  Queries OSV.dev for every package in `lockfile`.

  Returns `{:ok, findings}` with de-duplicated, sorted findings, or the
  `{:error, reason}` produced by the OSV client, unchanged.
  """
  @spec check_osv(NPM.Lockfile.t(), keyword()) :: {:ok, [finding()]} | {:error, term()}
  def check_osv(lockfile, opts \\ []) when is_map(lockfile) do
    versions_by_package =
      for {package, entry} <- lockfile, do: {package, entry_version(entry)}

    case OSV.query_packages(versions_by_package, opts) do
      {:error, reason} ->
        {:error, reason}

      {:ok, advisories_by_package} ->
        # Each package is only matched against the advisories OSV returned for it.
        matches =
          for {package, entry} <- lockfile,
              reported = Map.get(advisories_by_package, package, []),
              finding <- match_advisories(%{package => entry}, reported, :osv) do
            finding
          end

        {:ok, sort_findings(matches)}
    end
  end

  @doc """
  Returns the path of the shared global cache that holds OSV-format
  compromised-package reports, as configured in `NPM.Config`.
  """
  @spec cache_path :: String.t()
  def cache_path do
    Config.compromised_db_path()
  end

  @doc """
  Loads OSV advisory reports from the JSON database at `path`.

  When `path` does not exist the bundled database is read instead. Any other
  read error is returned as-is, and a file whose content is not a recognised
  database layout yields `{:error, :invalid_compromised_database}`.
  """
  @spec read_database(String.t()) :: {:ok, [map()]} | {:error, term()}
  def read_database(path \\ cache_path()) do
    with {:ok, data} <- JSON.read_file(path) do
      normalize_database(data)
    else
      {:error, :enoent} -> read_bundled_database(path)
      failure -> failure
    end
  end

  @doc """
  Writes `advisories` to the JSON database at `path`, sorted by advisory id.

  The parent directory of `path` is created when it is missing. Returns `:ok`,
  or `{:error, reason}` when the directory cannot be created or the file
  cannot be written.
  """
  @spec write_database(String.t(), [map()]) :: :ok | {:error, term()}
  def write_database(path, advisories) do
    # Use the non-raising `File.mkdir_p/1` so a directory failure surfaces as
    # the `{:error, reason}` promised by the spec instead of a `File.Error`.
    with :ok <- path |> Path.dirname() |> File.mkdir_p() do
      File.write(path, JSON.encode_pretty(sort_advisories(advisories)))
    end
  end

  @doc """
  Merges `advisories` into the database stored at `path` and writes the result
  back to `path`.

  Returns `{:ok, merged}` on success. A failure to read or to write the
  database is returned unchanged.
  """
  @spec merge_database(String.t(), [map()]) :: {:ok, [map()]} | {:error, term()}
  def merge_database(path, advisories) do
    case read_database(path) do
      {:ok, existing} -> existing |> merge_advisories(advisories) |> store_merged(path)
      failure -> failure
    end
  end

  # Persists an already merged advisory list and echoes it back on success.
  defp store_merged(merged, path) do
    case write_database(path, merged) do
      :ok -> {:ok, merged}
      failure -> failure
    end
  end

  @doc """
  Combines two advisory lists into one list sorted by OSV id.

  Advisories are de-duplicated by their string `"id"` (or by a content hash
  when no such id is present). When the same id occurs more than once, the
  first occurrence wins, so an entry from `existing` takes precedence over one
  from `new`.
  """
  @spec merge_advisories([map()], [map()]) :: [map()]
  def merge_advisories(existing, new) do
    # NOTE(review): because `existing` comes first, a re-published advisory in
    # `new` never replaces the stored copy — confirm this is intended.
    combined = existing ++ new

    sort_advisories(Enum.uniq_by(combined, fn advisory -> advisory_key(advisory) end))
  end

  @doc """
  Tells whether `advisory` lists `package` at `version` among its
  `"affected"` entries.

  An advisory without an `"affected"` key affects nothing.
  """
  @spec affects?(map(), String.t(), String.t()) :: boolean()
  def affects?(advisory, package, version) do
    affected_entries = Map.get(advisory, "affected", [])

    Enum.any?(affected_entries, fn entry -> affected_entry?(entry, package, version) end)
  end

  @doc """
  Renders each finding as one human-readable line for CLI output.

  Missing advisory ids are shown as `unknown`; a missing summary falls back to
  `known malicious package`.
  """
  @spec format_findings([finding()]) :: [String.t()]
  def format_findings(findings) do
    for finding <- findings, do: format_finding(finding)
  end

  # Builds the `name@version matches ID: summary` line for a single finding.
  defp format_finding(finding) do
    advisory_id = finding.advisory["id"] || "unknown"
    description = finding.advisory["summary"] || "known malicious package"

    "#{finding.package}@#{finding.version} matches #{advisory_id}: #{description}"
  end

  @doc """
  Turns a finding into a map with string keys that can be JSON-encoded.

  The `:source` atom is emitted as its string name; the advisory map is passed
  through untouched.
  """
  @spec finding_to_json(finding()) :: map()
  def finding_to_json(finding) do
    package = finding.package
    version = finding.version
    source_name = Atom.to_string(finding.source)
    advisory = finding.advisory

    %{
      "advisory" => advisory,
      "package" => package,
      "source" => source_name,
      "version" => version
    }
  end

  # Collects the findings contributed by one configured source.
  #
  # `:local` matches the lockfile against the database at `opts[:db_path]`
  # (default: `Config.compromised_db_path/0`). The lookup is best-effort: a
  # database that is missing or cannot be read/parsed yields no findings
  # rather than an error.
  defp check_source(lockfile, :local, opts) do
    path = Keyword.get(opts, :db_path, Config.compromised_db_path())

    case read_database(path) do
      {:ok, advisories} -> match_advisories(lockfile, advisories, :local)
      {:error, _reason} -> []
    end
  end

  # `:osv` only queries OSV.dev when the caller opts in with `online?: true`.
  # Matching on the literal `true` (instead of using `and`) keeps a `nil` or
  # otherwise non-boolean option from raising `BadBooleanError`; such values
  # simply leave the online check disabled. Query errors yield no findings.
  defp check_source(lockfile, :osv, opts) do
    with true <- Keyword.get(opts, :online?, false),
         {:ok, findings} <- check_osv(lockfile, opts) do
      findings
    else
      _ -> []
    end
  end

  # Unknown sources contribute nothing.
  defp check_source(_lockfile, _source, _opts), do: []

  # Fallback used when the database at `path` does not exist: read the bundled
  # database instead. If `path` already is the bundled path there is nothing
  # left to fall back to, so the original `:enoent` is reported.
  defp read_bundled_database(path) do
    case Config.bundled_compromised_db_path() do
      ^path -> {:error, :enoent}
      bundled -> read_database(bundled)
    end
  end

  # Extracts the advisory list from decoded database content. Accepted layouts
  # are a map with an `"advisories"` list, a map with a `"vulns"` list, or a
  # bare list; anything else is rejected.
  defp normalize_database(data) do
    case data do
      %{"advisories" => list} when is_list(list) -> {:ok, list}
      %{"vulns" => list} when is_list(list) -> {:ok, list}
      list when is_list(list) -> {:ok, list}
      _other -> {:error, :invalid_compromised_database}
    end
  end

  # Drops duplicate findings (same source, advisory id, package and version)
  # and orders the rest by package, then version, then advisory id.
  defp sort_findings(findings) do
    identity = fn f -> {f.source, f.advisory["id"], f.package, f.version} end
    ordering = fn f -> {f.package, f.version, f.advisory["id"] || ""} end

    findings
    |> Enum.uniq_by(identity)
    |> Enum.sort_by(ordering)
  end

  # Orders advisories by their `"id"`; advisories without one sort as "".
  defp sort_advisories(advisories) do
    Enum.sort_by(advisories, &(&1["id"] || ""))
  end

  # De-duplication key for an advisory: its string `"id"` when present,
  # otherwise a hash of the whole advisory term.
  defp advisory_key(advisory) do
    case advisory do
      %{"id" => id} when is_binary(id) -> {:id, id}
      _no_string_id -> {:content, :erlang.phash2(advisory)}
    end
  end

  # Pairs every lockfile entry with every advisory and keeps the combinations
  # where the advisory affects the entry's version. Entries whose version is
  # `nil` or `false` produce no findings.
  defp match_advisories(lockfile, advisories, source) do
    Enum.flat_map(lockfile, fn {package, entry} ->
      Enum.flat_map(advisories, fn advisory ->
        version = entry_version(entry)

        if version && affects?(advisory, package, version) do
          [%{package: package, version: version, source: source, advisory: advisory}]
        else
          []
        end
      end)
    end)
  end

  # True when one OSV `"affected"` entry names `package` in the npm ecosystem
  # and its version constraints cover `version`. Entries without a
  # `"package"` map carrying both `"ecosystem"` and `"name"` never match.
  defp affected_entry?(affected, package, version) do
    case affected do
      %{"package" => %{"ecosystem" => ecosystem, "name" => name}} ->
        npm_ecosystem?(ecosystem) and name == package and version_affected?(affected, version)

      _malformed ->
        false
    end
  end

  # Case-insensitive check that an OSV ecosystem value is "npm"; non-string
  # values are never npm.
  defp npm_ecosystem?(ecosystem) do
    is_binary(ecosystem) and String.downcase(ecosystem) == "npm"
  end

  # A version is affected when it is listed explicitly under `"versions"`,
  # falls inside one of the `"ranges"`, or the entry declares no version
  # constraints at all (meaning every version is affected).
  defp version_affected?(affected, version) do
    listed? = affected |> Map.get("versions", []) |> Enum.member?(version)

    in_range? =
      Enum.any?(Map.get(affected, "ranges", []), fn range ->
        range_affected?(range, version)
      end)

    listed? or in_range? or no_version_constraints?(affected)
  end

  # True when the affected entry has neither explicit versions nor ranges
  # (each key either absent or an empty list).
  defp no_version_constraints?(affected) do
    Enum.all?(["versions", "ranges"], fn key -> Map.get(affected, key, []) == [] end)
  end

  # True when `version` falls inside an OSV range whose events carry version
  # strings. npm versions are semantic versions, so both the `SEMVER` and the
  # `ECOSYSTEM` range types are evaluated (case-insensitively); previously
  # `SEMVER` ranges were skipped, which hid advisories that only use that
  # type. Other types (e.g. `GIT`, whose events are commits) and malformed
  # ranges never match.
  defp range_affected?(%{"type" => type, "events" => events}, version)
       when is_binary(type) and is_list(events) do
    String.upcase(type) in ["ECOSYSTEM", "SEMVER"] and events_match?(events, version)
  end

  defp range_affected?(_range, _version), do: false

  # True when `version` lies inside any interval described by an OSV range's
  # `events` list. Events are folded into interval maps with optional
  # `:introduced`, `:fixed` and `:last_affected` bounds.
  defp events_match?(events, version) do
    events
    |> Enum.chunk_while(nil, &range_event/2, &range_after/1)
    |> Enum.any?(&version_in_range?(version, &1))
  end

  # Reducer for `Enum.chunk_while/4`; the accumulator is the interval that is
  # currently open (`nil` when none is).
  #
  # `introduced` opens an interval. If one is already open it is emitted first
  # (unbounded above) instead of being silently discarded.
  defp range_event(%{"introduced" => introduced}, nil), do: {:cont, %{introduced: introduced}}

  defp range_event(%{"introduced" => introduced}, open),
    do: {:cont, open, %{introduced: introduced}}

  # `fixed` closes the interval with an exclusive upper bound.
  defp range_event(%{"fixed" => fixed}, range),
    do: {:cont, Map.put(range || %{}, :fixed, fixed), nil}

  # `last_affected` closes the interval with an inclusive upper bound.
  defp range_event(%{"last_affected" => last_affected}, range),
    do: {:cont, Map.put(range || %{}, :last_affected, last_affected), nil}

  # Any other event (e.g. `limit`) leaves the open interval untouched.
  defp range_event(_event, range), do: {:cont, range}

  # Flushes an interval that is still open when the events run out.
  defp range_after(nil), do: {:cont, []}
  defp range_after(range), do: {:cont, range, nil}

  # Evaluates one interval against `version`. Any exception raised while
  # building or evaluating the requirement counts as "no match".
  defp version_in_range?(version, range) do
    case range_requirement(range) do
      nil -> false
      requirement -> NPMSemver.matches?(version, requirement)
    end
  rescue
    _ -> false
  end

  # Translates an interval map into a requirement string for `NPMSemver`;
  # `nil` when the interval carries no usable bound.
  defp range_requirement(%{introduced: introduced} = range),
    do: ">=#{normalize_version(introduced)}" <> upper_bound(range)

  defp range_requirement(%{fixed: fixed}), do: "<#{fixed}"
  defp range_requirement(%{last_affected: last_affected}), do: "<=#{last_affected}"
  defp range_requirement(_range), do: nil

  # Optional upper-bound suffix for an interval that has a lower bound.
  defp upper_bound(%{fixed: fixed}), do: " <#{fixed}"
  defp upper_bound(%{last_affected: last_affected}), do: " <=#{last_affected}"
  defp upper_bound(_range), do: ""

  # Reads the pinned version from a lockfile entry, whether the entry uses an
  # atom `:version` key or a string `"version"` key.
  defp entry_version(%{version: pinned}) do
    pinned
  end

  defp entry_version(%{"version" => pinned}) do
    pinned
  end

  # Expands the OSV shorthand "0" to the full version "0.0.0"; every other
  # value is returned unchanged.
  defp normalize_version(version) do
    if version == "0", do: "0.0.0", else: version
  end
end