# lib/mavlink_util/cache_manager.ex

defmodule MAVLink.Util.CacheManager do
  @moduledoc """

  Populate and keep updated a set of protected ETS tables representing:
  
  - the visible MAV systems
  - the most recently received messages for each MAV and message type
  - the most recently received set of parameters for each MAV

  Using ETS tables allows clients to perform read-only API operations directly
  on the tables, preventing this GenServer from becoming a bottleneck.
  """
  
  use GenServer
  require Logger
  alias MAVLink.Router, as: MAV
  alias MAVLink.Util.ParamRequest
  alias APM.Message.Heartbeat
  alias APM.Message.ParamValue
  import MAVLink.Util.FocusManager, only: [focus: 0]
  
  
  # Names of the ETS tables created in init/1.
  # @messages rows are keyed by {system, component, message type} and hold
  # {receive timestamp, message}.
  @messages :messages
  # @systems rows are keyed by {system_id, component_id} and hold a map
  # describing the system.
  @systems :systems
  # @params rows are keyed by {system_id, component_id, param_id} and hold
  # {receive timestamp, ParamValue message}.
  @params :params
  # Message atoms matched by the periodic handle_info/2 clauses.
  @one_second_loop :one_second_loop
  @five_second_loop :five_second_loop
  @ten_second_loop :ten_second_loop
  
  
  # Server state struct; it currently has no fields.
  defstruct []
  
  
  # API
  
  @doc """
  Starts the cache manager process, registered under this module's name.

  `state` is passed through as the `init/1` argument. `opts` are additional
  `GenServer.start_link/3` options; they are placed after the `:name` entry.
  """
  def start_link(state, opts \\ []) do
    server_opts = [name: __MODULE__] ++ opts
    GenServer.start_link(__MODULE__, state, server_opts)
  end
  
  
  @doc """
  Lists the keys of every record in the systems table.

  Returns `{:ok, scids}`, where each element is the key a system record was
  stored under. Keys are prepended to the result as the table is folded.
  """
  def mavs() do
    collect_key = fn {scid, _system}, keys -> [scid | keys] end
    scids = :ets.foldl(collect_key, [], @systems)
    Logger.info("Listing #{length(scids)} visible vehicles")
    {:ok, scids}
  end
  
  
  @doc """
  Returns the cached messages of the vehicle reported by `focus/0`.

  When `focus/0` returns anything other than `{:ok, scid}`, that value is
  returned unchanged.
  """
  def msg() do
    case focus() do
      {:ok, scid} -> msg(scid)
      no_focus -> no_focus
    end
  end
  
  @doc """
  Returns cached messages, either for an explicit vehicle or by message name.

  Given a `{system_id, component_id, _}` tuple, returns `{:ok, map}` where the
  map has one entry per cached message type for that system/component, each
  value being `{age, message}` with `age` computed as `now() - received`.

  Given any other argument, it is treated as a message name and looked up for
  the vehicle reported by `focus/0`; a `focus/0` result other than
  `{:ok, scid}` is returned unchanged.
  """
  def msg({system_id, component_id, _}) do
    collect = fn
      {{^system_id, ^component_id, msg_type}, {received, msg}}, by_type ->
        Map.put(by_type, msg_type, {now() - received, msg})

      # Rows belonging to other systems/components are skipped
      _other_row, by_type ->
        by_type
    end

    {:ok, :ets.foldl(collect, %{}, @messages)}
  end

  def msg(name) do
    case focus() do
      {:ok, scid} -> msg(scid, name)
      no_focus -> no_focus
    end
  end
  
  @doc """
  Returns the most recently cached message of type `msg_type` for a vehicle.

  Returns `{:ok, age, message}`, with `age` computed as `now() - received`,
  or `{:error, :no_such_message}` when the lookup does not yield exactly one
  cached entry of the expected shape.
  """
  def msg({system_id, component_id, _}, msg_type) when is_atom(msg_type) do
    case :ets.lookup(@messages, {system_id, component_id, msg_type}) do
      [{_key, {received, message}}] ->
        Logger.info("Most recent \"#{dequalify_msg_type(msg_type)}\" message")
        {:ok, now() - received, message}

      _missing ->
        Logger.warn("Error attempting to retrieve message of type \"#{dequalify_msg_type(msg_type)}\"")
        {:error, :no_such_message}
    end
  end
  
  
  @doc """
  Returns all cached parameters of the vehicle reported by `focus/0`.

  When `focus/0` returns anything other than `{:ok, scid}`, that value is
  returned unchanged.
  """
  def params() do
    case focus() do
      {:ok, scid} -> params(scid)
      no_focus -> no_focus
    end
  end
  
  @doc """
  Returns cached parameters for an explicit vehicle or for a name filter.

  Given a three-element tuple, lists every cached parameter of that vehicle
  (equivalent to `params(scid, "")`).

  Given a binary, lists the parameters matching it for the vehicle reported
  by `focus/0`; a `focus/0` result other than `{:ok, scid}` is returned
  unchanged.
  """
  def params(scid = {_, _, _}), do: params(scid, "")

  def params(match) when is_binary(match) do
    case focus() do
      {:ok, scid} -> params(scid, match)
      no_focus -> no_focus
    end
  end
  
  @doc """
  Returns the cached parameters of a vehicle whose id contains `match`.

  `match` is upcased before being compared against each cached `param_id`
  with `String.contains?/2`. The result is `{:ok, map}`, where keys are the
  downcased parameter ids converted to atoms and values are the corresponding
  `param_value` fields.
  """
  def params({system_id, component_id, _mavlink_version}, match) when is_binary(match) do
    match_upcase = String.upcase(match)

    param_map =
      :ets.foldl(
        fn
          {{^system_id, ^component_id, param_id}, {_, %ParamValue{param_value: param_value}}}, acc ->
            if String.contains?(param_id, match_upcase) do
              # NOTE(review): String.to_atom/1 on vehicle-supplied parameter
              # names creates atoms that are never garbage collected. Kept
              # because callers receive atom keys; consider string keys.
              Map.put(acc, param_id |> String.downcase() |> String.to_atom(), param_value)
            else
              acc
            end

          # Rows for other systems/components (or of another shape) are skipped
          _, acc ->
            acc
        end,
        %{},
        @params
      )

    # The fold always yields a map, so there is no failure branch to handle here
    Logger.info("Listing #{map_size(param_map)} parameters matching \"#{match}\"")
    {:ok, param_map}
  end
  
  
  # Creates the three named, protected ETS tables, subscribes to MAVLink
  # frames and builds the initial state by passing an empty struct through
  # the three loop functions.
  @impl true
  def init(_opts) do
    shared_table_opts = [:named_table, :protected, {:read_concurrency, true}]

    :ets.new(@messages, shared_table_opts ++ [:set])
    :ets.new(@systems, shared_table_opts ++ [:ordered_set])
    :ets.new(@params, shared_table_opts ++ [:ordered_set])

    # Tables exist before frames can start arriving
    MAV.subscribe(as_frame: true)

    initial_state =
      %__MODULE__{}
      |> one_second_loop()
      |> five_second_loop()
      |> ten_second_loop()

    {:ok, initial_state}
  end

  
  # Every synchronous call is answered with :ok and leaves the state untouched.
  @impl true
  def handle_call(_msg, _caller, state), do: {:reply, :ok, state}

  
  # Casts are accepted and ignored; the state is returned unchanged.
  @impl true
  def handle_cast(_msg, state), do: {:noreply, state}
  
  
  # Handles incoming MAVLink frames and the periodic loop messages.
  #
  # Frame clause: replaces the cached message for
  # {source system, source component, message type} with the new one, then
  # hands the previous cache row (or nil when there was none) together with
  # the new message to handle_mav_message/6.
  #
  # Loop clauses: each one re-arms its own timer with its own message and
  # interval before running the matching loop function.
  @impl true
  def handle_info(
        %MAVLink.Frame{
          message: message = %{__struct__: message_type},
          source_system: source_system,
          source_component: source_component,
          version: source_version
        },
        state
      ) do
    message_key = {source_system, source_component, message_type}

    # Read the previous row before overwriting it; the table is a :set, so
    # the lookup yields at most one row and List.first/1 gives nil for none.
    previous_message = @messages |> :ets.lookup(message_key) |> List.first()

    :ets.insert(@messages, {message_key, {now(), message}})

    # Delegate any message-specific behaviour to handle_mav_message()
    {:noreply,
     handle_mav_message(source_system, source_component, previous_message, message, source_version, state)}
  end

  def handle_info(@one_second_loop, state) do
    Process.send_after(self(), @one_second_loop, 1_000)
    {:noreply, one_second_loop(state)}
  end

  def handle_info(@five_second_loop, state) do
    # Re-arm with the five-second message (not the one-second one) so this
    # loop keeps its own cadence.
    Process.send_after(self(), @five_second_loop, 5_000)
    {:noreply, five_second_loop(state)}
  end

  def handle_info(@ten_second_loop, state) do
    # Re-arm with the ten-second message (not the one-second one) so this
    # loop keeps its own cadence.
    Process.send_after(self(), @ten_second_loop, 10_000)
    {:noreply, ten_second_loop(state)}
  end
  
  
  # Message-specific handling, called for every received frame with the
  # previously cached row of the same message type (nil when there was none).
  # Always returns the state.

  # Heartbeat with no previously cached heartbeat: create the system record
  # and start a parameter list request in a linked process.
  defp handle_mav_message(source_system_id, source_component_id, nil,
         %Heartbeat{type: type, mavlink_version: mavlink_minor_version},
         mavlink_major_version, state) do
    # TODO System struct
    system_record = %{
      mavlink_major_version: mavlink_major_version,
      mavlink_minor_version: mavlink_minor_version,
      param_count: 0,
      param_count_loaded: 0
    }

    :ets.insert(@systems, {{source_system_id, source_component_id}, system_record})
    Logger.info("First sighting of vehicle #{source_system_id}.#{source_component_id}: #{APM.describe(type)}")
    spawn_link(ParamRequest, :param_request_list, [source_system_id, source_component_id, mavlink_major_version])
    state
  end

  # ParamValue: cache the parameter and update the owning system's counters.
  # Nothing is stored when no system record exists for the sender.
  defp handle_mav_message(source_system_id, source_component_id, _,
         param_value_msg = %ParamValue{param_id: param_id, param_count: param_count}, _, state) do
    system_key = {source_system_id, source_component_id}
    param_key = {source_system_id, source_component_id, param_id}

    case :ets.lookup(@systems, system_key) do
      [{_, system = %{param_count_loaded: param_count_loaded}}] ->
        # Check for an existing row before inserting, so a repeated parameter
        # is not counted twice
        first_sighting? = :ets.lookup(@params, param_key) == []
        :ets.insert(@params, {param_key, {now(), param_value_msg}})

        loaded_so_far =
          if first_sighting?, do: param_count_loaded + 1, else: param_count_loaded

        # TODO Hidden parameters can become un-hidden, increasing param_count, in which case we need to spawn param_request_list again.
        :ets.insert(
          @systems,
          {system_key, %{system | param_count: param_count, param_count_loaded: loaded_so_far}}
        )

      _no_system_record ->
        :ok
    end

    state
  end

  # Every other message needs no extra handling.
  defp handle_mav_message(_, _, _, _, _, state), do: state
  
  
  # One-second loop body; currently returns the state unchanged.
  defp one_second_loop(state), do: state
  
  
  # Five-second loop body; currently returns the state unchanged.
  defp five_second_loop(state), do: state
  
  
  # Ten-second loop body; currently returns the state unchanged.
  defp ten_second_loop(state), do: state
  
  
  # Current monotonic time in milliseconds. Only differences between two
  # readings are meaningful (used for message ages), not the absolute value.
  defp now(), do: System.monotonic_time(:millisecond)

  
  # Returns the last dot-separated segment of msg_type's string form,
  # e.g. a module atom ending in ".Heartbeat" becomes "Heartbeat".
  defp dequalify_msg_type(msg_type) do
    msg_type
    |> to_string()
    |> String.split(".")
    |> List.last()
  end
  
end