lib/off_broadway/splunk/leader.ex

defmodule OffBroadway.Splunk.Leader do
  @moduledoc """
  The `OffBroadway.Splunk.Leader` module is responsible for polling Splunk
  for the status of a SID and notifying the `OffBroadway.Splunk.Producer`
  when Splunk is ready to deliver messages for the given SID.
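
  A minimal sketch of starting a leader directly (`MyBroadway` and
  `MySplunkClient` are assumed names; the client module is expected to
  implement the `init/1` and `receive_status/2` calls this module makes):

      {:ok, pid} =
        OffBroadway.Splunk.Leader.start_link(
          sid: "SID1234",
          broadway: MyBroadway,
          splunk_client: MySplunkClient
        )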
  """

  defmodule State do
    defstruct [
      :done_progress,
      :event_count,
      :is_done,
      :is_zombie,
      :broadway,
      :published,
      :sid,
      :splunk_client
    ]

    @type t :: %__MODULE__{
            done_progress: nil | integer() | float(),
            event_count: integer(),
            is_done: boolean(),
            is_zombie: boolean(),
            broadway: atom(),
            published: String.t(),
            sid: String.t(),
            splunk_client: {module(), any()}
          }
    use ExConstructor
  end

  use GenServer
  require Logger

  alias Decimal, as: D

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    client = opts[:splunk_client]
    {:ok, client_opts} = client.init(opts)

    state = %{
      done_progress: 0,
      is_done: false,
      splunk_client: {client, client_opts}
    }

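    # Kick off the first status poll immediately; subsequent polls are scheduled
    # according to the job progress reported by Splunk.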
    Process.send_after(self(), :receive_job_status, 0)
    {:ok, State.new(opts) |> Map.merge(state)}
  end

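  # A job reported as a zombie by Splunk will never complete, so the leader
  # shuts itself down instead of polling forever.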
  @impl true
  def handle_info(:receive_job_status, %State{is_zombie: true} = state) do
    Logger.error("Job is in zombie state - Shutting down")
    {:stop, :normal, state}
  end

  def handle_info(:receive_job_status, %State{sid: sid, is_done: false} = state) do
    case receive_job_status(state) do
      {:ok, %{status: 200} = response} ->
        state = update_state_from_response(state, response)
        receive_interval = calculate_receive_interval(state)

        unless state.is_done do
          Logger.info(
            "SID #{sid} is #{Float.ceil(state.done_progress * 100.0, 2)}% complete, " <>
              "rescheduling update in #{receive_interval} seconds"
          )
        end

        Process.send_after(self(), :receive_job_status, receive_interval * 1000)
        {:noreply, state}

      {:ok, %{status: 404}} ->
        Logger.error("SID #{sid} does not exist - Shutting down")
        {:stop, :normal, state}

      reason ->
        Logger.error("SID #{sid} failed with reason #{inspect(reason)} - Shutting down")
        {:stop, :normal, state}
    end
  end

  def handle_info(
        :receive_job_status,
        %State{sid: sid, is_done: true, broadway: broadway} = state
      ) do
    Logger.info("Splunk is done processing SID #{sid} - Ready to consume events")

    Broadway.producer_names(broadway)
    |> Enum.random()
    |> GenStage.cast({:receive_messages_ready, total_events: state.event_count})

    {:noreply, state}
  end

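  # Fetches the current job status from Splunk, wrapped in `:telemetry.span/3` so
  # that `[:off_broadway_splunk, :job_status, :start | :stop | :exception]` events
  # can be observed by attached handlers.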
  @spec receive_job_status(state :: State.t()) :: {:ok, Tesla.Env.t()} | {:error, any()}
  defp receive_job_status(
         %{sid: sid, done_progress: progress, splunk_client: {client, client_opts}} = state
       ) do
    metadata = %{sid: sid, progress: progress}

    :telemetry.span(
      [:off_broadway_splunk, :job_status],
      metadata,
      fn ->
        case client.receive_status(sid, client_opts) do
          {:ok, %{status: 200} = response} = env ->
            updated = update_state_from_response(state, response)
            {env, %{metadata | progress: updated.done_progress}}

          env ->
            # Non-200 responses and errors are handed back to the caller unchanged,
            # so `handle_info/2` can decide whether to stop the leader.
            {env, metadata}
        end
      end
    )
  end

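  # The Splunk job status endpoint returns a body shaped roughly like the one
  # below (field names inferred from the pattern match and the `State` struct;
  # values are illustrative):
  #
  #   %{
  #     "entry" => [
  #       %{"published" => "...", "content" => %{"doneProgress" => 0.5, "isDone" => false}}
  #     ]
  #   }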
  @spec update_state_from_response(state :: State.t(), response :: Tesla.Env.t()) :: State.t()
  defp update_state_from_response(state, %{
         status: 200,
         body: %{"entry" => [%{"content" => content} = entry | _rest]}
       }) do
    merge_non_nil_state_fields(State.new(entry), state)
    |> merge_non_nil_state_fields(State.new(content))
  end

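  # Merges two `State` structs, letting values from `state_b` win unless they
  # are nil. For example (values chosen for illustration):
  #
  #   merge_non_nil_state_fields(
  #     %State{sid: "SID1234", is_done: nil},
  #     %State{sid: nil, is_done: true}
  #   )
  #   #=> %State{sid: "SID1234", is_done: true}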
  @spec merge_non_nil_state_fields(state_a :: State.t(), state_b :: State.t()) :: State.t()
  defp merge_non_nil_state_fields(state_a, state_b) do
    Map.merge(state_a, state_b, fn
      _key, old_value, new_value when is_nil(new_value) -> old_value
      _key, _old_value, new_value -> new_value
    end)
  end

  # Calculates the next receive interval for fetching the job metadata.
  #
  # Algorithm is as follows:
  #  T = time difference since job was started
  #  P = job progress (normalized value)
  #
  #  seconds until next check = (T * (1 / P)) - T
  #
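  # Example: a job that started 30 seconds ago (T = 30) and is 25% done
  # (P = 0.25) gets its next status check in 30 * (1 / 0.25) - 30 = 90 seconds.
  #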
  @spec calculate_receive_interval(state :: State.t()) :: non_neg_integer()
  defp calculate_receive_interval(%State{done_progress: progress}) when progress >= 1, do: 0

  defp calculate_receive_interval(%State{published: published, done_progress: progress}) do
    {:ok, published_dt, _offset} = DateTime.from_iso8601(published)
    diff = DateTime.diff(DateTime.utc_now(), published_dt, :second)

    # `progress / 1` coerces integer progress values to a float, as required
    # by `Decimal.from_float/1`.
    diff
    |> D.mult(D.div(1, D.from_float(progress / 1)))
    |> D.sub(diff)
    |> D.to_float()
    |> ceil()
  end
end