Skip to main content

lib/rx/backends/port_arrow/protocol.ex

defmodule Rx.Backends.PortArrow.Protocol do
  @moduledoc false

  def encode(header, body \\ "") when is_map(header) and is_binary(body) do
    header_json = Jason.encode!(header)
    <<byte_size(header_json)::32, header_json::binary, byte_size(body)::64, body::binary>>
  end

  def decode(buffer) when is_binary(buffer) do
    with {:ok, header_len} <- read_u32(buffer),
         true <- byte_size(buffer) >= 4 + header_len + 8,
         <<_::32, header_json::binary-size(header_len), body_len::64, rest::binary>> <- buffer,
         true <- byte_size(rest) >= body_len,
         {:ok, header} <- Jason.decode(header_json) do
      <<body::binary-size(body_len), tail::binary>> = rest
      {:ok, {header, body}, tail}
    else
      _ -> :more
    end
  end

  defp read_u32(<<len::32, _::binary>>), do: {:ok, len}
  defp read_u32(_), do: :more
end