# lib/scenic/pubsub.ex

#
#  Created by Boyd Multerer on August 20, 2018.
#  Changed to Scenic.PubSub on 2021-06-08
#  Copyright © 2018-2021 Kry10 Limited. All rights reserved.
#
# Centralized channel data pub-sub with cache
# Was originally Scenic.Sensor.

defmodule Scenic.PubSub do
  use GenServer

  @moduledoc """
  `Scenic.PubSub` is a combination pub/sub server and data cache.

  It is intended to be the interface between sensors (or other data sources) and Scenic scenes.

  ## Why Scenic.PubSub

  Sensors (or other data sources) and scenes often need to communicate, but tend to operate on different timelines.

  Some sensors update fairly slowly or don't behave well when asked to get data at random times by multiple clients.
  `Scenic.PubSub` is backed by a `GenServer` that collects data from a data source in a well-behaved manner,
  yet is able to serve that data on demand or by subscription to many clients.


  ## Global Scope

  It is important to note that `Scenic.PubSub` is global in scope. In other words, anything published
  into `Scenic.PubSub` is visible to all `ViewPorts` and `Scenes`.

  ## Registering Data Sources

  Before a process can start publishing data from a source, it must register a source id with `Scenic.PubSub`.
  This source id should be an atom. This prevents other processes from stepping on that data and alerts any
  subscribing processes that the data is coming online.

        Scenic.PubSub.register( source_id )

  The `source_id` parameter must be an atom that names the sensor. Subscribers will use this id to request
  data or subscriptions to the source.

  You can also unregister data sources if they are no longer available.

        Scenic.PubSub.unregister( source_id )

  Exiting the data source process also cleans up its registration.


  ## Publishing Data

  When a sensor process publishes data, two things happen. First, that data is cached in an `:ets` table so
  that future requests for that data from scenes happen quickly and don't need to bother the data
  source process. Second, any processes that have subscribed to that source are sent a message containing the new data.

        Scenic.PubSub.publish( source_id, value )

  The `source_id` parameter must be the atom that was previously registered.

  The `value` parameter can be anything that makes sense for the data source.
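
  As a rough sketch of how registration and publishing fit together, a data source process might
  look something like the module below. `FakeThermometer`, the `:temperature` source id, the
  one-second interval, and `next_reading/1` are hypothetical names chosen for this example and
  are not part of Scenic.

  ```elixir
  defmodule FakeThermometer do
    # Hypothetical data source. The module name, source id, and interval
    # are assumptions made for illustration only.
    use GenServer

    @source_id :temperature
    @interval_ms 1_000

    def start_link(opts) do
      GenServer.start_link(__MODULE__, :ok, opts)
    end

    @impl true
    def init(:ok) do
      # Register from inside the process that will publish the data.
      {:ok, _id} =
        Scenic.PubSub.register(@source_id,
          version: "0.1.0",
          description: "Simulated temperature in degrees Celsius"
        )

      Process.send_after(self(), :measure, @interval_ms)
      {:ok, 20.0}
    end

    @impl true
    def handle_info(:measure, temperature) do
      temperature = next_reading(temperature)

      # Caches the value and notifies any subscribers.
      :ok = Scenic.PubSub.publish(@source_id, temperature)

      Process.send_after(self(), :measure, @interval_ms)
      {:noreply, temperature}
    end

    # Stand-in for reading real hardware: drift by at most half a degree.
    def next_reading(previous), do: previous + (:rand.uniform() - 0.5)
  end
  ```

  Because `publish/2` checks the caller against the registered pid, both calls are made from
  inside the `GenServer` process itself.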


  ## Subscribing to a Data Source

  Scenes (or any other process) can subscribe to a data source. They will receive messages when the source updates its data, comes online, or goes away.

        Scenic.PubSub.subscribe( source_id )

  The `source_id` parameter is the atom registered for the data source. Note that the named source does NOT
  need to be registered when a listening process subscribes to it. When the source process eventually registers and
  starts publishing data, the listening subscribers will be notified.


  The subscribing process will then start receiving messages that can be handled with `handle_info/2`:

  event | message sent to subscribers
  --- | ---
  data published | `{{Scenic.PubSub, :data}, {source_id, value, timestamp}}` 
  source registered | `{{Scenic.PubSub, :registered}, {source_id, opts}}` 
  source unregistered | `{{Scenic.PubSub, :unregistered}, source_id}` 
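
  The messages in the table above could be handled along these lines. This is only a sketch:
  `TemperatureWatcher`, the `:temperature` source id, and the shape of the state map are
  assumptions made for the example, not something Scenic defines.

  ```elixir
  defmodule TemperatureWatcher do
    # Hypothetical subscriber. The module name, source id, and state shape
    # are assumptions made for illustration only.
    use GenServer

    def start_link(opts) do
      GenServer.start_link(__MODULE__, :ok, opts)
    end

    @impl true
    def init(:ok) do
      # Subscribing works even if the source has not registered yet.
      :ok = Scenic.PubSub.subscribe(:temperature)
      {:ok, %{online: false, latest: nil}}
    end

    @impl true
    def handle_info({{Scenic.PubSub, :data}, {:temperature, value, _timestamp}}, state) do
      {:noreply, %{state | latest: value}}
    end

    def handle_info({{Scenic.PubSub, :registered}, {:temperature, _opts}}, state) do
      {:noreply, %{state | online: true}}
    end

    def handle_info({{Scenic.PubSub, :unregistered}, :temperature}, state) do
      {:noreply, %{state | online: false, latest: nil}}
    end

    # Ignore anything else so unrelated messages do not crash the process.
    def handle_info(_msg, state), do: {:noreply, state}
  end
  ```

  Matching on the literal source id in each clause keeps one process free to subscribe to
  several sources and handle each in its own clause.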



  Scenes can also unsubscribe if they are no longer interested in updates.

        Scenic.PubSub.unsubscribe( source_id )

  ## Other functions

  Any process can get data from a source on demand, whether or not it is a subscriber.

        Scenic.PubSub.get( source_id )
        >> data

  Any process can list the currently registered data sources.

        Scenic.PubSub.list()
        >> [{source_id, opts, pid}]
  """

  # ets table names
  @table __MODULE__
  @name __MODULE__

  @data {__MODULE__, :data}
  @registered {__MODULE__, :registered}
  @unregistered {__MODULE__, :unregistered}

  defmodule Error do
    @moduledoc false
    defexception message: nil, source_id: nil
  end

  # ============================================================================
  # client api

  # --------------------------------------------------------
  @doc """
  Retrieve the cached data value for a named data source.

  This data is pulled from an `:ets` table and does not put load on the data source itself.

  ## Parameters
  * `source_id` an atom that is registered to a data source.

  ## Return Value

        data

  If the data source is either not registered, or has not yet published any data, get returns

        nil
  """

  @spec get(source_id :: atom) :: any | nil
  def get(source_id) when is_atom(source_id) do
    case :ets.lookup(@table, source_id) do
      [{_key, data, _timestamp}] -> data
      _ -> nil
    end
  end

  # --------------------------------------------------------
  @doc """
  Retrieve the cached data value for a named data source.

  Raises a `Scenic.PubSub.Error` if the data source is not registered or has not yet
  published any data.

  This data is pulled from an `:ets` table and does not put load on the data source itself.

  ## Parameters
  * `source_id` an atom that is registered to a data source.

  ## Return Value

        data
  """

  @spec get!(source_id :: atom) :: any
  def get!(source_id) when is_atom(source_id) do
    case fetch(source_id) do
      {:ok, data} -> data
      _ -> raise Error, message: "#{inspect(source_id)} is not registered", source_id: source_id
    end
  end

  # --------------------------------------------------------
  @doc """
  Retrieve the cached data for a named data source.

  This data is pulled from an `:ets` table and does not put load on the data source itself.

  ## Parameters
  * `source_id` an atom that is registered to a data source.

  ## Return Value

        {:ok, data}

  * `data` is whatever data the data source last published.

  If the data source is either not registered, or has not yet published any data, `fetch/1` returns

        {:error, :not_found}
  """

  @spec fetch(source_id :: atom) :: {:ok, any} | {:error, :not_found}
  def fetch(source_id) when is_atom(source_id) do
    case :ets.lookup(@table, source_id) do
      [{_key, data, _timestamp}] ->
        {:ok, data}

      # no data
      _ ->
        {:error, :not_found}
    end
  end

  # --------------------------------------------------------
  @doc """
  Retrieve the full cached data for a named data source.

  This data is pulled from an `:ets` table and does not put load on the data source itself.

  ## Parameters
  * `source_id` an atom that is registered to a data source.

  ## Return Value

        {:ok, {source_id, data, timestamp}}

  * `source_id` is the atom representing the data source.
  * `data` is whatever data the data source last published.
  * `timestamp` is the time - from `:os.system_time(:micro_seconds)` - the last data was published.

  If the data source is either not registered, or has not yet published any data, `query/1` returns

        {:error, :not_found}
  """

  @spec query(source_id :: atom) :: {:ok, any} | {:error, :not_found}
  def query(source_id) when is_atom(source_id) do
    case :ets.lookup(@table, source_id) do
      [data] ->
        {:ok, data}

      # no data
      _ ->
        {:error, :not_found}
    end
  end

  # --------------------------------------------------------
  @doc """
  List the registered data sources.

  ## Return Value

  `list/0` returns a list of registered data sources:

        [{source_id, opts, pid}]

  * `source_id` is the atom representing the data source.
  * `opts` is a keyword list of metadata about the data source.
    * `:version` is the version string supplied by the data source during registration.
    * `:description` is the description string supplied by the data source during registration.
    * `:registered_at` The system time the data source was registered at.
  * `pid` is the pid of the data source's process.
  """
  @spec list() :: [{atom, Keyword.t(), pid}]
  def list() do
    :ets.match(@table, {{:registration, :"$1"}, :"$2", :"$3"})
    |> Enum.map(fn [key, opts, pid] -> {key, opts, pid} end)
  end

  # --------------------------------------------------------
  @doc """
  Publish a data point from a data source.

  When a data source uses `publish/2` to publish data, that data is recorded in the
  cache and the following message is sent to each subscriber:

        {{Scenic.PubSub, :data}, {source_id, my_value, timestamp}}

  The timestamp is the current time in microseconds as returned
  from `:os.system_time(:micro_seconds)`.

  ## Parameters
  * `source_id` an atom that is registered to a data source.
  * `data` the data to publish.

  ## Return Value

  On success, returns `:ok`

  It returns `{:error, :not_registered}` if the caller is not the
  registered process for the data source.
  """
  @spec publish(source_id :: atom, data :: any) :: :ok | {:error, :not_registered}
  def publish(source_id, data) when is_atom(source_id) do
    timestamp = :os.system_time(:micro_seconds)
    pid = self()

    # enforce that this is coming from the registered data source pid
    case :ets.lookup(@table, {:registration, source_id}) do
      [{_, _, ^pid}] ->
        send(@name, {:put_data, source_id, data, timestamp})
        :ok

      # not registered, or registered to a different pid
      _ ->
        {:error, :not_registered}
    end
  end

  # --------------------------------------------------------
  @doc """
  Subscribe the calling process to receive events about a data source.

  The messages the subscriber will start receiving about a data source are:

  event | message sent to subscribers
  --- | ---
  data published | `{{Scenic.PubSub, :data}, {source_id, value, timestamp}}` 
  source registered | `{{Scenic.PubSub, :registered}, {source_id, opts}}` 
  source unregistered | `{{Scenic.PubSub, :unregistered}, source_id}` 

  ## Parameters
  * `source_id` an atom that is registered to a data source.

  ## Return Value

  On success, returns `:ok`
  """
  @spec subscribe(source_id :: atom) :: :ok
  def subscribe(source_id) when is_atom(source_id) do
    GenServer.call(@name, {:subscribe, source_id, self()})
  end

  # --------------------------------------------------------
  @doc """
  Unsubscribe the calling process from receiving events about a data source.

  The caller will stop receiving events about the data source.

  ## Parameters
  * `source_id` an atom that is registered to a data source.

  ## Return Value

  Returns `:ok`
  """
  @spec unsubscribe(source_id :: atom) :: :ok
  def unsubscribe(source_id) when is_atom(source_id) do
    send(@name, {:unsubscribe, source_id, self()})
    :ok
  end

  # --------------------------------------------------------
  @register_opts_schema [
    version: [type: :string, doc: "Data format version"],
    description: [type: :string, doc: "Your appropriate description"]
  ]
  @doc """
  Register the calling process as a data source for the named id.

  ## Parameters
  * `source_id` the data source being registered.
  * `opts` optional information about the data source.

  Supported options:\n#{NimbleOptions.docs(@register_opts_schema)}

  ## Return Value

  On success, returns `{:ok, source_id}`

  If `source_id` is already registered to another process, it returns

      {:error, :already_registered}
  """
  @spec register(source_id :: atom, opts :: Keyword.t()) ::
          {:ok, atom} | {:error, :already_registered}
  def register(source_id, opts \\ []) when is_atom(source_id) do
    opts = Enum.into(opts, [])

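    # keep the validated options and raise with a readable message on failure
    opts =
      case NimbleOptions.validate(opts, @register_opts_schema) do
        {:ok, opts} ->
          opts

        {:error, error} ->
          raise Error, message: Exception.message(error), source_id: source_id
      end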

    opts = Keyword.put(opts, :registered_at, :os.system_time(:micro_seconds))

    GenServer.call(
      @name,
      {:register, source_id, opts, self()}
    )
  end

  # --------------------------------------------------------
  @doc """
  Unregister the calling process as the data source for the named id.

  ## Parameters
  * `source_id` the data source being unregistered.

  ## Return Value

  Returns `:ok`
  """
  @spec unregister(source_id :: atom) :: :ok
  def unregister(source_id) when is_atom(source_id) do
    send(@name, {:unregister, source_id, self()})
    :ok
  end

  # ============================================================================
  # internal api

  # --------------------------------------------------------
  @doc false
  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: @name)
  end

  # --------------------------------------------------------
  @doc false
  def init(:ok) do
    # set up the initial state
    state = %{
      data_table_id: :ets.new(@table, [:named_table]),
      subs_id: %{},
      subs_pid: %{}
    }

    # trap exits so we don't just crash when a subscriber goes away
    Process.flag(:trap_exit, true)

    {:ok, state}
  end

  # ============================================================================

  # --------------------------------------------------------
  # a data source (or whatever) is putting data
  @doc false
  # the client api enforced the pid check
  # yes, you could get around that by sending this message directly
  # not best-practice, but is an escape valve.
  # timestamp should be from :os.system_time(:micro_seconds)
  def handle_info({:put_data, source_id, data, timestamp}, state) do
    :ets.insert(@table, {source_id, data, timestamp})
    send_subs(source_id, @data, {source_id, data, timestamp}, state)
    {:noreply, state}
  end

  # --------------------------------------------------------
  @doc false
  def handle_info({:unsubscribe, source_id, pid}, state) do
    {:noreply, unsubscribe(pid, source_id, state)}
  end

  # ============================================================================
  # handle linked processes going down

  # --------------------------------------------------------
  def handle_info({:EXIT, pid, _reason}, state) do
    # unsubscribe everything this pid was listening to
    state = do_unsubscribe(pid, :all, state)

    # if this pid was registered as a data source, unregister it
    # registration entries are stored as {key, opts, pid} (see do_register/4)
    :ets.match(@table, {{:registration, :"$1"}, :_, pid})
    |> Enum.each(fn [id] -> do_unregister(id, pid, state) end)

    {:noreply, state}
  end

  # --------------------------------------------------------
  @doc false
  def handle_info({:unregister, source_id, pid}, state) do
    do_unregister(source_id, pid, state)
    {:noreply, state}
  end

  # ============================================================================
  # CALLs - mostly for positive confirmation of sign-up style things

  # --------------------------------------------------------
  @doc false
  def handle_call({:subscribe, source_id, pid}, _from, state) do
    {reply, state} = do_subscribe(pid, source_id, state)

    # send the already-set value if one is set
    case query(source_id) do
      {:ok, data} -> send(pid, {@data, data})
      _ -> :ok
    end

    {:reply, reply, state}
  end

  # --------------------------------------------------------
  @doc false
  # handle data source registration
  def handle_call({:register, source_id, opts, pid}, _from, state) do
    key = {:registration, source_id}

    {reply, state} =
      case :ets.lookup(@table, key) do
        # registered to pid - ok to change
        [{_, _, ^pid}] ->
          do_register(pid, source_id, opts, state)

        # previously crashed
        [{_, _, nil}] ->
          do_register(pid, source_id, opts, state)

        # registered to other. fail
        [_] ->
          {{:error, :already_registered}, state}

        [] ->
          do_register(pid, source_id, opts, state)
      end

    {:reply, reply, state}
  end

  # ============================================================================
  # handle data source registrations

  # --------------------------------------------------------
  defp do_register(pid, source_id, opts, state) do
    key = {:registration, source_id}
    :ets.insert(@table, {key, opts, pid})
    # link the data source
    Process.link(pid)
    # alert the subscribers
    send_subs(source_id, @registered, {source_id, opts}, state)
    # reply is sent back to the data source
    {{:ok, source_id}, state}
  end

  # --------------------------------------------------------
  defp do_unregister(source_id, pid, state) do
    reg_key = {:registration, source_id}

    # first, get the registration and confirm this pid is registered
    case :ets.lookup(@table, reg_key) do
      [{_, _, ^pid}] ->
        # alert the subscribers
        send_subs(source_id, @unregistered, source_id, state)

        # delete the table entries
        :ets.delete(@table, reg_key)
        :ets.delete(@table, source_id)

        unlink_pid(pid, state)
        :ok

      # not registered. do nothing
      _ ->
        :ok
    end
  end

  # ============================================================================
  # handle client subscriptions

  # --------------------------------------------------------
  @spec do_subscribe(pid :: GenServer.server(), source_id :: atom, state :: map) :: any
  defp do_subscribe(pid, source_id, %{subs_id: subs_id, subs_pid: subs_pid} = state) do
    # record the subscription
    subs_id =
      Map.put(
        subs_id,
        source_id,
        [pid | Map.get(subs_id, source_id, [])] |> Enum.uniq()
      )

    subs_pid =
      Map.put(
        subs_pid,
        pid,
        [source_id | Map.get(subs_pid, pid, [])] |> Enum.uniq()
      )

    # make sure the subscriber is linked
    Process.link(pid)

    {:ok, %{state | subs_id: subs_id, subs_pid: subs_pid}}
  end

  # --------------------------------------------------------
  @spec do_unsubscribe(pid :: GenServer.server(), source_id :: atom, state :: map) :: any
  defp do_unsubscribe(pid, :all, %{subs_pid: subs_pid} = state) do
    Map.get(subs_pid, pid, [])
    |> Enum.reduce(state, &unsubscribe(pid, &1, &2))
  end

  # --------------------------------------------------------
  defp unsubscribe(pid, source_id, %{subs_id: subs_id, subs_pid: subs_pid} = state) do
    # clean up the subs for a given source_id
    subs_by_id =
      Map.get(subs_id, source_id, [])
      |> Enum.reject(fn sub_pid -> sub_pid == pid end)

    subs_id = Map.put(subs_id, source_id, subs_by_id)

    # clean up the subs for the given pid
    subs_by_pid =
      Map.get(subs_pid, pid, [])
      |> Enum.reject(fn sub_id -> sub_id == source_id end)

    subs_pid = Map.put(subs_pid, pid, subs_by_pid)

    state = %{state | subs_id: subs_id, subs_pid: subs_pid}

    # if pid no longer subscribed to anything, then some further cleanup
    state =
      case subs_by_pid do
        [] ->
          {_, state} = pop_in(state, [:subs_pid, pid])
          state

        _ ->
          state
      end

    # if channel has no subscribers, then some further cleanup
    state =
      case subs_by_id do
        [] ->
          {_, state} = pop_in(state, [:subs_id, source_id])
          state

        _ ->
          state
      end

    # does the right thing. only unlinks if no longer subscribing
    # to anything and is not a channel
    unlink_pid(pid, state)

    state
  end

  # --------------------------------------------------------
  defp send_subs(source_id, verb, msg, %{subs_id: subs}) do
    msg = {verb, msg}

    subs
    |> Map.get(source_id, [])
    |> Enum.each(&send(&1, msg))
  end

  # --------------------------------------------------------
  # only unlink a pid if it is not a registered data source AND it
  # has no subscriptions. the return value is not meaningful
  defp unlink_pid(pid, %{subs_pid: subs_pid}) do
    no_subs =
      case subs_pid[pid] do
        nil -> true
        [] -> true
        _ -> false
      end

    not_sensor =
      case :ets.match(@table, {{:registration, :"$1"}, :_, pid}) do
        [] -> true
        _ -> false
      end

    if no_subs && not_sensor do
      Process.unlink(pid)
    end
  end
end