defmodule XMAVLink.Utils do
@moduledoc ~s"""
MAVLink support functions used during code generation and at runtime.
Parts of this module are ported from the corresponding implementations
in mavutils.py.
"""
import Bitwise
import List, only: [flatten: 1]
import Enum, only: [sort_by: 2, join: 2, map: 2, reverse: 1]
@doc """
Sort parsed message fields into wire order according
to https://mavlink.io/en/guide/serialization.html

Returns a two-element list `[base_fields, extension_fields]`:

  * `base_fields` - the fields whose `is_extension` is falsy, stably sorted
    by the size rank of their `type` (8-byte types first, then 4-byte,
    2-byte and finally 1-byte types)
  * `extension_fields` - the fields whose `is_extension` is truthy, in their
    original order

Extension fields are listed separately so that callers can leave them out
of MAVLink 1 messages.
"""
@spec wire_order([%{type: String.t(), is_extension: boolean}]) :: [[map()]]
def wire_order(fields) do
  # Keyed by the type name as a string so that no atoms are created from
  # parsed input (atoms are never garbage collected).
  type_order_map = %{
    "uint64_t" => 1,
    "int64_t" => 1,
    "double" => 1,
    "uint32_t" => 2,
    "int32_t" => 2,
    "float" => 2,
    "uint16_t" => 3,
    "int16_t" => 3,
    "uint8_t" => 4,
    "uint8_t_mavlink_version" => 4,
    "int8_t" => 4,
    "char" => 4
  }

  {extension_fields, base_fields} = Enum.split_with(fields, & &1.is_extension)

  [
    # The sort key is the raw Map.fetch/2 result: `{:ok, rank}` for known
    # types and `:error` otherwise. Atoms order before tuples, so fields
    # with an unknown type sort ahead of all known types.
    Enum.sort_by(base_fields, &Map.fetch(type_order_map, &1.type)),
    extension_fields
  ]
end
@doc """
Fold an integer into a checksum by XOR-ing its lowest eight bits with
everything above them (`value` shifted right by eight bits).
"""
def eight_bit_checksum(value) do
  low_byte = band(value, 0xFF)
  upper_bits = bsr(value, 8)
  bxor(low_byte, upper_bits)
end
@doc """
Calculate an x25 checksum of a list or binary based on
pymavlink mavcrc.x25crc

A list may be arbitrarily nested; it is flattened before its byte values
are accumulated. The checksum starts from the seed `0xFFFF`, which is also
the result for empty input.

## Examples

    iex> XMAVLink.Utils.x25_crc("123456789")
    28561

"""
@spec x25_crc(list() | binary()) :: non_neg_integer()
def x25_crc(list) when is_list(list) do
  x25_crc(0xFFFF, List.flatten(list))
end

def x25_crc(bin) when is_binary(bin) do
  x25_crc(0xFFFF, bin)
end

@doc """
Continue an x25 checksum from the running value `crc` over a flat list of
byte values or a binary, returning the updated 16-bit checksum.
"""
@spec x25_crc(non_neg_integer(), [integer()] | binary()) :: non_neg_integer()
def x25_crc(crc, []), do: crc
def x25_crc(crc, <<>>), do: crc

def x25_crc(crc, [head | tail]) do
  crc |> x25_accumulate(head) |> x25_crc(tail)
end

def x25_crc(crc, <<head::size(8), tail::binary>>) do
  crc |> x25_accumulate(head) |> x25_crc(tail)
end

# Fold a single byte `value` into the running checksum `crc`.
defp x25_accumulate(crc, value) do
  tmp = bxor(value, crc &&& 0xFF)
  # Masked to eight bits after the shift-and-xor so only one byte is carried
  # into the next step.
  tmp = bxor(tmp, tmp <<< 4) &&& 0xFF

  (crc >>> 8)
  |> bxor(tmp <<< 8)
  |> bxor(tmp <<< 3)
  |> bxor(tmp >>> 4)
  # Keep the running checksum within 16 bits.
  |> band(0xFFFF)
end
@doc """
Helper function for messages to pack string fields.

Pads `s` with trailing zero bytes until it is `ordinality` bytes long.
Padding is measured in bytes rather than graphemes, so multi-byte UTF-8
content still produces exactly `ordinality` bytes. A string that is already
`ordinality` bytes or longer is returned unchanged.
"""
@spec pack_string(binary, non_neg_integer) :: binary
def pack_string(s, ordinality) do
  case ordinality - byte_size(s) do
    missing when missing > 0 -> s <> :binary.copy(<<0>>, missing)
    _ -> s
  end
end
@doc """
Helper function for messages to pack array fields.

A list `a` shorter than `ordinality` is padded with trailing `0` elements.
Every element is then encoded with `field_packer` and the results are
concatenated into one binary.

Returns `{:error, message}` instead of a binary when `a` holds more than
`ordinality` elements.
"""
@spec pack_array(list(), integer, (any() -> binary())) :: binary() | {:error, String.t()}
def pack_array(a, ordinality, _) when length(a) > ordinality,
  do: {:error, "Maximum elements allowed is #{ordinality}"}

# Pad in a single step instead of appending one element per recursive call.
def pack_array(a, ordinality, field_packer) when length(a) < ordinality,
  do: pack_array(a ++ List.duplicate(0, ordinality - length(a)), ordinality, field_packer)

def pack_array(a, _, field_packer), do: Enum.map_join(a, field_packer)
@doc """
Helper function for decode() to unpack array fields.

Calls `fun` repeatedly on the remaining binary. `fun` must return
`{element, rest}`, where `rest` is the binary left after one element has
been consumed. Unpacking stops once `rest` is empty, and the elements are
returned in the order they were read.
"""
# TODO bitstring generator instead? https://elixir-lang.org/getting-started/comprehensions.html
@spec unpack_array(binary(), (binary() -> {any(), binary()})) :: list()
def unpack_array(bin, fun), do: unpack_array(bin, fun, [])

