lib/tai/venue_adapters/ftx/stream/process_order_book.ex

defmodule Tai.VenueAdapters.Ftx.Stream.ProcessOrderBook do
  use GenServer
  alias Tai.Markets.OrderBook

  defmodule State do
    @type venue_id :: Tai.Venue.id()
    @type product_symbol :: Tai.Venues.Product.symbol()
    @type t :: %State{
            venue: venue_id,
            symbol: product_symbol,
            table: %{optional(String.t()) => number}
          }

    @enforce_keys ~w[venue symbol table]a
    defstruct ~w[venue symbol table]a
  end

  @type venue_id :: Tai.Venue.id()
  @type product :: Tai.Venues.Product.t()
  @type venue_symbol :: Tai.Venues.Product.venue_symbol()
  @type state :: State.t()

  @spec start_link(product) :: GenServer.on_start()
  def start_link(product) do
    state = %State{venue: product.venue_id, symbol: product.symbol, table: %{}}
    name = to_name(product.venue_id, product.venue_symbol)

    GenServer.start_link(__MODULE__, state, name: name)
  end

  @spec to_name(venue_id, venue_symbol) :: atom
  def to_name(venue, symbol), do: :"#{__MODULE__}_#{venue}_#{symbol}"

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:snapshot, data, received_at}, state) do
    {data, received_at, state}
    |> build_change_set()
    |> OrderBook.replace()

    {:noreply, state}
  end

  @impl true
  def handle_cast({:update, data, received_at}, state) do
    {data, received_at, state}
    |> build_change_set()
    |> OrderBook.apply()

    {:noreply, state}
  end

  @timestamp "time"
  @ns_in_s 1000 * 1000 * 1000
  defp build_change_set({data, received_at, state}) do
    ns = Map.fetch!(data, @timestamp) * @ns_in_s
    {:ok, venue_timestamp} = ns |> trunc() |> DateTime.from_unix(:nanosecond)
    normalized_bids = data |> normalize_changes(:bid)
    normalized_asks = data |> normalize_changes(:ask)

    %OrderBook.ChangeSet{
      venue: state.venue,
      symbol: state.symbol,
      last_venue_timestamp: venue_timestamp,
      last_received_at: received_at,
      changes: Enum.concat(normalized_bids, normalized_asks)
    }
  end

  @bids "bids"
  @asks "asks"
  defp normalize_changes(data, :bid), do: data |> Map.get(@bids, []) |> normalize_side(:bid)
  defp normalize_changes(data, :ask), do: data |> Map.get(@asks, []) |> normalize_side(:ask)

  defp normalize_side(data, side) do
    data
    |> Enum.map(&List.to_tuple/1)
    |> Enum.map(fn
      {price, 0.0} -> {:delete, side, price}
      {price, size} -> {:upsert, side, price, size}
    end)
  end
end