lib/ex_twelve_data/real_time_prices.ex

defmodule ExTwelveData.RealTimePrices do
  @moduledoc """
  WebSocket client to get real-time prices from Twelve Data.
  """

  use WebSockex

  require Logger

  alias ExTwelveData.RealTimePrices.Handler

  @type options :: [option]

  @type option ::
          WebSockex.option()
          | {:api_key, binary}
          | {:handler, Handler}

  @endpoint "wss://ws.twelvedata.com/v1/quotes/price"
  @heartbeat_seconds 10
  @heartbeat_message Jason.encode!(%{action: "heartbeat"})

  @spec start_link(options) :: {:error, any} | {:ok, pid}
  def start_link(opts) do
    Logger.info("~> Connecting to Twelve Data")

    handler = Keyword.fetch!(opts, :handler)

    WebSockex.start_link(
      @endpoint,
      __MODULE__,
      %{handler: handler},
      websockex_opts(opts)
    )
  end

  @spec websockex_opts(options) :: options
  defp websockex_opts(opts) do
    api_key = Keyword.fetch!(opts, :api_key)

    # TODO CAStore should probably be optional, and users should be able to pass in their own CA certificates file.
    ssl_options = [
      verify: :verify_peer,
      depth: 99,
      cacertfile: CAStore.file_path(),
      customize_hostname_check: [
        match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
      ]
    ]

    extra_headers = [
      {"X-TD-APIKEY", api_key}
    ]

    Keyword.merge([ssl_options: ssl_options, extra_headers: extra_headers, insecure: false], opts)
  end

  @doc """
  Specify a list of symbols you're interested to.

  Subsequent calls will append new symbols to the list.
  See `unsubscribe/2` and `reset/1` to remove
  """
  @spec subscribe(pid, [String.t()]) :: {:error, any} | {:ok}
  def subscribe(client, symbols) do
    msg =
      Jason.encode!(%{
        action: "subscribe",
        params: %{
          symbols: Enum.join(symbols, ",")
        }
      })

    Logger.debug("~> Subscribing to symbols: #{msg}")
    WebSockex.send_frame(client, {:text, msg})
  end

  @doc """
  Send a list of symbols that you're no longer interested to.

  Twelve Data will stop sending updates.
  """
  @spec unsubscribe(pid, [String.t()]) :: {:error, any} | {:ok}
  def unsubscribe(client, symbols) do
    msg =
      Jason.encode!(%{
        action: "unsubscribe",
        params: %{
          symbols: Enum.join(symbols, ",")
        }
      })

    Logger.debug("~> Unsubscribing from symbols: #{msg}")
    WebSockex.send_frame(client, {:text, msg})
  end

  @doc """
  Reset the subscription to all price updates.
  """
  @spec reset(pid) :: {:error, any} | {:ok}
  def reset(client) do
    msg = Jason.encode!(%{action: "reset"})

    Logger.debug("~> Resetting...")
    WebSockex.send_frame(client, {:text, msg})
  end

  def handle_connect(conn, state) do
    Logger.info("<~ Connected to Twelve Data")
    schedule_next_heartbeat()
    super(conn, state)
  end

  def handle_disconnect(_connection_status_map, state) do
    Logger.warning("Disconnected from Twelve Data! Reconnecting...")
    {:reconnect, state}
  end

  def handle_frame({:text, msg}, state) do
    Logger.debug("<~ Received message: #{msg}")

    case Jason.decode(msg, keys: :atoms) do
      {:ok, obj} ->
        Logger.debug("Processing message: #{inspect(obj)}")
        {process_message(obj, state), state}

      {:error, _} ->
        Logger.warning("Failed to parse received message as JSON: #{msg}")
        {:ok, state}
    end
  end

  def handle_info(:heartbeat, state) do
    # TODO At this stage, we should also schedule a message to close the connection, keep a reference to it,
    #      and cancel it when we receive the heartbeat reply. This prevents situations where the WebSocket connection
    #      is open, we can send heartbeats, but the server is unresponsive.
    Logger.debug("~> Sending heartbeat")
    schedule_next_heartbeat()
    {:reply, {:text, @heartbeat_message}, state}
  end

  defp process_message(
         %{
           event: "heartbeat",
           status: status
         },
         _state
       ) do
    case status do
      "ok" ->
        :ok

      _ ->
        Logger.error("Received heartbeat with status: #{status}")
        :close
    end
  end

  defp process_message(
         %{
           event: "subscribe-status",
           status: status
         },
         _state
       ) do
    case status do
      "ok" ->
        :ok

      _ ->
        Logger.error("Subscribe failed with status: #{status}")
        :close
    end
  end

  defp process_message(
         %{
           event: "unsubscribe-status",
           status: status
         },
         _state
       ) do
    case status do
      "ok" ->
        :ok

      _ ->
        Logger.error("Unsubscribe failed with status: #{status}")
        :close
    end
  end

  defp process_message(
         %{
           event: "reset-status",
           status: status
         },
         _state
       ) do
    case status do
      "ok" ->
        :ok

      _ ->
        Logger.error("Reset failed with status: #{status}")
        :close
    end
  end

  defp process_message(%{event: "price"} = obj, %{handler: handler}) do
    Logger.debug("Price update received: #{inspect(obj)}")
    handler.handle_price_update(obj)
    :ok
  end

  defp process_message(obj, _state) do
    Logger.warning("Received unknown event: #{inspect(obj)}")
    :ok
  end

  defp schedule_next_heartbeat do
    Logger.debug("Scheduling next heartbeat in #{@heartbeat_seconds}s...")
    Process.send_after(self(), :heartbeat, @heartbeat_seconds * 1000)
  end
end