defmodule Ccxt.RawPayload do
@moduledoc """
Raw payload retention helpers for CCXT Pro data.
The module creates database-facing attrs and insert plans for raw websocket
events and websocket API responses. It does not write to a database and does
not depend on Ecto.
"""
import Bitwise
alias Ccxt.StructurePersistence
alias Ccxt.StructureSchema
@type kind :: :ws_event | :ws_api_response
@type opts :: keyword() | map()
@type raw_attrs :: map()
@type raw_plan :: %{
kind: kind(),
table: String.t(),
operation: :insert,
identity_fields: [atom()],
fields: [atom()],
conflict_target: [],
update_fields: []
}
@raw_tables %{
ws_event: "ccxt_raw_ws_events",
ws_api_response: "ccxt_raw_ws_api_responses"
}
@fields [
:id,
:exchange_id,
:env,
:market_type,
:account_type,
:transport,
:channel,
:stream,
:request_id,
:message_hash,
:url,
:payload,
:received_at
]
@spec normalize_ws_event(term(), opts()) :: {:ok, raw_attrs()}
def normalize_ws_event(payload, opts \\ []), do: normalize(:ws_event, payload, opts)
@spec normalize_ws_api_response(term(), opts()) :: {:ok, raw_attrs()}
def normalize_ws_api_response(payload, opts \\ []),
do: normalize(:ws_api_response, payload, opts)
@spec normalize(kind(), term(), opts()) :: {:ok, raw_attrs()}
def normalize(kind, payload, opts \\ []) when kind in [:ws_event, :ws_api_response] do
opts = normalize_opts(opts)
attrs = %{
id: opt_get(opts, :id, uuid_v4()),
exchange_id: opt_get(opts, :exchange_id, "binance"),
env: opt_get(opts, :env, opt_get(opts, :binance_env, "prod")),
market_type: opt_get(opts, :market_type),
account_type: opt_get(opts, :account_type),
transport: opt_get(opts, :transport, default_transport(kind)),
channel: opt_get(opts, :channel),
stream: opt_get(opts, :stream, payload_stream(payload)),
request_id: opt_get(opts, :request_id, payload_request_id(payload)),
message_hash: opt_get(opts, :message_hash, message_hash(payload)),
url: opt_get(opts, :url),
payload: jsonb_value(payload),
received_at: opt_get(opts, :received_at, DateTime.utc_now(:microsecond))
}
{:ok, attrs}
end
@spec normalize_structure_with_raw(
StructureSchema.structure(),
term(),
term(),
opts()
) ::
{:ok,
%{
raw: raw_attrs(),
raw_plan: raw_plan(),
attrs: map() | list(map()),
plans: [StructurePersistence.plan()]
}}
| {:error, term()}
def normalize_structure_with_raw(structure, parsed_value, raw_payload, opts \\ []) do
opts = normalize_opts(opts)
raw_kind = opt_get(opts, :raw_kind, :ws_event)
with {:ok, raw} <- normalize(raw_kind, raw_payload, opts),
{:ok, raw_plan} <- plan(raw_kind),
structure_opts = Map.put(opts, :raw_ref_id, raw.id),
{:ok, %{attrs: attrs, plans: plans}} <-
StructurePersistence.normalize_and_plan(structure, parsed_value, structure_opts) do
{:ok, %{raw: raw, raw_plan: raw_plan, attrs: attrs, plans: plans}}
end
end
@spec plan(kind()) :: {:ok, raw_plan()} | {:error, term()}
def plan(kind) when kind in [:ws_event, :ws_api_response] do
{:ok,
%{
kind: kind,
table: Map.fetch!(@raw_tables, kind),
operation: :insert,
identity_fields: [:id],
fields: @fields,
conflict_target: [],
update_fields: []
}}
end
def plan(kind), do: {:error, {:unknown_raw_payload_kind, kind}}
@spec plan!(kind()) :: raw_plan()
def plan!(kind) do
case plan(kind) do
{:ok, plan} ->
plan
{:error, reason} ->
raise ArgumentError, "could not build raw payload plan: #{inspect(reason)}"
end
end
@spec ecto_options(raw_plan()) :: keyword()
def ecto_options(%{operation: :insert}), do: []
@spec message_hash(term()) :: String.t()
def message_hash(payload) do
:sha256
|> :crypto.hash(:erlang.term_to_binary(canonical(payload)))
|> Base.encode16(case: :lower)
end
defp default_transport(:ws_event), do: "websocket"
defp default_transport(:ws_api_response), do: "websocket_api"
defp payload_stream(%{"stream" => stream}) when is_binary(stream), do: stream
defp payload_stream(%{stream: stream}) when is_binary(stream), do: stream
defp payload_stream(_payload), do: nil
defp payload_request_id(%{"id" => id}) when not is_nil(id), do: to_string(id)
defp payload_request_id(%{id: id}) when not is_nil(id), do: to_string(id)
defp payload_request_id(_payload), do: nil
defp jsonb_value(value) when is_map(value), do: value
defp jsonb_value(value) when is_list(value), do: %{"data" => value}
defp jsonb_value(value), do: %{"value" => value}
defp canonical(value) when is_map(value) do
value
|> Enum.map(fn {key, item} -> {to_string(key), canonical(item)} end)
|> Enum.sort_by(&elem(&1, 0))
end
defp canonical(value) when is_list(value), do: Enum.map(value, &canonical/1)
defp canonical(value), do: value
defp uuid_v4 do
<<a::32, b::16, c::16, d::16, e::48>> = :crypto.strong_rand_bytes(16)
c = (c &&& 0x0FFF) ||| 0x4000
d = (d &&& 0x3FFF) ||| 0x8000
[
Base.encode16(<<a::32>>, case: :lower),
Base.encode16(<<b::16>>, case: :lower),
Base.encode16(<<c::16>>, case: :lower),
Base.encode16(<<d::16>>, case: :lower),
Base.encode16(<<e::48>>, case: :lower)
]
|> Enum.join("-")
end
defp normalize_opts(opts) when is_list(opts), do: Map.new(opts)
defp normalize_opts(opts) when is_map(opts), do: opts
defp opt_get(opts, key, default \\ nil),
do: Map.get(opts, key, Map.get(opts, to_string(key), default))
end