lib/ex_twelve_data/real_time_prices/subscriptions_manager.ex

defmodule ExTwelveData.RealTimePrices.SubscriptionsManager do
  @moduledoc """
  High-level client to manage subscriptions to the real-time prices endpoint.

  It avoids hitting the rate limits, and batches subscribe and unsubscribe requests.
  SubscriptionsManager expects a list of symbols, and creates a subscription for **at least** those symbols.
  It is implemented as a GenServer, which schedules a message every 600ms (100 events/minute), to subscribe/unsubscribe.

  > ## What do the 100 events per minute limit stand for?
  > This does not affect how many price updates might be received from the server, but instead,
  > it limits how many events (subscribe/unsubscribe/reset) could be sent to the server from the client side.
  >
  > -- from https://support.twelvedata.com/en/articles/5194610-websocket-faq
  """

  use GenServer

  alias ExTwelveData.RealTimePrices
  alias ExTwelveData.RealTimePrices.SubscriptionsManager
  alias ExTwelveData.RealTimePrices.SubscriptionsManager.QuotaTracker

  @type options :: [option]

  @type option ::
          GenServer.option()
          | {:pid, pid()}
          | {:provider, SubscriptionsManager.Provider}
          | {:max_subscriptions, integer}

  # 1 event / 600ms -> 100 events per minute
  @clock_period_ms 600

  @spec start_link(options) :: {:error, any} | {:ok, pid}
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def init(opts) do
    pid = Keyword.fetch!(opts, :pid)
    provider = Keyword.fetch!(opts, :provider)
    max_subscriptions = Keyword.fetch!(opts, :max_subscriptions)

    schedule_next_message()

    {:ok,
     %{tracked: MapSet.new(), pid: pid, provider: provider, max_subscriptions: max_subscriptions}}
  end

  def handle_call(_msg, _from, state) do
    {:reply, :ok, state}
  end

  def handle_cast(_msg, state) do
    {:noreply, state}
  end

  @doc """
  Runs every @clock_period_ms to honour the rate limit.

  It checks with the configured {SubscriptionsManager.Provider} behaviour what to add/remove,
  and sends the corresponding subscribe/unsubscribe message to Twelve Data.
  """
  def handle_info(:clock, state) do
    %{
      tracked: current,
      pid: pid,
      provider: provider,
      max_subscriptions: max_subscriptions
    } = state

    new = provider.get_symbols()

    new_state =
      case QuotaTracker.action(current, new, max_subscriptions) do
        :noop ->
          state

        {:add, to_add, new_tracked} ->
          RealTimePrices.subscribe(pid, MapSet.to_list(to_add))
          %{state | tracked: new_tracked}

        {:remove, to_remove, new_tracked} ->
          RealTimePrices.unsubscribe(pid, MapSet.to_list(to_remove))
          %{state | tracked: new_tracked}
      end

    schedule_next_message()
    {:noreply, new_state}
  end

  defp schedule_next_message do
    Process.send_after(self(), :clock, @clock_period_ms)
  end
end