@doc """
Accumulating form of `unpack_array/2`: `lst` holds the elements unpacked so
far in reverse order.
"""
@spec unpack_array(binary(), (binary() -> {any(), binary()}), list()) :: list()
def unpack_array(<<>>, _, lst), do: Enum.reverse(lst)

def unpack_array(bin, fun, lst) do
  {elem, rest} = fun.(bin)
  # Prepend now and reverse once at the end.
  unpack_array(rest, fun, [elem | lst])
end
@doc """
Resolve an address string to an IP address tuple.

The string may be a literal IP address (e.g., "192.168.1.1") or a DNS
hostname (e.g., "service.namespace.svc.cluster.local"). Resolution is
delegated to Erlang's `:inet.getaddr/2` with the `:inet` address family.

Returns `{:ok, ip_tuple}` on success or `{:error, reason}` on failure.

## Examples

    iex> XMAVLink.Utils.resolve_address("127.0.0.1")
    {:ok, {127, 0, 0, 1}}

"""
def resolve_address(address) when is_binary(address) do
  # :inet.getaddr/2 takes a charlist and already answers with
  # {:ok, ip_tuple} | {:error, reason}, so its result is returned as is.
  address
  |> String.to_charlist()
  |> :inet.getaddr(:inet)
end
@doc """
Parse a dotted IPv4 address string into a tuple.

Returns a four-element tuple such as `{192, 168, 1, 1}` on success and
`{:error, :invalid_ip_address}` otherwise. The address must consist of
exactly four `.`-separated components, and each component must be consumed
in full by `Integer.parse/1` and lie in the range 0..255. Components with
trailing characters (for example `"1abc"`) are rejected.
"""
@spec parse_ip_address(binary()) :: :inet.ip4_address() | {:error, :invalid_ip_address}
def parse_ip_address(address) when is_binary(address) do
  parse_ip_address(String.split(address, "."), [], 0)
end

@doc """
Accumulating form of `parse_ip_address/1`: `address` holds the components
parsed so far in reverse order and `count` is how many have been parsed.
"""
@spec parse_ip_address([binary()], [0..255], non_neg_integer()) ::
        tuple() | {:error, :invalid_ip_address}
def parse_ip_address([], address, 4) do
  List.to_tuple(Enum.reverse(address))
end

# Input ran out with fewer or more than four components parsed.
def parse_ip_address([], _, _) do
  {:error, :invalid_ip_address}
end

def parse_ip_address([component | rest], address, count) do
  case Integer.parse(component) do
    # An empty remainder means the whole component was numeric.
    {n, ""} when n in 0..255 ->
      parse_ip_address(rest, [n | address], count + 1)

    _ ->
      {:error, :invalid_ip_address}
  end
end
@doc """
Parse a string holding a strictly positive integer.

Returns the integer, or `:error` when the string is not consumed in full by
`Integer.parse/1` (for example `"14550abc"`) or the value is zero or
negative.
"""
@spec parse_positive_integer(binary()) :: pos_integer() | :error
def parse_positive_integer(port) when is_binary(port) do
  case Integer.parse(port) do
    # An empty remainder means the whole string was numeric.
    {n, ""} when n > 0 -> n
    _ -> :error
  end
end
@doc """
Encode a float as a 4-byte little-endian binary. The atom `:nan` is encoded
as the fixed byte sequence `<<0, 0, 192, 127>>`.
"""
def pack_float(f) when is_float(f), do: <<f::float-little-size(32)>>

# This NaN byte sequence has been received from QGroundControl.
def pack_float(:nan), do: <<0x00, 0x00, 0xC0, 0x7F>>
@doc """
Decode a 4-byte little-endian binary into a float.

Returns `:nan` for every NaN encoding (all exponent bits set and a non-zero
mantissa), not just the single pattern `<<0, 0, 192, 127>>`. Infinity
encodings match no clause and raise `FunctionClauseError`.
"""
@spec unpack_float(<<_::32>>) :: float() | :nan
def unpack_float(<<f::little-signed-float-size(32)>>), do: f

# A float segment only matches finite values, so NaN falls through to this
# clause, where the raw bits are inspected instead.
def unpack_float(<<bits::little-unsigned-integer-size(32)>>)
    when band(bits, 0x7F800000) == 0x7F800000 and band(bits, 0x007FFFFF) != 0,
    do: :nan
@doc """
Encode a float as an 8-byte little-endian binary. The atom `:nan` is encoded
as the fixed byte sequence `<<0, 0, 0, 0, 0, 0, 248, 127>>`.
"""
def pack_double(f) when is_float(f), do: <<f::float-little-size(64)>>

# A quick test in C produced this byte sequence for a double NaN.
def pack_double(:nan), do: <<0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF8, 0x7F>>
@doc """
Decode an 8-byte little-endian binary into a float.

Returns `:nan` for every NaN encoding (all exponent bits set and a non-zero
mantissa), not just the single pattern `<<0, 0, 0, 0, 0, 0, 248, 127>>`.
Infinity encodings match no clause and raise `FunctionClauseError`.
"""
@spec unpack_double(<<_::64>>) :: float() | :nan
def unpack_double(<<f::little-signed-float-size(64)>>), do: f

# A float segment only matches finite values, so NaN falls through to this
# clause, where the raw bits are inspected instead.
def unpack_double(<<bits::little-unsigned-integer-size(64)>>)
    when band(bits, 0x7FF0000000000000) == 0x7FF0000000000000 and
           band(bits, 0x000FFFFFFFFFFFFF) != 0,
    do: :nan
end