lib/off_broadway/splunk/splunk_client.ex

defmodule OffBroadway.Splunk.SplunkClient do
  @moduledoc """
  Default Splunk client used by `OffBroadway.Splunk.Producer` to receive data from
  Splunk.
  This module implements the `OffBroadway.Splunk.Client` and `Broadway.Acknowledger`
  behaviours which define callbacks for receiving and acknowledging events.
  Since Splunk does not have any concept of acknowledging consumed events, we need
  to keep track of what events that are consumed ourselves (more on that later).

  The default Splunk client uses the Splunk Web API for receiving events and is
  implemented using the [Tesla](https://hexdocs.pm/tesla/readme.html) library. Tesla
  is a HTTP client abstraction library which let's us easily select from a range of
  HTTP adapters. Please see the Tesla [documentation](https://hexdocs.pm/tesla/readme.html#adapters)
  for more information.
  """

  alias Broadway.{Message, Acknowledger}
  require Logger

  @behaviour Acknowledger
  @behaviour OffBroadway.Splunk.Client

  @impl true
  def init(opts) do
    {:ok, opts |> prepare_cfg(Application.get_env(:off_broadway_splunk, :splunk_client) || [])}
  end

  @spec prepare_cfg(opts :: Keyword.t(), env :: Keyword.t()) :: Keyword.t()
  defp prepare_cfg(opts, env), do: Keyword.merge(env, Keyword.get(opts, :config))

  @doc """
  Returns a `Tesla.Client` configured with middleware.

    * `Tesla.Middleware.BaseUrl` middleware configured with `base_url` passed via `opts`.
    * `Tesla.Middleware.BearerAuth` middleware configured with `api_token` passed via `opts`.
    * `Tesla.Middleware.Query` middleware configured with `query` passed via `opts`.
    * `Tesla.Middleware.JSON` middleware configured with `Jason` engine.

  """
  @spec client(opts :: Keyword.t()) :: Tesla.Client.t()
  def client(opts) do
    middleware = [
      {Tesla.Middleware.BaseUrl, client_option(opts, :base_url)},
      {Tesla.Middleware.BearerAuth, token: client_option(opts, :api_token)},
      {Tesla.Middleware.Query, client_option(opts, :query)},
      {Tesla.Middleware.JSON, engine: Jason}
    ]

    Tesla.client(middleware)
  end

  @impl true
  def receive_status(sid, opts) do
    {:ok, version} = Keyword.fetch(opts, :api_version)

    client(opts)
    |> Tesla.get("/services/search/#{version}/jobs/#{sid}", query: [output_mode: "json"])
  end

  @impl true
  def receive_messages(sid, _demand, opts) do
    {:ok, endpoint} = Keyword.fetch(opts, :endpoint)
    {:ok, version} = Keyword.fetch(opts, :api_version)

    client(opts)
    |> Tesla.get("/services/search/#{version}/jobs/#{sid}/#{Atom.to_string(endpoint)}")
    |> wrap_received_messages(sid)
  end

  @impl Acknowledger
  def ack(ack_ref, successful, failed) do
    ack_options = :persistent_term.get(ack_ref)

    messages =
      Enum.filter(successful, &ack?(&1, ack_options, :on_success)) ++
        Enum.filter(failed, &ack?(&1, ack_options, :on_failure))

    Enum.each(messages, &ack_message(&1, ack_options))
  end

  defp ack?(message, ack_options, option) do
    {_, _, msg_ack_options} = message.acknowledger
    (msg_ack_options[option] || Map.fetch!(ack_options, option)) == :ack
  end

  @impl true
  def ack_message(message, %{sid: sid}) do
    :telemetry.execute(
      [:off_broadway_splunk, :receive_messages, :ack],
      %{time: System.system_time(), count: 1},
      %{sid: sid, receipt: extract_message_receipt(message)}
    )
  end

  @impl Acknowledger
  def configure(_ack_ref, ack_data, options), do: {:ok, Map.merge(ack_data, Map.new(options))}

  defp wrap_received_messages(
         {:ok, %Tesla.Env{status: 200, body: %{"results" => messages}}},
         ack_ref
       ) do
    Enum.map(messages, fn message ->
      metadata =
        Map.filter(message, fn {key, _val} -> String.starts_with?(key, "_") and key != "_raw" end)

      acknowledger = build_acknowledger(message, ack_ref)
      %Message{data: message, metadata: metadata, acknowledger: acknowledger}
    end)
  end

  defp wrap_received_messages(
         {:ok, %Tesla.Env{status: status_code, body: %{"messages" => reason}} = _env},
         ack_ref
       ) do
    Logger.error(
      "Unable to fetch events from Splunk SID #{ack_ref}. " <>
        "Request failed with status code: #{status_code} and reason: #{inspect(reason)}."
    )

    []
  end

  defp build_acknowledger(message, ack_ref) do
    receipt = %{id: build_splunk_message_id(message)}
    {__MODULE__, ack_ref, %{receipt: receipt}}
  end

  defp extract_message_receipt(%{acknowledger: {_, _, %{receipt: receipt}}}), do: receipt

  defp build_splunk_message_id(%{"_si" => si, "_cd" => cd}) when is_list(si),
    do: "#{Enum.join(si, ";")};#{cd}"

  defp build_splunk_message_id(_), do: nil

  @spec client_option(Keyword.t(), Atom.t()) :: any
  defp client_option(opts, :base_url), do: Keyword.get(opts, :base_url, "")
  defp client_option(opts, :api_token), do: Keyword.get(opts, :api_token, "")
  defp client_option(opts, :query), do: Keyword.get(opts, :query, [])
end