lib/input_event.ex

defmodule InputEvent do
  @moduledoc """
  Elixir interface to Linux input event devices
  """
  use GenServer

  alias InputEvent.Info
  alias InputEvent.Report

  require Logger

  @typedoc "An unknown event type"
  @type type_number() :: 0..0xFFFF
  @typedoc "The type of event"
  @type type_name() ::
          :ev_syn
          | :ev_key
          | :ev_rel
          | :ev_abs
          | :ev_msc
          | :ev_sw
          | :ev_led
          | :ev_snd
          | :ev_rep
          | :ev_ff
          | :ev_pwr
          | :ev_ff_status

  @typedoc """
  Event type

  Usually these are translated to an atom that corresponds with the Linux event type.
  """
  @type type() :: type_name() | type_number()

  @type code_number() :: 0..0xFFFF

  @typedoc """
  Event code

  Usually these are translated to an atom that corresponds with the Linux event code.
  Event codes depend on the event type.
  """
  @type code() :: atom() | code_number()

  @typedoc """
  Event value

  See the event type and code for how to interpret the value. For example, it could be a
  0 or 1 signifying a key press or release, or it could be an x or y coordinate or delta.
  """
  @type value() :: integer()

  @typedoc """
  Event structure
  """
  @type event() :: {type(), code(), value()}

  @typedoc """
  Message that is sent to the caller when input event received
  """
  @type events_message() :: {:input_event, String.t(), [event()]}

  @input_event_report 1
  @input_event_version 2
  @input_event_name 3
  @input_event_id 4
  @input_event_report_info 5
  @input_event_ready 6

  @typedoc """
  Options for the InputEvent Genserver
  """
  @type options() :: [
          path: String.t(),
          grab: boolean(),
          receiver: pid() | atom(),
          repeat_delay: pos_integer(),
          repeat_period: pos_integer()
        ]

  @doc """
  Start a GenServer that reports events from the specified input event device

  Options:
  * `:path` - the path to the input event device (e.g., `"/dev/input/event0"`)
  * `:grab` - set to true to prevent events from being passed to other applications (defaults to `false`)
  * `:repeat_delay` - delay in milliseconds before a key press repeats
  * `:repeat_period` - period in milliseconds in which a key press will repeat
  * `:receiver` - the pid or name of the process that receives events (defaults to the process that calls `start_link/1`

  Note that passing the device path rather than a keyword list to
  `start_link/1` is deprecated.

  When adjusting the key repeat rate, you must set BOTH `:repeat_delay` and
  `:repeat_period` for `input_event` to make the change. Be careful setting
  repeat timing in multiple places on the same device path! You might override
  your own settings!
  """
  @spec start_link(String.t() | options()) :: GenServer.on_start()
  def start_link(path) when is_binary(path) do
    start_link(path: path)
  end

  def start_link(options) when is_list(options) do
    options[:path] || raise ArgumentError, "InputEvent requires a input event device path"
    updated_options = Keyword.put_new(options, :receiver, self())
    GenServer.start_link(__MODULE__, updated_options)
  end

  @doc """
  Return information about this input event device
  """
  @spec info(GenServer.server()) :: Info.t()
  def info(server) do
    GenServer.call(server, :info)
  end

  @doc """
  Stop the InputEvent GenServer.
  """
  @spec stop(GenServer.server()) :: :ok
  def stop(server) do
    GenServer.stop(server)
  end

  @doc """
  Scan the system for input devices and return information on each one.
  """
  @spec enumerate() :: [{String.t(), Info.t()}]
  defdelegate enumerate(), to: InputEvent.Enumerate

  @impl GenServer
  def init(init_args) do
    executable = :code.priv_dir(:input_event) ++ ~c"/input_event"

    path = Keyword.fetch!(init_args, :path)
    grab = Keyword.get(init_args, :grab, false)
    receiver = Keyword.fetch!(init_args, :receiver)

    repeat_delay = Keyword.get(init_args, :repeat_delay)
    repeat_period = Keyword.get(init_args, :repeat_period)

    repeat_args =
      if is_integer(repeat_delay) and is_integer(repeat_period) do
        [to_string(repeat_delay), to_string(repeat_period)]
      else
        []
      end

    port =
      Port.open({:spawn_executable, executable}, [
        {:args, [path, grab] ++ repeat_args},
        {:packet, 2},
        :use_stdio,
        :binary,
        :exit_status
      ])

    state = %{
      port: port,
      path: path,
      info: %Info{},
      callback: receiver,
      ready: false,
      deferred: []
    }

    {:ok, state}
  end

  @impl GenServer
  def handle_call(:info, _from, %{ready: true} = state) do
    {:reply, state.info, state}
  end

  def handle_call(:info, from, state) do
    {:noreply, %{state | deferred: [from | state.deferred]}}
  end

  @impl GenServer
  def handle_info({_port, {:data, data}}, state) do
    new_state = process_notification(state, data)
    {:noreply, new_state}
  end

  def handle_info({_port, {:exit_status, _rc}}, state) do
    send(state.callback, {:input_event, state.path, :disconnect})
    {:stop, :port_crashed, state}
  end

  def handle_info(other, state) do
    Logger.warning("InputEvent: ignoring #{inspect(other)}")
    {:noreply, state}
  end

  defp process_notification(state, <<@input_event_report, _sub, raw_events::binary>>) do
    callback = state.callback

    if callback do
      raw_events
      |> Report.decode()
      |> Enum.each(fn events ->
        send(callback, {:input_event, state.path, events})
      end)
    end

    state
  end

  defp process_notification(state, <<@input_event_version, _sub, version::binary>>) do
    new_info = %{state.info | input_event_version: version}
    %{state | info: new_info}
  end

  defp process_notification(state, <<@input_event_name, _sub, name::binary>>) do
    new_info = %{state.info | name: name}
    %{state | info: new_info}
  end

  defp process_notification(
         state,
         <<@input_event_id, _sub, bus::native-16, vendor::native-16, product::native-16,
           version::native-16>>
       ) do
    new_info = %{state.info | bus: bus, vendor: vendor, product: product, version: version}
    %{state | info: new_info}
  end

  defp process_notification(state, <<@input_event_report_info, type, raw_report_info::binary>>) do
    old_report_info = state.info.report_info
    report_info = Info.decode_report_info(type, raw_report_info)
    new_info = %{state.info | report_info: [report_info | old_report_info]}
    %{state | info: new_info}
  end

  defp process_notification(state, <<@input_event_ready, _sub>>) do
    Enum.each(state.deferred, fn client -> GenServer.reply(client, state.info) end)
    %{state | ready: true, deferred: []}
  end
end