# lib/ccxt/raw_payload.ex

defmodule Ccxt.RawPayload do
  @moduledoc """
  Raw payload retention helpers for CCXT Pro data.

  The module creates database-facing attrs and insert plans for raw websocket
  events and websocket API responses. It does not write to a database and does
  not depend on Ecto.

  Options may be given as a keyword list or as a map. Every option is looked
  up by its atom key first and then by the equivalent string key.
  """

  import Bitwise

  alias Ccxt.StructurePersistence
  alias Ccxt.StructureSchema

  @type kind :: :ws_event | :ws_api_response
  @type opts :: keyword() | map()
  @type raw_attrs :: map()
  @type raw_plan :: %{
          kind: kind(),
          table: String.t(),
          operation: :insert,
          identity_fields: [atom()],
          fields: [atom()],
          conflict_target: [],
          update_fields: []
        }

  @kinds [:ws_event, :ws_api_response]

  @raw_tables %{
    ws_event: "ccxt_raw_ws_events",
    ws_api_response: "ccxt_raw_ws_api_responses"
  }

  @fields [
    :id,
    :exchange_id,
    :env,
    :market_type,
    :account_type,
    :transport,
    :channel,
    :stream,
    :request_id,
    :message_hash,
    :url,
    :payload,
    :received_at
  ]

  @doc """
  Builds raw attrs for a websocket event payload.

  Shorthand for `normalize(:ws_event, payload, opts)`.
  """
  @spec normalize_ws_event(term(), opts()) :: {:ok, raw_attrs()}
  def normalize_ws_event(payload, opts \\ []), do: normalize(:ws_event, payload, opts)

  @doc """
  Builds raw attrs for a websocket API response payload.

  Shorthand for `normalize(:ws_api_response, payload, opts)`.
  """
  @spec normalize_ws_api_response(term(), opts()) :: {:ok, raw_attrs()}
  def normalize_ws_api_response(payload, opts \\ []),
    do: normalize(:ws_api_response, payload, opts)

  @doc """
  Builds the attrs map for one raw payload of the given `kind`.

  Values found in `opts` win over derived ones. When an option is absent:

    * `:id` - a freshly generated UUID v4 string
    * `:exchange_id` - `"binance"`
    * `:env` - the `:binance_env` option, or `"prod"`
    * `:transport` - `"websocket"` for `:ws_event`, `"websocket_api"` for
      `:ws_api_response`
    * `:stream` - the payload's `"stream"` / `:stream` entry when it is a binary
    * `:request_id` - the payload's non-nil `"id"` / `:id` entry as a string
    * `:message_hash` - `message_hash(payload)`
    * `:received_at` - the current UTC time with microsecond precision
    * `:market_type`, `:account_type`, `:channel`, `:url` - `nil`

  The `:payload` attr is the payload itself when it is a map; lists are wrapped
  as `%{"data" => list}` and any other term as `%{"value" => term}`.

  Raises `FunctionClauseError` when `kind` is not a known raw payload kind.
  """
  @spec normalize(kind(), term(), opts()) :: {:ok, raw_attrs()}
  def normalize(kind, payload, opts \\ []) when kind in @kinds do
    opts = normalize_opts(opts)

    # Derived defaults are resolved lazily so that hashing, UUID generation,
    # clock reads and payload inspection only run when the caller did not
    # provide the value.
    attrs = %{
      id: opt_get_lazy(opts, :id, &uuid_v4/0),
      exchange_id: opt_get(opts, :exchange_id, "binance"),
      env: opt_get(opts, :env, opt_get(opts, :binance_env, "prod")),
      market_type: opt_get(opts, :market_type),
      account_type: opt_get(opts, :account_type),
      transport: opt_get(opts, :transport, default_transport(kind)),
      channel: opt_get(opts, :channel),
      stream: opt_get_lazy(opts, :stream, fn -> payload_stream(payload) end),
      request_id: opt_get_lazy(opts, :request_id, fn -> payload_request_id(payload) end),
      message_hash: opt_get_lazy(opts, :message_hash, fn -> message_hash(payload) end),
      url: opt_get(opts, :url),
      payload: jsonb_value(payload),
      received_at: opt_get_lazy(opts, :received_at, fn -> DateTime.utc_now(:microsecond) end)
    }

    {:ok, attrs}
  end

  @doc """
  Normalizes a raw payload together with the structure parsed from it.

  The raw payload kind is read from the `:raw_kind` option (default
  `:ws_event`). The generated raw attrs `id` is passed to
  `Ccxt.StructurePersistence.normalize_and_plan/3` as the `:raw_ref_id` option.

  Returns `{:ok, %{raw: ..., raw_plan: ..., attrs: ..., plans: ...}}`, or the
  first non-matching result of the chain; in particular
  `{:error, {:unknown_raw_payload_kind, kind}}` for an unknown `:raw_kind`.
  """
  @spec normalize_structure_with_raw(
          StructureSchema.structure(),
          term(),
          term(),
          opts()
        ) ::
          {:ok,
           %{
             raw: raw_attrs(),
             raw_plan: raw_plan(),
             attrs: map() | list(map()),
             plans: [StructurePersistence.plan()]
           }}
          | {:error, term()}
  def normalize_structure_with_raw(structure, parsed_value, raw_payload, opts \\ []) do
    opts = normalize_opts(opts)
    raw_kind = opt_get(opts, :raw_kind, :ws_event)

    # plan/1 runs first because it is the step that validates the kind and
    # returns an error tuple; normalize/3 would raise on an unknown kind.
    with {:ok, raw_plan} <- plan(raw_kind),
         {:ok, raw} <- normalize(raw_kind, raw_payload, opts),
         structure_opts = Map.put(opts, :raw_ref_id, raw.id),
         {:ok, %{attrs: attrs, plans: plans}} <-
           StructurePersistence.normalize_and_plan(structure, parsed_value, structure_opts) do
      {:ok, %{raw: raw, raw_plan: raw_plan, attrs: attrs, plans: plans}}
    end
  end

  @doc """
  Returns the insert plan for a raw payload kind.

  The plan names the target table, the identity field (`:id`) and the full
  field list; conflict target and update fields are empty. Unknown kinds
  yield `{:error, {:unknown_raw_payload_kind, kind}}`.
  """
  @spec plan(kind()) :: {:ok, raw_plan()} | {:error, term()}
  def plan(kind) when kind in @kinds do
    {:ok,
     %{
       kind: kind,
       table: Map.fetch!(@raw_tables, kind),
       operation: :insert,
       identity_fields: [:id],
       fields: @fields,
       conflict_target: [],
       update_fields: []
     }}
  end

  def plan(kind), do: {:error, {:unknown_raw_payload_kind, kind}}

  @doc """
  Same as `plan/1` but returns the bare plan and raises `ArgumentError` for an
  unknown kind.
  """
  @spec plan!(kind()) :: raw_plan()
  def plan!(kind) do
    case plan(kind) do
      {:ok, plan} ->
        plan

      {:error, reason} ->
        raise ArgumentError, "could not build raw payload plan: #{inspect(reason)}"
    end
  end

  @doc """
  Returns the option list for an insert plan, which is always empty.
  """
  @spec ecto_options(raw_plan()) :: keyword()
  def ecto_options(%{operation: :insert}), do: []

  @doc """
  Returns a lowercase hex SHA-256 digest of the payload.

  Before hashing, maps at any depth are turned into lists of
  `{string_key, value}` pairs sorted by key, so key order and the atom/string
  distinction of keys do not affect the digest.
  """
  @spec message_hash(term()) :: String.t()
  def message_hash(payload) do
    # NOTE(review): the digest is taken over the external term format; confirm
    # that format is stable enough if hashes are compared across releases.
    :sha256
    |> :crypto.hash(:erlang.term_to_binary(canonical(payload)))
    |> Base.encode16(case: :lower)
  end

  defp default_transport(:ws_event), do: "websocket"
  defp default_transport(:ws_api_response), do: "websocket_api"

  defp payload_stream(%{"stream" => stream}) when is_binary(stream), do: stream
  defp payload_stream(%{stream: stream}) when is_binary(stream), do: stream
  defp payload_stream(_payload), do: nil

  defp payload_request_id(%{"id" => id}) when not is_nil(id), do: to_string(id)
  defp payload_request_id(%{id: id}) when not is_nil(id), do: to_string(id)
  defp payload_request_id(_payload), do: nil

  # Wraps non-map payloads in a map so the stored value is always a map.
  defp jsonb_value(value) when is_map(value), do: value
  defp jsonb_value(value) when is_list(value), do: %{"data" => value}
  defp jsonb_value(value), do: %{"value" => value}

  # Recursively rewrites maps as key-sorted `{string_key, value}` lists.
  defp canonical(value) when is_map(value) do
    value
    |> Enum.map(fn {key, item} -> {to_string(key), canonical(item)} end)
    |> Enum.sort_by(&elem(&1, 0))
  end

  defp canonical(value) when is_list(value), do: Enum.map(value, &canonical/1)
  defp canonical(value), do: value

  # Formats 16 random bytes as an 8-4-4-4-12 lowercase hex UUID string.
  defp uuid_v4 do
    <<a::32, b::16, c::16, d::16, e::48>> = :crypto.strong_rand_bytes(16)
    # Force the top nibble of the third group to 4 (the version digit).
    c = (c &&& 0x0FFF) ||| 0x4000
    # Force the top two bits of the fourth group to binary 10 (the variant).
    d = (d &&& 0x3FFF) ||| 0x8000

    [
      Base.encode16(<<a::32>>, case: :lower),
      Base.encode16(<<b::16>>, case: :lower),
      Base.encode16(<<c::16>>, case: :lower),
      Base.encode16(<<d::16>>, case: :lower),
      Base.encode16(<<e::48>>, case: :lower)
    ]
    |> Enum.join("-")
  end

  defp normalize_opts(opts) when is_list(opts), do: Map.new(opts)
  defp normalize_opts(opts) when is_map(opts), do: opts

  # Looks up `key`, then its string form, then falls back to `default`.
  defp opt_get(opts, key, default \\ nil),
    do: opt_get_lazy(opts, key, fn -> default end)

  # Same lookup order as opt_get/3, but the default is only computed when
  # neither the atom key nor the string key is present.
  defp opt_get_lazy(opts, key, default_fun) do
    with :error <- Map.fetch(opts, key),
         :error <- Map.fetch(opts, to_string(key)) do
      default_fun.()
    else
      {:ok, value} -> value
    end
  end
end