lib/off_broadway/defender365/incident_client.ex

defmodule OffBroadway.Defender365.IncidentClient do
  @moduledoc """
  Default API client used by `OffBroadway.Defender365.Producer` to receive incidents
  from Microsoft 365 Defender
  [Incident APIs](https://learn.microsoft.com/en-us/microsoft-365/security/defender/api-incident).
  An incident is a collection of related alerts that help describe an attack. Events from
  different entities in an organization are automatically aggregated by Microsoft Defender 365.

  This module implements the `OffBroadway.Defender365.Client` and `Broadway.Acknowledger`
  behaviours which defines callbacks for receiving and acknowledging events.

  The 365 Defender Incident client uses the `api.security.microsoft.com` endpoints for receiving
  incidents and is implemented using the [Tesla](https://hexdocs.pm/tesla/readme.html) library.
  Tesla is a HTTP client abstraction library which lets us easily select from a range of HTTP adapters.
  Please see the Tesla [documentation](https://hexdocs.pm/tesla/readme.html#adapters)
  for more information.

  The following quotas are enforced for the incidents API:

    - Maximum page size is **100 incidents**
    - Maximum rate of requests is **50 calls per minute** and **1500 calls per hour**

  The following permissions are required to call the incidents API:

    - Permission type: **Application** - _Incident.Read.All_
    - Permission type: **Application** - _Incident.ReadWrite.All_
    - Permission type: **Delegated** - _Incident.Read_
    - Permission type: **Delegated** - _Incident.ReadWrite_
  """
  alias Broadway.{Acknowledger, Message}
  alias OffBroadway.Defender365.Incident
  require Logger

  @behaviour Acknowledger
  @behaviour OffBroadway.Defender365.Client

  @max_num_incidents_allowed 100

  @impl true
  def init(opts) do
    {:ok,
     opts
     |> prepare_cfg(Application.get_env(:off_broadway_defender365, :defender365_client) || [])
     |> Keyword.put(:ack_ref, opts[:broadway][:name])}
  end

  @doc """
  Returns a `Tesla.Client` configured with middleware.

    * `Tesla.Middleware.BaseUrl` middleware configured with `base_url` passed via `opts`.
    * `Tesla.Middleware.BearerAuth` middleware configured with `api_token` passed via `opts`.
    * `Tesla.Middleware.Query` middleware configured with `query` passed via `opts`.
    * `Tesla.Middleware.JSON` middleware configured with `Jason` engine.
  """
  @spec client(opts :: Keyword.t()) :: Tesla.Client.t()
  def client(opts) do
    middleware = [
      {Tesla.Middleware.Logger, filter_headers: ["authorization"]},
      {Tesla.Middleware.BaseUrl, client_option(opts, :base_url)},
      {Tesla.Middleware.BearerAuth, token: fetch_client_token(opts)},
      {Tesla.Middleware.Query, client_option(opts, :query)},
      {Tesla.Middleware.JSON, engine: Jason}
    ]

    Tesla.client(middleware)
  end

  @impl true
  def receive_messages(demand, opts), do: receive_messages(demand, opts, [])

  def receive_messages(demand, opts, messages) when demand > 0 do
    max_demand = min(demand, @max_num_incidents_allowed)
    rem_demand = rem(demand, max_demand)

    query =
      case Keyword.get(opts, :query) do
        nil -> Keyword.new("$top": max_demand)
        kw -> Keyword.put(kw, :"$top", max_demand)
      end

    client_opts = Keyword.put(opts, :query, query)

    client(client_opts)
    |> Tesla.get("/api/incidents")
    |> maybe_receive_messages(rem_demand, opts, messages)
  end

  def maybe_receive_messages({:ok, %{status: 200, body: %{"value" => messages}}}, rem_demand, opts, merge)
      when is_list(messages) and rem_demand > 0 do
    Logger.warning(
      "Received demand greater than maximum allowed number of incidents allowed to fetch from " <>
        "365 Defender API. Trying to fetch remaining #{rem_demand} incidents, but this can possibly " <>
        "cause quota limits to be exceeded."
    )

    receive_messages(rem_demand, opts, messages ++ merge)
  end

  def maybe_receive_messages({:ok, %{status: 200, body: %{"value" => messages}}}, _remains, opts, merge)
      when is_list(messages),
      do: wrap_received_messages(messages ++ merge, opts)

  def maybe_receive_messages({:ok, %{status: status_code, body: body}}, _remains, opts, merge) do
    Logger.error(
      "Failed to fetch incidents from remote host. " <>
        "Request failed with status code #{status_code} and response body #{inspect(body)}."
    )

    wrap_received_messages(merge, opts)
  end

  @impl Acknowledger
  def ack(ack_ref, successful, failed) do
    ack_options = :persistent_term.get(ack_ref)

    messages =
      Enum.filter(successful, &ack?(&1, ack_options, :on_success)) ++
        Enum.filter(failed, &ack?(&1, ack_options, :on_failure))

    Enum.each(messages, &ack_message(&1, ack_options))
  end

  @impl true
  def ack_message(message, ack_options) do
    :telemetry.execute(
      [:off_broadway_defender365, :receive_messages, :ack],
      %{time: System.system_time(), count: 1},
      %{
        tenant_id: ack_options.config[:tenant_id],
        client_id: ack_options.config[:client_id],
        receipt: extract_message_receipt(message)
      }
    )
  end

  defp ack?(message, ack_options, option) do
    {_, _, msg_ack_options} = message.acknowledger
    (msg_ack_options[option] || Map.fetch!(ack_options, option)) == :ack
  end

  defp wrap_received_messages(messages, opts) do
    Enum.map(messages, fn message ->
      metadata = Map.put(message, "tenant_id", opts[:tenant_id]) |> to_struct("metadata")
      alerts = to_struct(message, "alerts")
      acknowledger = build_acknowledger(metadata, opts[:ack_ref])
      %Message{data: alerts, metadata: metadata, acknowledger: acknowledger}
    end)
  end

  defp build_acknowledger(metadata, ack_ref) do
    receipt = %{id: metadata.incident_id}
    {__MODULE__, ack_ref, %{receipt: receipt}}
  end

  defp extract_message_receipt(%{acknowledger: {_, _, %{receipt: receipt}}}), do: receipt

  @spec to_struct(message :: map, key :: binary) :: map
  defp to_struct(message, "metadata") do
    Incident.Metadata.new(message)
    |> Map.put(:comments, Map.get(message, "comments", []) |> Enum.map(&Incident.Comment.new/1))
  end

  defp to_struct(message, "alerts") do
    Map.get(message, "alerts", [])
    |> Enum.map(&Incident.Alert.new/1)
    |> Enum.map(fn alert -> Map.put(alert, :devices, Enum.map(alert.devices, &Incident.Device.new/1)) end)
    |> Enum.map(fn alert -> Map.put(alert, :entities, Enum.map(alert.entities, &Incident.Entity.new/1)) end)
  end

  @spec prepare_cfg(opts :: Keyword.t(), env :: Keyword.t()) :: Keyword.t()
  defp prepare_cfg(opts, env), do: Keyword.merge(env, Keyword.get(opts, :config))

  @spec fetch_client_token(opts :: Keyword.t()) :: String.t()
  defp fetch_client_token(opts) do
    # TODO: Maybe cache token
    middleware = [
      Tesla.Middleware.FormUrlencoded,
      Tesla.Middleware.JSON,
      {Tesla.Middleware.BaseUrl, "https://login.windows.net/#{client_option(opts, :tenant_id)}"}
    ]

    with body <- prepare_auth_client_body(opts),
         client <- Tesla.client(middleware),
         %Tesla.Env{status: 200, body: response_body} <- Tesla.post!(client, "/oauth2/token", body) do
      Map.fetch!(response_body, "access_token")
    else
      %Tesla.Env{status: status, body: response_body} ->
        Logger.error(
          "Failed to obtain access token for service. " <>
            "Request failed with status code #{status} and response body: #{inspect(response_body)}"
        )

        ""
    end
  end

  @spec prepare_auth_client_body(opts :: Keyword.t()) :: map
  defp prepare_auth_client_body(opts),
    do: %{
      "resource" => "https://api.security.microsoft.com",
      "client_id" => client_option(opts, :client_id),
      "client_secret" => client_option(opts, :client_secret),
      "grant_type" => "client_credentials"
    }

  @spec client_option(opts :: Keyword.t(), Atom.t()) :: any
  defp client_option(opts, :base_url), do: Keyword.get(opts, :base_url, "https://api.security.microsoft.com")
  defp client_option(opts, :tenant_id), do: Keyword.get(opts, :tenant_id, "")
  defp client_option(opts, :client_id), do: Keyword.get(opts, :client_id, "")
  defp client_option(opts, :client_secret), do: Keyword.get(opts, :client_secret, "")
  defp client_option(opts, :query), do: Keyword.get(opts, :query, [])
end