lib/mavlink/router.ex

defmodule XMAVLink.Router do
  @moduledoc """
  Connect to serial, udp and tcp ports and listen for, validate and
  forward MAVLink messages towards their destinations on other connections
  and/or Elixir processes subscribing to messages.

  The rules for MAVLink packet forwarding are described here:

    https://mavlink.io/en/guide/routing.html

  and here:

    http://ardupilot.org/dev/docs/mavlink-routing-in-ardupilot.html
  """

  use GenServer
  require Logger

  import XMAVLink.Utils, only: [resolve_address: 1, parse_positive_integer: 1]
  import Enum, only: [map: 2]

  alias XMAVLink.Types
  alias XMAVLink.Frame
  alias XMAVLink.Message
  alias XMAVLink.Router
  alias XMAVLink.LocalConnection
  alias XMAVLink.SerialConnection
  alias XMAVLink.TCPOutConnection
  alias XMAVLink.UDPInConnection
  alias XMAVLink.UDPOutConnection
  alias Circuits.UART

  @typedoc """
  Represents the state of the XMAVLink.Router. Initial values should be set in config.exs.

  ## Fields
  - dialect:            The XMAVLink dialect module generated by mix mavlink from a MAVLink definition file
  - connection_strings: Configuration strings describing the network and serial connections MAVLink messages
                        are to be received and sent over. Allowed formats are:

                        ```udpin:<local ip>:<local port>
                        udpout:<remote ip>:<remote port>
                        tcpout:<remote ip>:<remote port>
                        serial:<device>:<baud rate>```

                        Note there is no tcpin connection - tcp is rarely used for MAVLink, the exception
                        being SITL testing which requires a tcpout connection.

  - connections:        A map containing the state of the connections described by connection_strings along
                        with the local connection, which is automatically created and responsible for allowing
                        Elixir processes to receive and send MAVLink messages. The map is keyed by the port,
                        device or :local to allow the corresponding connection state to be easily retrieved
                        when we receive a message. See MAVLink.*Connection for map values. Note LocalConnection
                        contains the system/component id, subscriber list and next sequence number.
  - routes:             A map from a {system id, component id} tuple to the connection key a message from that
                        system/component was last received on. Used to forward messages to that system/component.
  """
  defstruct [
    # Generated dialect module
    dialect: nil,
    # Connection descriptions from user
    connection_strings: [],
    # %{socket|port|local: XMAVLink.*_Connection}
    connections: %{},
    # Connection and MAVLink version tuple keyed by MAVLink addresses
    routes: %{}
  ]

  # Can't used qualified type as map key
  @type mavlink_address :: Types.mavlink_address()
  @type mavlink_connection :: Types.connection()
  @type t :: %Router{
          dialect: module | nil,
          connection_strings: [String.t()],
          connections: %{},
          routes: %{mavlink_address: {mavlink_connection, Types.version()}}
        }

  ##############
  # Router API #
  ##############

  @doc """
  Start the MAVLink Router service. You should not call this directly, use the MAVLink application
  to start this under a supervision tree with the services it expects to use.

  ## Parameters
  - dialect:            Name of the module generated by mix mavlink task to represent the enumerations
                        and messages of a particular MAVLink dialect
  - system:             The System id of this system  1..255, typically low for vehicles and high for
                        ground stations
  - component:          The component id of this system 1..255, typically 1 for the autopilot
  - connection_strings: A list of strings in the following formats:

                        udpin:<local ip>:<local port>
                        udpout:<remote ip>:<remote port>
                        tcpout:<remote ip>:<remote port>
                        serial:<device>:<baud rate>

  - opts:               Standard GenServer options
  """
  @spec start_link(
          %{system: 1..255, component: 1..255, dialect: module, connection_strings: [String.t()]},
          [{atom, any}]
        ) :: {:ok, pid}
  def start_link(args, opts \\ []) do
    GenServer.start_link(
      __MODULE__,
      args,
      [{:name, __MODULE__} | opts]
    )
  end

  @doc """
  Subscribes the calling process to MAVLink messages from the installed dialect matching the query.

  ## Parameters

  - query:  Keyword list of zero or more of the following query keywords:

    message:          message_module | :unknown (use latter with as_frame)
    source_system:    integer 0..255
    source_component: integer 0..255
    target_system:    integer 0..255
    target_component: integer 0..255
    as_frame:         true|false (default false, shows entire message frame with sender/target details)

  ## Example

  ```
    XMAVLink.Router.subscribe message: XMAVLink.Message.Heartbeat, source_system: 1
  ```
  """
  @type subscribe_query_id_key ::
          :source_system | :source_component | :target_system | :target_component
  @spec subscribe([
          {:message, Message.t()} | {subscribe_query_id_key, 0..255} | {:as_frame, boolean}
        ]) :: :ok
  def subscribe(query \\ []) do
    with message <- Keyword.get(query, :message),
         true <- message == nil or Code.ensure_loaded?(message) do
      GenServer.cast(
        __MODULE__,
        {
          :subscribe,
          [
            message: nil,
            source_system: 0,
            source_component: 0,
            target_system: 0,
            target_component: 0,
            as_frame: false
          ]
          |> Keyword.merge(query)
          |> Enum.into(%{}),
          self()
        }
      )
    else
      false ->
        {:error, :invalid_message}
    end
  end

  @doc """
  Un-subscribes calling process from all existing subscriptions

  ## Example

  ```
    XMAVLink.Router.unsubscribe
  ```
  """
  @spec unsubscribe() :: :ok
  def unsubscribe(), do: GenServer.cast(__MODULE__, {:unsubscribe, self()})

  @doc """
  Send a MAVLink message to one or more recipients using available
  connections. For now if destination is unreachable it will log
  a warning

  ## Parameters

  - message: A MAVLink message structure from the installed dialect
  - version: Force sending using a specific MAVLink protocol (default 2)

  ## Example

  ```
    XMAVLink.Router.pack_and_send(
      %Common.RcChannelsOverride{
        target_system: 1,
        target_component: 1,
        chan1_raw: 1500,
        chan2_raw: 1500,
        chan3_raw: 1500,
        chan4_raw: 1500,
        chan5_raw: 1500,
        chan6_raw: 1500,
        chan7_raw: 1500,
        chan8_raw: 1500,
        chan9_raw: 0,
        chan10_raw: 0,
        chan11_raw: 0,
        chan12_raw: 0,
        chan13_raw: 0,
        chan14_raw: 0,
        chan15_raw: 0,
        chan16_raw: 0,
        chan17_raw: 0,
        chan18_raw: 0
      }
    )
  ```
  """
  def pack_and_send(message, version \\ 2) do
    # We can only pack payload at this point because we need router state to get source
    # system/component and sequence number for frame
    try do
      {:ok, message_id, {:ok, crc_extra, _, target}, payload} = Message.pack(message, version)

      {target_system, target_component} =
        if target != :broadcast do
          {message.target_system, Map.get(message, :target_component, 0)}
        else
          {0, 0}
        end

      # Although Router is a GenServer we still use send for symmetry because this arrives as
      # a vanilla message, just like messages from a udp/tcp/serial port, and is dealt with in
      # a similar way using handle_info()
      send(
        __MODULE__,
        {
          :local,
          struct(Frame,
            version: version,
            message_id: message_id,
            target_system: target_system,
            target_component: target_component,
            target: target,
            message: message,
            payload: payload,
            crc_extra: crc_extra
          )
        }
      )

      :ok
    rescue
      # Need to catch Protocol.UndefinedError - happens with SimState (Common) and Simstate (APM)
      # messages because non-case-sensitive filesystems (including OSX thanks @noobz) can't tell
      # the difference between generated module beam files. Work around is comment out one of the
      # message definitions and regenerate the dialect module.
      Protocol.UndefinedError ->
        {:error, :protocol_undefined}
    end
  end

  #######################
  # GenServer Callbacks #
  #######################

  @impl true
  # No dialect, no play.
  def init(%{dialect: nil}) do
    {:error, :no_mavlink_dialect_set}
  end

  # Start connections (they will send :add_connection messages
  # to us if successful) and initialise Router state
  def init(args) do
    LocalConnection.connect(:local, args.system, args.component)
    _ = map(args.connection_strings, &connect/1)
    {:ok, %Router{dialect: args.dialect, connection_strings: args.connection_strings}}
  end

  @impl true
  # Call to subscribe() API
  def handle_cast({:subscribe, query, pid}, state) do
    {:noreply,
     update_in(
       state,
       [Access.key!(:connections), :local],
       &LocalConnection.subscribe(query, pid, &1)
     )}
  end

  # Call to unsubscribe() API
  def handle_cast({:unsubscribe, pid}, state) do
    {:noreply,
     update_in(state, [Access.key!(:connections), :local], &LocalConnection.unsubscribe(pid, &1))}
  end

  @impl true
  # Receive data on UDP connection
  def handle_info(
        message = {:udp, socket, address, port, _},
        state = %Router{connections: connections, dialect: dialect}
      ) do
    {
      :noreply,
      case connections[{socket, address, port}] do
        connection = %UDPInConnection{} ->
          UDPInConnection.handle_info(message, connection, dialect)

        connection = %UDPOutConnection{} ->
          UDPOutConnection.handle_info(message, connection, dialect)

        nil ->
          # New previously unseen UDPIn client
          UDPInConnection.handle_info(message, nil, dialect)
      end
      |> update_route_info(state)
      |> route
    }
  end

  # Receive data on TCP connection
  def handle_info(message = {:tcp, socket, _}, state) do
    {
      :noreply,
      TCPOutConnection.handle_info(message, state.connections[socket], state.dialect)
      |> update_route_info(state)
      |> route
    }
  end

  # Unlike UDP, TCP connections can close
  def handle_info({:tcp_closed, socket}, state) do
    %TCPOutConnection{address: address, port: port} = state.connections[socket]
    spawn(TCPOutConnection, :connect, [["tcpout", address, port], self()])
    {:noreply, remove_connection(socket, state)}
  end

  # Received data on serial connection
  def handle_info(message = {:circuits_uart, port, raw}, state) when is_binary(raw) do
    {
      :noreply,
      SerialConnection.handle_info(message, state.connections[port], state.dialect)
      |> update_route_info(state)
      |> route
    }
  end

  # Received error on serial connection
  def handle_info({:circuits_uart, port, {:error, _reason}}, state) do
    %SerialConnection{baud: baud, uart: uart} = state.connections[port]

    spawn(SerialConnection, :connect, [
      ["serial", port, baud, :poolboy.checkout(XMAVLink.UARTPool)],
      self()
    ])

    :ok = UART.close(uart)
    # After checkout to make sure we get a fresh UART, this one might be reused later
    :poolboy.checkin(XMAVLink.UARTPool, uart)
    {:noreply, remove_connection(port, state)}
  end

  # A local subscribing Elixir process has crashed, remove them from our subscriber list
  def handle_info({:DOWN, _, :process, pid, _}, state),
    do:
      {:noreply,
       update_in(
         state,
         [Access.key!(:connections), :local],
         &LocalConnection.subscriber_down(pid, &1)
       )}

  # A call to pack_and_send() from a local Elixir process, sent as a vanilla message for symmetry with other connection types
  def handle_info({:local, frame}, state) do
    {
      :noreply,
      LocalConnection.handle_info({:local, frame}, state.connections.local, state.dialect)
      |> update_route_info(state)
      |> route
    }
  end

  # A spawned *Connection.connect() call has successfully got a
  # connection, and wants to add it to our connection list.
  def handle_info(
        {:add_connection, connection_key, connection},
        state = %Router{connections: connections}
      ) do
    {
      :noreply,
      struct(
        state,
        connections: Map.put(connections, connection_key, connection)
      )
    }
  end

  # Ignore weird messages
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  ####################
  # Helper Functions #
  ####################

  # Handle user configured connections by spawning a process to try to connect. If successful they will send
  # us an :add_connection message with the details. The local connection gets added automatically.
  defp connect(connection_string) when is_binary(connection_string),
    do: connect(String.split(connection_string, [":", ","]))

  defp connect(tokens = ["udpin" | _]),
    do: spawn(UDPInConnection, :connect, [validate_address_and_port(tokens), self()])

  defp connect(tokens = ["udpout" | _]),
    do: spawn(UDPOutConnection, :connect, [validate_address_and_port(tokens), self()])

  defp connect(tokens = ["tcpout" | _]),
    do: spawn(TCPOutConnection, :connect, [validate_address_and_port(tokens), self()])

  defp connect(tokens = ["serial" | _]),
    do: spawn(SerialConnection, :connect, [validate_port_and_baud(tokens), self()])

  defp connect([invalid_protocol | _]),
    do: raise(ArgumentError, message: "invalid protocol #{invalid_protocol}")

  # Parse network connection strings
  defp validate_address_and_port([protocol, address, port]) do
    case {resolve_address(address), parse_positive_integer(port)} do
      {{:error, reason}, _} ->
        raise ArgumentError,
          message: "invalid address #{address}: #{inspect(reason)}"

      {_, :error} ->
        raise ArgumentError, message: "invalid port #{port}"

      {{:ok, parsed_address}, parsed_port} ->
        [protocol, parsed_address, parsed_port]
    end
  end

  # Parse serial port connection string
  defp validate_port_and_baud(["serial", port, baud]) do
    case {is_binary(port), parse_positive_integer(baud)} do
      {false, _} ->
        raise ArgumentError, message: "Invalid port #{port}"

      {_, :error} ->
        raise ArgumentError, message: "invalid baud rate #{baud}"

      {true, parsed_baud} ->
        # Have to checkout from uart pool in main process because
        # poolboy monitors the process that calls checkout, and
        # returns the UART to the pool when the caller dies. This
        # happens immediately to our spawned connect() calls so we
        # can't do this there, which would otherwise be a logical
        # place to do it.
        ["serial", port, parsed_baud, :poolboy.checkout(XMAVLink.UARTPool)]
    end
  end

  # A handle_info() received an error and wants us to forget the borked connection
  defp remove_connection(connection_key, state = %Router{connections: connections}) do
    struct(state, connections: Map.delete(connections, connection_key))
  end

  # Map system/component ids to connections on which they have been seen for targeted messages
  # Keep a list of all connections we have received messages from for broadcast messages
  defp update_route_info(
         {:ok, source_connection_key, source_connection,
          frame = %Frame{
            source_system: source_system,
            source_component: source_component
          }},
         state = %Router{routes: routes, connections: connections}
       ) do
    {
      :ok,
      source_connection_key,
      frame,
      struct(
        state,
        # Don't add system/components from local connection to routes because local
        # automatically matches everything in matching_system_components() and we
        # don't want to receive messages twice
        routes:
          case source_connection_key do
            :local ->
              routes

            _ ->
              Map.put(
                routes,
                {source_system, source_component},
                source_connection_key
              )
          end,
        connections:
          Map.put(
            connections,
            source_connection_key,
            source_connection
          )
      )
    }
  end

  # Connection state still needs to be updated if there is an error
  defp update_route_info(
         {:error, reason, connection_key, connection},
         state = %Router{connections: connections}
       ) do
    {
      :error,
      reason,
      struct(
        state,
        connections:
          Map.put(
            connections,
            connection_key,
            connection
          )
      )
    }
  end

  # Broadcast un-targeted messages to all connections except the
  # source we received the message from, unless it was local
  defp route(
         {:ok, source_connection_key, frame = %Frame{target: :broadcast},
          state = %Router{connections: connections}}
       ) do
    for {connection_key, connection} <- connections do
      if match?(:local, connection_key) or !match?(^connection_key, source_connection_key) do
        forward(connection, frame)
      end
    end

    state
  end

  # Only send targeted messages to observed system/components and local
  # Log warning if a message sent locally cannot reach its remote destination
  defp route(
         {:ok, _,
          frame = %Frame{
            source_system: source_system,
            source_component: source_component,
            target_system: target_system,
            target_component: target_component,
            message: %{__struct__: message_type}
          }, state = %Router{connections: connections}}
       ) do
    recipients = matching_system_components(target_system, target_component, state)

    if match?(
         {^recipients, ^source_system, ^source_component},
         {[:local], connections.local.system, connections.local.component}
       ) do
      :ok =
        Logger.debug(
          "Could not send message #{Atom.to_string(message_type)} to #{target_system}/#{target_component}: destination unreachable"
        )
    end

    for connection_key <- recipients do
      forward(connections[connection_key], frame)
    end

    state
  end

  # Swallow any errors from the handle_info |> update_connection_info pipeline
  defp route({:error, _reason, state = %Router{}}), do: state

  # Known system/components matching target with 0 wildcard
  # Always include local connection because we get to snoop
  # on everybody's messages
  defp matching_system_components(q_system, q_component, %Router{routes: routes}) do
    [
      :local
      | Enum.filter(
          routes,
          fn {{sid, cid}, _} ->
            (q_system == 0 or q_system == sid) and
              (q_component == 0 or q_component == cid)
          end
        )
        |> Enum.map(fn {_, ck} -> ck end)
    ]
  end

  # Delegate sending a message to connection-type specific code
  defp forward(connection = %UDPInConnection{}, frame),
    do: UDPInConnection.forward(connection, frame)

  defp forward(connection = %UDPOutConnection{}, frame),
    do: UDPOutConnection.forward(connection, frame)

  defp forward(connection = %TCPOutConnection{}, frame),
    do: TCPOutConnection.forward(connection, frame)

  defp forward(connection = %SerialConnection{}, frame),
    do: SerialConnection.forward(connection, frame)

  defp forward(connection = %LocalConnection{}, frame),
    do: LocalConnection.forward(connection, frame)
end