lib/bella/watcher/worker.ex

defmodule Bella.Watcher.Worker do
  @moduledoc """
  Continuously watches a list `Operation` for `add`, `modify`, and `delete` events.

  The worker is a `GenServer` whose state is a `Bella.Watcher.State` struct.
  After `init/1` it schedules a `:watch` message (delayed by
  `state.initial_delay`). Handling `:watch` resolves a resource version when none
  is known yet and then asks `Bella.Watcher.Core.watch/2` to start a watch whose
  response arrives as `HTTPoison.Async*` messages in `handle_info/2`.

  If the watch cannot be started and `State.should_retry/1` returns `true`, a new
  `:watch` is scheduled after `State.next_delay/1`; otherwise the worker stops
  with reason `{:watch_failed, error}`.

  Lifecycle milestones are reported through `Bella.Sys.Event`.
  """

  use GenServer

  alias Bella.Sys.Event
  alias Bella.Watcher.Core
  alias Bella.Watcher.ResponseBuffer
  alias Bella.Watcher.State

  # Options consumed by `State.new/1`. Every other option given to
  # `start_link/1` is forwarded to `GenServer.start_link/3` untouched.
  @state_opts [
    :client,
    :connection,
    :connection_func,
    :extra,
    :watcher,
    :resource_version,
    :should_retry_watch,
    :watch_timeout,
    :initial_delay,
    :max_delay
  ]

  @doc """
  Starts the watcher worker.

  `opts` is split in two: the keys listed in `@state_opts` are handed to
  `Bella.Watcher.State.new/1` (via `init/1`), and the remaining keys are passed
  to `GenServer.start_link/3` as GenServer options (for example `:name`).
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    {state_opts, genserver_opts} = Keyword.split(opts, @state_opts)

    GenServer.start_link(__MODULE__, state_opts, genserver_opts)
  end

  @doc """
  Returns the current `Bella.Watcher.State` of the worker identified by `pid`.
  """
  @spec state(GenServer.server()) :: %State{}
  def state(pid) do
    GenServer.call(pid, :state)
  end

  @impl GenServer
  def init(state_opts) do
    state = State.new(state_opts)
    Event.watcher_initialized(%{}, State.metadata(state))

    # The first watch is deferred so that `init/1` returns immediately.
    Process.send_after(self(), :watch, state.initial_delay)
    {:ok, %State{state | current_delay: state.initial_delay}}
  end

  @impl GenServer
  def handle_call(:state, _from, %State{} = state) do
    {:reply, state, state}
  end

  # (Re)start the watch. When no resource version is known yet one is fetched
  # first; if that lookup produced the very first version, `Core.get_before/1`
  # is invoked before the watch begins.
  @impl GenServer
  def handle_info(:watch, %State{resource_version: curr_rv} = state) do
    rv = curr_rv || Core.get_resource_version(state)
    state = %State{state | resource_version: rv}

    if first_watch?(curr_rv, rv) do
      _res = Core.get_before(state)
    end

    do_watch(state)
  end

  # Response headers carry nothing the worker needs.
  def handle_info(%HTTPoison.AsyncHeaders{}, state), do: {:noreply, state}

  def handle_info(%HTTPoison.AsyncStatus{code: 200}, state) do
    Event.watcher_watch_succeeded(%{}, State.metadata(state))
    {:noreply, state}
  end

  # Any other status code is reported as a failed watch and ends the worker
  # with a `:normal` exit.
  def handle_info(%HTTPoison.AsyncStatus{code: code}, state) do
    metadata = state |> State.metadata() |> Map.put(:code, code)
    Event.watcher_watch_failed(%{}, metadata)
    {:stop, :normal, state}
  end

  # A body chunk: append it to the buffer, pull out the lines the buffer
  # returns, and hand those to `Core.process_lines/2`.
  def handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, %State{} = state) do
    metadata = State.metadata(state)
    Event.watcher_chunk_received(%{}, metadata)

    {lines, buffer} =
      state.buffer
      |> ResponseBuffer.add_chunk(chunk)
      |> ResponseBuffer.get_lines()

    case Core.process_lines(lines, state) do
      {:ok, new_rv} ->
        Event.watcher_chunk_finished(%{}, metadata)
        {:noreply, %State{state | buffer: buffer, resource_version: new_rv}}

      {:error, :gone} ->
        Event.watcher_chunk_finished(%{}, metadata)
        {:stop, :normal, state}

      _ ->
        # The chunk has already been consumed from the response stream, so the
        # advanced buffer must be kept even though processing did not succeed.
        # Reverting to the old buffer would drop this chunk's bytes and splice
        # the next chunk onto stale data.
        {:noreply, %State{state | buffer: buffer}}
    end
  end

  # The response ended; start a fresh watch right away.
  def handle_info(%HTTPoison.AsyncEnd{}, %State{} = state) do
    Event.watcher_watch_finished(%{}, State.metadata(state))
    send(self(), :watch)
    {:noreply, state}
  end

  # The connection was closed by a timeout; start a fresh watch right away.
  def handle_info(%HTTPoison.Error{reason: {:closed, :timeout}}, %State{} = state) do
    Event.watcher_watch_timedout(%{}, State.metadata(state))
    send(self(), :watch)
    {:noreply, state}
  end

  # `ref` is bound in both patterns, so this clause only matches a `:DOWN` whose
  # reference is the one stored for the current watch. In that case the stored
  # reference is cleared and the watch is restarted.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %State{k8s_watcher_ref: ref} = state) do
    Event.watcher_watch_down(%{}, State.metadata(state))
    send(self(), :watch)
    {:noreply, %State{state | k8s_watcher_ref: nil}}
  end

  # A `:DOWN` for any other reference is not the current watch (presumably an
  # unrelated monitored process such as an async task - TODO confirm); ignore it.
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, %State{} = state) do
    {:noreply, state}
  end

  # Ignore every other message.
  def handle_info(_other, %State{} = state) do
    {:noreply, state}
  end

  # Asks `Core.watch/2` to start a watch and returns the `handle_info/2` result:
  #
  #   * watch started             -> remember its reference
  #   * failed, retry allowed     -> schedule another `:watch` after the next delay
  #   * failed, retry not allowed -> report the failure and stop the worker
  #
  # The retry decision is read before the watch is attempted, and only a literal
  # `true` enables a retry.
  defp do_watch(%State{} = state) do
    retry? = State.should_retry(state)

    case Core.watch(self(), state) do
      {:ok, ref} ->
        Event.watcher_watch_started(%{}, State.metadata(state))
        {:noreply, %State{state | k8s_watcher_ref: ref}}

      _error when retry? == true ->
        delay = State.next_delay(state)
        Event.watcher_watch_failed(%{}, State.metadata(state))
        Process.send_after(self(), :watch, delay)
        {:noreply, %State{state | k8s_watcher_ref: nil, current_delay: delay}}

      error ->
        # Without this clause the worker died with an opaque `CaseClauseError`.
        # It still exits abnormally, but with a descriptive reason and after
        # reporting the failure.
        Event.watcher_watch_failed(%{}, State.metadata(state))
        {:stop, {:watch_failed, error}, %State{state | k8s_watcher_ref: nil}}
    end
  end

  # True when no resource version was known before and one is available now.
  defp first_watch?(previous_rv, new_rv), do: previous_rv == nil && new_rv != nil
end