lib/off_broadway/splunk/producer.ex

defmodule OffBroadway.Splunk.Producer do
  @moduledoc """
  GenStage Producer for a Splunk Event Stream.
  Broadway producer acts as a consumer for the specified Splunk SID.

  ## Producer Options

  #{NimbleOptions.docs(OffBroadway.Splunk.Options.definition())}

  ## Acknowledgements

  You can use the `on_success` and `on_failure` options to control how messages are
  acknowledged. You can set these options when starting the Splunk producer or change
  them for each message through `Broadway.Message.configure_ack/2`. By default, successful
  messages are acked (`:ack`) and failed messages are not (`:noop`).

  The possible values for `:on_success` and `:on_failure` are:

    * `:ack` - acknowledge the message. Splunk does not have any concept of acking messages,
      because we are just consuming messages from a web api endpoint.
      For now we are just executing a `:telemetry` event for acked messages.

    * `:noop` - do not acknowledge the message. No action is taken.

  ## Telemetry

  This library exposes the following telemetry events:

    * `[:off_broadway_splunk, :job_status, :start]` - Dispatched before polling SID status
      from Splunk.

      * measurement: `%{time: System.monotonic_time}`
      * metadata: `%{sid: string, progress: integer}`

    * `[:off_broadway_splunk, :job_status, :stop]` - Dispatched when polling SID status from Splunk
      is complete.

      * measurement: `%{time: native_time}`
      * metadata: `%{sid: string, progress: integer}`

    * `[:off_broadway_splunk, :job_status, :exception]` - Dispatched after a failure while polling
      SID status from Splunk.

      * measurement: `%{duration: native_time}`
      * metadata:

        ```
        %{
          sid: string,
          kind: kind,
          reason: reason,
          stacktrace: stacktrace
        }
        ```

    * `[:off_broadway_splunk, :receive_messages, :start]` - Dispatched before receiving
      messages from Splunk (`c:receive_messages/2`)

      * measurement: `%{time: System.monotonic_time}`
      * metadata: `%{sid: string, demand: integer}`

    * `[:off_broadway_splunk, :receive_messages, :stop]` - Dispatched after messages have been
      received from Splunk and "wrapped".

      * measurement: `%{duration: native_time}`
      * metadata:

        ```
        %{
          sid: string,
          received: integer,
          demand: integer
        }
        ```

    * `[:off_broadway_splunk, :receive_messages, :exception]` - Dispatched after a failure while
      receiving messages from Splunk.

      * measurement: `%{duration: native_time}`
      * metadata:

        ```
        %{
          sid: string,
          demand: integer,
          kind: kind,
          reason: reason,
          stacktrace: stacktrace
        }
        ```

    * `[:off_broadway_splunk, :receive_messages, :ack]` - Dispatched when acking a message.

      * measurement: `%{time: System.system_time, count: 1}`
      * metadata:

        ```
        %{
          sid: string,
          receipt: receipt
        }
        ```

  """

  use GenStage
  alias Broadway.Producer
  alias NimbleOptions.ValidationError
  alias OffBroadway.Splunk.Leader

  @behaviour Producer

  @impl true
  def init(opts) do
    client = opts[:splunk_client]
    {:ok, client_opts} = client.init(opts)

    {:producer,
     %{
       demand: 0,
       total_events: 0,
       processed_events: 0,
       processed_requests: 0,
       receive_timer: nil,
       receive_interval: opts[:receive_interval],
       ready: false,
       sid: opts[:sid],
       splunk_client: {client, client_opts},
       broadway: opts[:broadway][:name],
       shutdown_timeout: opts[:shutdown_timeout]
     }}
  end

  @impl true
  def prepare_for_start(_module, broadway_opts) do
    {producer_module, client_opts} = broadway_opts[:producer][:module]

    case NimbleOptions.validate(client_opts, OffBroadway.Splunk.Options.definition()) do
      {:error, error} ->
        raise ArgumentError, format_error(error)

      {:ok, opts} ->
        :persistent_term.put(opts[:sid], %{
          sid: opts[:sid],
          config: opts[:config],
          on_success: opts[:on_success],
          on_failure: opts[:on_failure]
        })

        with_default_opts = put_in(broadway_opts, [:producer, :module], {producer_module, opts})

        children = [
          {Leader, Keyword.merge(opts, broadway: with_default_opts[:name])}
        ]

        {children, with_default_opts}
    end
  end

  defp format_error(%ValidationError{keys_path: [], message: message}) do
    "invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2, " <>
      message
  end

  defp format_error(%ValidationError{keys_path: keys_path, message: message}) do
    "invalid configuration given to OffBroadway.Splunk.Producer.prepare_for_start/2 for key #{inspect(keys_path)}, " <>
      message
  end

  @impl true
  def handle_demand(incoming_demand, %{demand: demand} = state) do
    handle_receive_messages(%{state | demand: demand + incoming_demand})
  end

  @impl true
  def handle_info(:receive_messages, %{receive_timer: nil} = state), do: {:noreply, [], state}

  def handle_info(:receive_messages, state),
    do: handle_receive_messages(%{state | receive_timer: nil})

  def handle_info(
        :shutdown_broadway,
        %{receive_timer: receive_timer, shutdown_timeout: timeout, broadway: broadway} = state
      ) do
    receive_timer && Process.cancel_timer(receive_timer)
    Broadway.stop(broadway, :normal, timeout)
    {:noreply, [], %{state | receive_timer: nil}}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  @impl true
  # Callback function used by `OffBroadway.Splunk.Leader` to notify that
  # Splunk API is ready to deliver messages.
  def handle_cast({:receive_messages_ready, total_events: event_count}, state),
    do: handle_receive_messages(%{state | total_events: event_count, ready: true})

  @impl Producer
  def prepare_for_draining(%{receive_timer: receive_timer} = state) do
    receive_timer && Process.cancel_timer(receive_timer)
    {:noreply, [], %{state | receive_timer: nil}}
  end

  defp handle_receive_messages(
         %{
           receive_timer: nil,
           ready: true,
           demand: demand,
           splunk_client: {_, client_opts},
           total_events: total_events
         } = state
       )
       when demand > 0 do
    {messages, new_state} = receive_messages_from_splunk(state, demand)
    new_demand = demand - length(messages)
    max_events = client_opts[:max_events]

    receive_timer =
      case {messages, new_state} do
        {[], %{recive_interval: interval}} -> schedule_receive_messages(interval)
        {_, %{processed_events: ^max_events}} -> schedule_shutdown()
        {_, %{processed_events: ^total_events}} -> schedule_shutdown()
        _ -> schedule_receive_messages(0)
      end

    {:noreply, messages, %{new_state | demand: new_demand, receive_timer: receive_timer}}
  end

  defp handle_receive_messages(state), do: {:noreply, [], state}

  defp receive_messages_from_splunk(
         %{sid: sid, splunk_client: {client, client_opts}} = state,
         demand
       ) do
    metadata = %{sid: sid, demand: demand}
    count = calculate_count(client_opts, demand, state.processed_events)

    client_opts =
      Keyword.put(client_opts, :query,
        output_mode: "json",
        count: count,
        offset: calculate_offset(state)
      )

    case count do
      0 ->
        {[], state}

      _ ->
        messages =
          :telemetry.span(
            [:off_broadway_splunk, :receive_messages],
            metadata,
            fn ->
              messages = client.receive_messages(sid, demand, client_opts)
              {messages, Map.put(metadata, :received, length(messages))}
            end
          )

        {messages,
         %{
           state
           | processed_requests: state.processed_requests + 1,
             processed_events: state.processed_events + length(messages)
         }}
    end
  end

  defp calculate_count(client_opts, demand, processed_events) do
    case client_opts[:max_events] do
      nil ->
        demand

      max_events ->
        capacity = max_events - processed_events
        min(demand - (demand - capacity), demand)
    end
  end

  defp calculate_offset(%{splunk_client: {_, client_opts}, processed_requests: 0}),
    do: client_opts[:offset]

  defp calculate_offset(%{splunk_client: {_, client_opts}, processed_events: processed_events}) do
    case {client_opts[:offset], processed_events} do
      {offset, processed_events} when offset < 0 -> -abs(abs(offset) + processed_events)
      {offset, processed_events} when offset >= 0 -> offset + processed_events
    end
  end

  defp schedule_receive_messages(interval),
    do: Process.send_after(self(), :receive_messages, interval)

  defp schedule_shutdown,
    do: Process.send_after(self(), :shutdown_broadway, 0)
end