lib/opc_ua/client.ex

defmodule OpcUA.Client do
  use OpcUA.Common

  @config_keys ["requestedSessionTimeout", "secureChannelLifeTime", "timeout"]

  alias OpcUA.NodeId

  @moduledoc """

  OPC UA Client API module.

  This module provides functions for configuration, read/write nodes attributes and discovery of a OPC UA Client.

  `OpcUA.Client` is implemented as a `__using__` macro so that you can put it in any module,
  you can initialize your Client manually (see `test/client_tests`) or by overwriting
  `configuration/1` and `monitored_items/1` to autoset the configuration and subscription items. It also helps you to
  handle Client's "subscription" events (monitorItems) by overwriting `handle_subscription/2` callback.

  The following example shows a module that takes its configuration from the environment (see `test/client_tests/terraform_test.exs`):

  ```elixir
  defmodule MyClient do
    use OpcUA.Client

    # Use the `init` function to configure your Client.
    def init({parent_pid, 103} = _user_init_state, opc_ua_client_pid) do
      %{parent_pid: parent_pid, opc_ua_client_pid: opc_ua_client_pid}
    end

    def configuration(_user_init_state), do: Application.get_env(:my_client, :configuration, [])
    def monitored_items(_user_init_state), do: Application.get_env(:my_client, :monitored_items, [])

    def handle_subscription_timeout(subscription_id, state) do
      send(state.parent_pid, {:subscription_timeout, subscription_id})
      state
    end

    def handle_deleted_subscription(subscription_id, state) do
      send(state.parent_pid, {:subscription_delete, subscription_id})
      state
    end

    def handle_monitored_data(changed_data_event, state) do
      send(state.parent_pid, {:value_changed, changed_data_event})
      state
    end

    def handle_deleted_monitored_item(subscription_id, monitored_id, state) do
      send(state.parent_pid, {:item_deleted, {subscription_id, monitored_id}})
      state
    end
  end
  ```

  Because it is small a GenServer, it accepts the same [options](https://hexdocs.pm/elixir/GenServer.html#module-how-to-supervise) for supervision
  to configure the child spec and passes them along to `GenServer`:

  ```elixir
  defmodule MyModule do
    use OpcUA.Client, restart: :transient, shutdown: 10_000
  end
  ```
  """

  @type conn_params ::
          {:hostname, binary()}
          | {:port, non_neg_integer()}
          | {:users, keyword()}

  @type config_options ::
          {:config, map()}
          | {:conn, conn_params}

  @doc """
  Optional callback that gets the Server configuration and discovery connection parameters.
  """
  @callback configuration(term()) :: config_options

  # TODO:
  @type monitored_items_options ::
          {:subscription, float()}
          | {:monitored_item, %OpcUA.MonitoredItem{}}

  @callback monitored_items(term()) :: monitored_items_options

  @doc """
  Optional callback that handles node values updates from a Client to a Server.

  It's first argument is a tuple, in which its first element is the `subscription_id`
  of the subscription that the monitored item belongs to. the second element
  is the 'monitored_item_id' which is an unique number asigned to a monitored item when
  its created and the third element of the tuple is the new value of the monitored item.

  the second argument it's the GenServer state (Parent process).
  """
  @callback handle_monitored_data({integer(), integer(), any()}, term()) :: term()

  @doc """
  Optional callback that handles a deleted monitored items events.

  It's first argument is the `subscription_id` of the subscription that the monitored
  item belongs to. The second element is the 'monitored_item_id' which is an unique
  number asigned to a monitored item when its created.

  The third argument it's the GenServer state (Parent process).
  """
  @callback handle_deleted_monitored_item(integer(), integer(), term()) :: term()

  @doc """
  Optional callback that handles a subscriptions timeout events.

  It's first argument is the `subscription_id` of the subscription.

  The second argument it's the GenServer state (Parent process).
  """
  @callback handle_subscription_timeout(integer(), term()) :: term()

  @doc """
  Optional callback that handles a subscriptions timeout events.

  It's first argument is the `subscription_id` of the subscription.

  The second argument it's the GenServer state (Parent process).
  """
  @callback handle_deleted_subscription(integer(), term()) :: term()

  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      use GenServer, Keyword.drop(opts, [:configuration])
      @behaviour OpcUA.Client
      @mix_env Mix.env()

      alias __MODULE__

      def start_link(user_initial_params \\ []) do
        GenServer.start_link(__MODULE__, user_initial_params, unquote(opts))
      end

      @impl true
      def init(user_initial_params) do
        send(self(), :init)
        {:ok, user_initial_params}
      end

      @impl true
      def handle_info(:init, user_initial_params) do
        # Client Terraform
        {:ok, c_pid} = OpcUA.Client.start_link()

        configuration = apply(__MODULE__, :configuration, [user_initial_params])
        monitored_items = apply(__MODULE__, :monitored_items, [user_initial_params])

        #OpcUA.Client.set_config(c_pid)

        # configutation = [config: list(), connection: list()]
        set_client_config(c_pid, configuration, :config)
        set_client_config(c_pid, configuration, :conn)

        # monitored_tiems = [subscription: 100.3, monitored_item: %MonitoredItem{}, ...]
        set_client_monitored_items(c_pid, monitored_items)

        # User initialization.
        user_state = apply(__MODULE__, :init, [user_initial_params, c_pid])

        {:noreply, user_state}
      end

      def handle_info({:timeout, subscription_id}, state) do
        state = apply(__MODULE__, :handle_subscription_timeout, [subscription_id, state])
        {:noreply, state}
      end

      def handle_info({:delete, subscription_id}, state) do
        state = apply(__MODULE__, :handle_deleted_subscription, [subscription_id, state])
        {:noreply, state}
      end

      def handle_info({:data, subscription_id, monitored_id, value}, state) do
        state =
          apply(__MODULE__, :handle_monitored_data, [
            {subscription_id, monitored_id, value},
            state
          ])

        {:noreply, state}
      end

      def handle_info({:delete, subscription_id, monitored_id}, state) do
        state =
          apply(__MODULE__, :handle_deleted_monitored_item, [subscription_id, monitored_id, state])

        {:noreply, state}
      end

      @impl true
      def handle_subscription_timeout(subscription_id, state) do
        require Logger

        Logger.warn(
          "No handle_subscription_timeout/2 clause in #{__MODULE__} provided for #{
            inspect(subscription_id)
          }"
        )

        state
      end

      @impl true
      def handle_deleted_subscription(subscription_id, state) do
        require Logger

        Logger.warn(
          "No handle_deleted_subscription/2 clause in #{__MODULE__} provided for #{
            inspect(subscription_id)
          }"
        )

        state
      end

      @impl true
      def handle_monitored_data(changed_data_event, state) do
        require Logger

        Logger.warn(
          "No handle_monitored_data/2 clause in #{__MODULE__} provided for #{
            inspect(changed_data_event)
          }"
        )

        state
      end

      @impl true
      def handle_deleted_monitored_item(subscription_id, monitored_id, state) do
        require Logger

        Logger.warn(
          "No handle_deleted_monitored_item/3 clause in #{__MODULE__} provided for #{
            inspect({subscription_id, monitored_id})
          }"
        )

        state
      end

      @impl true
      def configuration(_user_init_state), do: []

      @impl true
      def monitored_items(_user_init_state), do: []

      defp set_client_config(c_pid, configuration, type) do
        config_params = Keyword.get(configuration, type, [])

        Enum.each(config_params, fn config_param ->
          if(@mix_env != :test) do
            GenServer.call(c_pid, {type, config_param})
          else
            # Valgrind
            GenServer.call(c_pid, {type, config_param}, :infinity)
          end
        end)
      end

      defp set_client_monitored_items(c_pid, monitored_items) do
        Enum.each(monitored_items, fn {item_type, monitored_item} ->
          item_args = get_monitored_item_args(monitored_item)
          GenServer.call(c_pid, {:subscription, {item_type, item_args}})
        end)
      end

      defp get_monitored_item_args(monitored_item) when is_float(monitored_item),
        do: monitored_item

      defp get_monitored_item_args(monitored_item) when is_struct(monitored_item),
        do: monitored_item[:args]

      defoverridable start_link: 0,
                     start_link: 1,
                     configuration: 1,
                     monitored_items: 1,
                     handle_subscription_timeout: 2,
                     handle_deleted_subscription: 2,
                     handle_monitored_data: 2,
                     handle_deleted_monitored_item: 3
    end
  end

  # Configuration & Lifecycle functions

  @doc """
    Starts up a OPC UA Client GenServer.
  """
  @spec start_link(term(), list()) :: {:ok, pid} | {:error, term} | {:error, :einval}
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, {args, self()}, opts)
  end

  @doc """
    Stops a OPC UA Client GenServer.
  """
  @spec stop(GenServer.server()) :: :ok
  def stop(pid) do
    GenServer.stop(pid)
  end

  @doc """
    Gets the state of the OPC UA Client.
  """
  @spec get_state(GenServer.server()) :: {:ok, binary()} | {:error, term} | {:error, :einval}
  def get_state(pid) do
    GenServer.call(pid, {:config, {:get_state, nil}})
  end

  @doc """
    Sets the OPC UA Client configuration.
  """
  @spec set_config(GenServer.server(), map()) :: :ok | {:error, term} | {:error, :einval}
  def set_config(pid, args \\ %{}) when is_map(args) do
    GenServer.call(pid, {:config, {:set_config, args}})
  end

  @doc """
    Sets the OPC UA Client configuration with all security policies for the given certificates.
    The following must be filled:
      * `:private_key` -> binary() or function().
      * `:certificate` -> binary() or function().
      * `:security_mode` -> interger().
    NOTE: [none: 1, sign: 2, sign_and_encrypt: 3]
  """
  @spec set_config_with_certs(GenServer.server(), list()) :: :ok | {:error, term} | {:error, :einval}
  def set_config_with_certs(pid, args) when is_list(args) do
    if(@mix_env != :test) do
      GenServer.call(pid, {:config, {:set_config_with_certs, args}})
    else
      # Valgrind
      GenServer.call(pid, {:config, {:set_config_with_certs, args}}, :infinity)
    end
  end

  @doc """
    Gets the OPC UA Client current Configuration.
  """
  @spec get_config(GenServer.server()) :: {:ok, map()} | {:error, term} | {:error, :einval}
  def get_config(pid) do
    GenServer.call(pid, {:config, {:get_config, nil}})
  end

  @doc """
    Resets the OPC UA Client.
  """
  @spec reset(GenServer.server()) :: :ok | {:error, term} | {:error, :einval}
  def reset(pid) do
    GenServer.call(pid, {:config, {:reset_client, nil}})
  end

  # Connection functions

  @doc """
    Connects the OPC UA Client by a url.
    The following must be filled:
    * `:url` -> binary().
  """
  @spec connect_by_url(GenServer.server(), list()) :: :ok | {:error, term} | {:error, :einval}
  def connect_by_url(pid, args) when is_list(args) do
    if(@mix_env != :test) do
      GenServer.call(pid, {:conn, {:by_url, args}})
    else
      # Valgrind
      GenServer.call(pid, {:conn, {:by_url, args}}, :infinity)
    end
  end

  @doc """
    Connects the OPC UA Client by a url using a username and a password.
    The following must be filled:
    * `:url` -> binary().
    * `:user` -> binary().
    * `:password` -> binary().
  """
  @spec connect_by_username(GenServer.server(), list()) ::
          :ok | {:error, term} | {:error, :einval}
  def connect_by_username(pid, args) when is_list(args) do
    if(@mix_env != :test) do
      GenServer.call(pid, {:conn, {:by_username, args}})
    else
      # Valgrind
      GenServer.call(pid, {:conn, {:by_username, args}}, :infinity)
    end
  end

  @doc """
    Connects the OPC UA Client by a url without a session.
    The following must be filled:
    * `:url` -> binary().
  """
  @spec connect_no_session(GenServer.server(), list()) :: :ok | {:error, term} | {:error, :einval}
  def connect_no_session(pid, args) when is_list(args) do
    GenServer.call(pid, {:conn, {:no_session, args}})
  end

  @doc """
    Disconnects the OPC UA Client.
  """
  @spec disconnect(GenServer.server()) :: :ok | {:error, term} | {:error, :einval}
  def disconnect(pid) do
    GenServer.call(pid, {:conn, {:disconnect, nil}})
  end

  # Discovery functions

  @doc """
    Finds Servers Connected to a Discovery Server.
    The following must be filled:
    * `:url` -> binary().
  """
  @spec find_servers_on_network(GenServer.server(), binary()) ::
          :ok | {:error, term} | {:error, :einval}
  def find_servers_on_network(pid, url) when is_binary(url) do
    GenServer.call(pid, {:discovery, {:find_servers_on_network, url}})
  end

  @doc """
    Finds Servers Connected to a Discovery Server.
    The following must be filled:
    * `:url` -> binary().
  """
  @spec find_servers(GenServer.server(), binary()) :: :ok | {:error, term} | {:error, :einval}
  def find_servers(pid, url) when is_binary(url) do
    GenServer.call(pid, {:discovery, {:find_servers, url}})
  end

  @doc """
    Get endpoints from a OPC UA Server.
    The following must be filled:
    * `:url` -> binary().
  """
  @spec get_endpoints(GenServer.server(), binary()) :: :ok | {:error, term} | {:error, :einval}
  def get_endpoints(pid, url) when is_binary(url) do
    GenServer.call(pid, {:discovery, {:get_endpoints, url}})
  end

  # Subscriptions and Monitored Items functions.

  @doc """
    Sends an OPC UA Server request to start subscription (to monitored items, events, etc).
  """
  @spec add_subscription(GenServer.server()) ::
          {:ok, integer()} | {:error, term} | {:error, :einval}
  def add_subscription(pid, publishing_interval \\ 500.0) when is_float(publishing_interval) do
    GenServer.call(pid, {:subscription, {:subscription, publishing_interval}})
  end

  @doc """
    Sends an OPC UA Server request to delete a subscription.
  """
  @spec delete_subscription(GenServer.server(), integer()) ::
          :ok | {:error, term} | {:error, :einval}
  def delete_subscription(pid, subscription_id) when is_integer(subscription_id) do
    GenServer.call(pid, {:subscription, {:delete, subscription_id}})
  end

  @doc """
    Adds a monitored item used to request a server for notifications of each change of value in a specific node.
    The following option must be filled:
    * `:subscription_id` -> integer().
    * `:monitored_item` -> %NodeId{}.
  """
  @spec add_monitored_item(GenServer.server(), list()) ::
          {:ok, integer()} | {:error, term} | {:error, :einval}
  def add_monitored_item(pid, args) when is_list(args) do
    GenServer.call(pid, {:subscription, {:monitored_item, args}})
  end

  @doc """
    Adds a monitored item used to request a server for notifications of each change of value in a specific node.
    The following option must be filled:
    * `:subscription_id` -> integer().
    * `:monitored_item_id` -> integer().
  """
  @spec delete_monitored_item(GenServer.server(), list()) ::
          :ok | {:error, term} | {:error, :einval}
  def delete_monitored_item(pid, args) when is_list(args) do
    GenServer.call(pid, {:subscription, {:delete_monitored_item, args}})
  end

  # Read nodes Attributes

  @doc """
    Reads 'user_write_mask' attribute of a node in the server.
  """
  @spec read_node_user_write_mask(GenServer.server(), %NodeId{}) ::
          :ok | {:error, binary()} | {:error, :einval}
  def read_node_user_write_mask(pid, %NodeId{} = node_id) do
    GenServer.call(pid, {:read, {:user_write_mask, node_id}})
  end

  @doc """
    Reads 'user_access_level' attribute of a node in the server.
  """
  @spec read_node_user_access_level(GenServer.server(), %NodeId{}) ::
          :ok | {:error, binary()} | {:error, :einval}
  def read_node_user_access_level(pid, %NodeId{} = node_id) do
    GenServer.call(pid, {:read, {:user_access_level, node_id}})
  end

  @doc """
    Reads 'user_executable' attribute of a node in the server.
  """
  @spec read_node_user_executable(GenServer.server(), %NodeId{}) ::
          :ok | {:error, binary()} | {:error, :einval}
  def read_node_user_executable(pid, %NodeId{} = node_id) do
    GenServer.call(pid, {:read, {:user_executable, node_id}})
  end

  # Write nodes Attributes

  @doc """
    Change 'node_id' attribute of a node in the server.
  """
  @spec write_node_node_id(GenServer.server(), %NodeId{}, %NodeId{}) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_node_id(pid, %NodeId{} = node_id, %NodeId{} = new_node_id) do
    GenServer.call(pid, {:write, {:node_id, node_id, new_node_id}})
  end

  @doc """
    Change 'symmetric' attribute of a node in the server.
  """
  @spec write_node_symmetric(GenServer.server(), %NodeId{}, boolean()) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_symmetric(pid, %NodeId{} = node_id, symmetric) when is_boolean(symmetric) do
    GenServer.call(pid, {:write, {:symmetric, node_id, symmetric}})
  end

  @doc """
    Change 'node_class' attribute of a node in the server.
    Avalable value are:
      UNSPECIFIED = 0,
      OBJECT = 1,
      VARIABLE = 2,
      METHOD = 4,
      OBJECTTYPE = 8,
      VARIABLETYPE = 16,
      REFERENCETYPE = 32,
      DATATYPE = 64,
      VIEW = 128,
  """
  @spec write_node_node_class(GenServer.server(), %NodeId{}, integer()) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_node_class(pid, %NodeId{} = node_id, node_class)
      when node_class in [0, 1, 2, 4, 8, 16, 32, 64, 128] do
    GenServer.call(pid, {:write, {:node_class, node_id, node_class}})
  end

  @doc """
    Change 'user_write_mask' attribute of a node in the server.
  """
  @spec write_node_user_write_mask(GenServer.server(), %NodeId{}, integer()) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_user_write_mask(pid, %NodeId{} = node_id, user_write_mask)
      when is_integer(user_write_mask) do
    GenServer.call(pid, {:write, {:user_write_mask, node_id, user_write_mask}})
  end

  @doc """
    Change 'contains_no_loops' attribute of a node in the server.
  """
  @spec write_node_contains_no_loops(GenServer.server(), %NodeId{}, boolean()) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_contains_no_loops(pid, %NodeId{} = node_id, contains_no_loops)
      when is_boolean(contains_no_loops) do
    GenServer.call(pid, {:write, {:contains_no_loops, node_id, contains_no_loops}})
  end

  @doc """
    Change 'user_access_level' attribute of a node in the server.
  """
  @spec write_node_user_access_level(GenServer.server(), %NodeId{}, integer()) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_user_access_level(pid, %NodeId{} = node_id, user_access_level)
      when is_integer(user_access_level) do
    GenServer.call(pid, {:write, {:user_access_level, node_id, user_access_level}})
  end

  @doc """
    Change 'user_executable' attribute of a node in the server.
  """
  @spec write_node_user_executable(GenServer.server(), %NodeId{}, boolean()) ::
          :ok | {:error, binary()} | {:error, :einval}
  def write_node_user_executable(pid, %NodeId{} = node_id, user_executable)
      when is_boolean(user_executable) do
    GenServer.call(pid, {:write, {:user_executable, node_id, user_executable}})
  end

  @doc false
  def command(pid, request) do
    GenServer.call(pid, request)
  end

  # Handlers
  def init({_args, controlling_process}) do
    lib_dir =
      :opex62541
      |> :code.priv_dir()
      |> to_string()
      |> set_ld_library_path()

    executable = lib_dir <> "/opc_ua_client"

    port = open_port(executable, use_valgrind?())

    state = %State{port: port, controlling_process: controlling_process}
    {:ok, state}
  end

  # Lifecycle Handlers

  def handle_call({:config, {:get_state, nil}}, caller_info, state) do
    call_port(state, :get_client_state, caller_info, nil)
    {:noreply, state}
  end

  def handle_call({:config, {:set_config, args}}, caller_info, state) do
    c_args =
      Enum.reduce(args, %{}, fn {key, value}, acc ->
        if is_nil(value) or key not in @config_keys do
          acc
        else
          Map.put(acc, key, value)
        end
      end)

    call_port(state, :set_client_config, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:config, {:get_config, nil}}, caller_info, state) do
    call_port(state, :get_client_config, caller_info, nil)
    {:noreply, state}
  end

  def handle_call({:config, {:reset_client, nil}}, caller_info, state) do
    call_port(state, :reset_client, caller_info, nil)
    {:noreply, state}
  end

  # Encryption

  def handle_call({:config, {:set_config_with_certs, args}}, caller_info, state) do
    with  cert <- Keyword.fetch!(args, :certificate),
          pkey <- Keyword.fetch!(args, :private_key),
          security_mode <- Keyword.get(args, :security_mode, 1),
          certificate <- get_binary_data(cert),
          private_key <- get_binary_data(pkey),
          true <- is_binary(certificate),
          true <- is_binary(private_key),
          true <- security_mode in [1, 2, 3] do
      c_args = {security_mode, certificate, private_key}
      call_port(state, :set_config_with_security_policies, caller_info, c_args)
      {:noreply, state}
    else
      _ ->
        {:reply, {:error, :einval} ,state}
    end
  end

  # Connect to a Server Handlers

  def handle_call({:conn, {:by_url, args}}, caller_info, state) do
    url = Keyword.fetch!(args, :url)
    call_port(state, :connect_client_by_url, caller_info, url)
    {:noreply, state}
  end

  def handle_call({:conn, {:by_username, args}}, caller_info, state) do
    url = Keyword.fetch!(args, :url)
    username = Keyword.fetch!(args, :user)
    password = Keyword.fetch!(args, :password)

    c_args = {url, username, password}
    call_port(state, :connect_client_by_username, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:conn, {:no_session, args}}, caller_info, state) do
    url = Keyword.fetch!(args, :url)
    call_port(state, :connect_client_no_session, caller_info, url)
    {:noreply, state}
  end

  def handle_call({:conn, {:disconnect, nil}}, caller_info, state) do
    call_port(state, :disconnect_client, caller_info, nil)
    {:noreply, state}
  end

  # Discovery Handlers.

  def handle_call({:discovery, {:find_servers_on_network, url}}, caller_info, state) do
    call_port(state, :find_servers_on_network, caller_info, url)
    {:noreply, state}
  end

  def handle_call({:discovery, {:find_servers, url}}, caller_info, state) do
    call_port(state, :find_servers, caller_info, url)
    {:noreply, state}
  end

  def handle_call({:discovery, {:get_endpoints, url}}, caller_info, state) do
    call_port(state, :get_endpoints, caller_info, url)
    {:noreply, state}
  end

  # Subscriptions and Monitored Items functions.

  def handle_call({:subscription, {:subscription, publishing_interval}}, caller_info, state) do
    call_port(state, :add_subscription, caller_info, publishing_interval)
    {:noreply, state}
  end

  def handle_call({:subscription, {:delete, subscription_id}}, caller_info, state) do
    call_port(state, :delete_subscription, caller_info, subscription_id)
    {:noreply, state}
  end

  def handle_call({:subscription, {:monitored_item, args}}, caller_info, state) do
    with monitored_item <- Keyword.fetch!(args, :monitored_item) |> to_c(),
         subscription_id <- Keyword.fetch!(args, :subscription_id),
         sampling_time <- Keyword.get(args, :sampling_time, 250.0),
         true <- is_integer(subscription_id),
         true <- is_float(sampling_time) do
      c_args = {monitored_item, subscription_id, sampling_time}
      call_port(state, :add_monitored_item, caller_info, c_args)
      {:noreply, state}
    else
      _ ->
        {:reply, {:error, :einval}, state}
    end
  end

  def handle_call({:subscription, {:delete_monitored_item, args}}, caller_info, state) do
    with monitored_item_id <- Keyword.fetch!(args, :monitored_item_id),
         subscription_id <- Keyword.fetch!(args, :subscription_id),
         true <- is_integer(monitored_item_id),
         true <- is_integer(subscription_id) do
      c_args = {subscription_id, monitored_item_id}
      call_port(state, :delete_monitored_item, caller_info, c_args)
      {:noreply, state}
    else
      _ ->
        {:reply, {:error, :einval}, state}
    end
  end

  # Write nodes Attributes

  def handle_call({:read, {:user_write_mask, node_id}}, caller_info, state) do
    c_args = to_c(node_id)
    call_port(state, :read_node_user_write_mask, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:read, {:user_access_level, node_id}}, caller_info, state) do
    c_args = to_c(node_id)
    call_port(state, :read_node_user_access_level, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:read, {:user_executable, node_id}}, caller_info, state) do
    c_args = to_c(node_id)
    call_port(state, :read_node_user_executable, caller_info, c_args)
    {:noreply, state}
  end

  # Write nodes Attributes

  def handle_call({:write, {:node_id, node_id, new_node_id}}, caller_info, state) do
    c_args = {to_c(node_id), to_c(new_node_id)}
    call_port(state, :write_node_node_id, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:write, {:node_class, node_id, node_class}}, caller_info, state) do
    c_args = {to_c(node_id), node_class}
    call_port(state, :write_node_node_class, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:write, {:user_write_mask, node_id, user_write_mask}}, caller_info, state) do
    c_args = {to_c(node_id), user_write_mask}
    call_port(state, :write_node_user_write_mask, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:write, {:symmetric, node_id, symmetric}}, caller_info, state) do
    c_args = {to_c(node_id), symmetric}
    call_port(state, :write_node_symmetric, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:write, {:contains_no_loops, node_id, contains_no_loops}}, caller_info, state) do
    c_args = {to_c(node_id), contains_no_loops}
    call_port(state, :write_node_contains_no_loops, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:write, {:user_access_level, node_id, user_access_level}}, caller_info, state) do
    c_args = {to_c(node_id), user_access_level}
    call_port(state, :write_node_user_access_level, caller_info, c_args)
    {:noreply, state}
  end

  def handle_call({:write, {:user_executable, node_id, user_executable}}, caller_info, state) do
    c_args = {to_c(node_id), user_executable}
    call_port(state, :write_node_user_executable, caller_info, c_args)
    {:noreply, state}
  end

  # Catch all

  def handle_call(invalid_call, _caller_info, state) do
    Logger.error("#{__MODULE__} Invalid call: #{inspect(invalid_call)}")
    {:reply, {:error, :einval}, state}
  end

  def handle_info({_port, {:exit_status, code}}, state) do
    Logger.warn("(#{__MODULE__}) Error code: #{inspect(code)}.")
    # retrying delay
    Process.sleep(@c_timeout)
    {:stop, :restart, state}
  end

  def handle_info({:EXIT, _port, reason}, state) do
    Logger.debug("(#{__MODULE__}) Exit reason: #{inspect(reason)}")
    # retrying delay
    Process.sleep(@c_timeout)
    {:stop, :restart, state}
  end

  def handle_info(msg, state) do
    Logger.warn("(#{__MODULE__}) Unhandled message: #{inspect(msg)}.")
    {:noreply, state}
  end

  # Subscription C message handlers

  defp handle_c_response(
         {:subscription, {:data, subscription_id, monitored_id, c_value}},
         %{controlling_process: c_pid} = state
       ) do
    value = parse_c_value(c_value)
    send(c_pid, {:data, subscription_id, monitored_id, value})
    state
  end

  defp handle_c_response(
         {:subscription, message},
         %{controlling_process: c_pid} = state
       ) do
    send(c_pid, message)
    state
  end

  # Lifecycle C Handlers

  defp handle_c_response({:get_client_state, caller_metadata, client_state}, state) do
    str_client_state = charlist_to_string(client_state)
    GenServer.reply(caller_metadata, str_client_state)
    state
  end

  defp handle_c_response({:set_client_config, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:get_client_config, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:reset_client, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  # Encryption Handlers

  defp handle_c_response({:set_config_with_security_policies, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  # Connect to a Server C Handlers

  defp handle_c_response({:connect_client_by_url, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:connect_client_by_username, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:connect_client_no_session, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:disconnect_client, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  # Discovery functions C Handlers

  defp handle_c_response({:find_servers_on_network, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:find_servers, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:get_endpoints, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  # Subscriptions and Monitored Items functions.

  defp handle_c_response({:add_subscription, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:delete_subscription, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:add_monitored_item, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  defp handle_c_response({:delete_monitored_item, caller_metadata, c_response}, state) do
    GenServer.reply(caller_metadata, c_response)
    state
  end

  # Read nodes Attributes

  defp handle_c_response({:read_node_user_write_mask, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:read_node_user_access_level, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:read_node_user_executable, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end


  # Write nodes Attributes

  defp handle_c_response({:write_node_node_id, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:write_node_node_class, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:write_node_user_write_mask, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:write_node_symmetric, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:write_node_contains_no_loops, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:write_node_user_access_level, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end

  defp handle_c_response({:write_node_user_executable, caller_metadata, data}, state) do
    GenServer.reply(caller_metadata, data)
    state
  end
end