lib/components/basic.ex

defmodule ALF.Components.Basic do
  alias ALF.{ErrorIP, Manager, IP}

  @common_attributes [
    name: nil,
    pid: nil,
    pipeline_module: nil,
    set_ref: nil,
    count: 1,
    number: 0,
    opts: [],
    subscribed_to: [],
    subscribers: [],
    telemetry: false
  ]

  def common_attributes, do: @common_attributes

  def build_component(component_module, atom, count, opts, current_module) do
    if module_exist?(atom) do
      struct(component_module, %{
        pipeline_module: current_module,
        name: atom,
        module: atom,
        function: :call,
        opts: opts || [],
        count: count
      })
    else
      struct(component_module, %{
        pipeline_module: current_module,
        name: atom,
        module: current_module,
        function: atom,
        opts: opts || [],
        count: count
      })
    end
  end

  def build_error_ip(ip, error, stacktrace, state) do
    %ErrorIP{
      ip: ip,
      pipeline_module: ip.pipeline_module,
      destination: ip.destination,
      ref: ip.ref,
      stream_ref: ip.stream_ref,
      error: error,
      debug: ip.debug,
      history: ip.history,
      stacktrace: stacktrace,
      component: state,
      plugs: ip.plugs
    }
  end

  def __state__(pid) when is_pid(pid) do
    GenStage.call(pid, :__state__)
  end

  def telemetry_data(nil, state) do
    %{ip: nil, component: component_telemetry_data(state)}
  end

  def telemetry_data(ips, state) when is_list(ips) do
    %{ips: Enum.map(ips, &ip_telemetry_data/1), component: component_telemetry_data(state)}
  end

  def telemetry_data(ip, state) do
    %{ip: ip_telemetry_data(ip), component: component_telemetry_data(state)}
  end

  defp ip_telemetry_data(ip) do
    Map.take(ip, [:type, :event, :ref, :error, :stacktrace])
  end

  defp component_telemetry_data(state) do
    Map.take(state, [:pid, :name, :number, :pipeline_module, :type, :set_ref])
  end

  defp module_exist?(module) do
    case Code.ensure_compiled(module) do
      {:module, ^module} ->
        true

      {:error, _} ->
        false
    end
  end

  defmacro __using__(_opts) do
    quote do
      use GenStage

      alias ALF.{Components.Basic, IP, ErrorIP, Manager.Streamer, SourceCode}

      @type t :: %__MODULE__{}

      def __state__(pid) when is_pid(pid), do: Basic.__state__(pid)

      def subscribers(pid) do
        GenStage.call(pid, :subscribers)
      end

      def init_opts(module, opts) do
        if function_exported?(module, :init, 1) do
          apply(module, :init, [opts])
        else
          opts
        end
      end

      @type result :: :composed | :destroyed

      @spec send_result(IP.t() | ErrorIP.t(), result | IP.t() | ErrorIP.t()) ::
              IP.t() | ErrorIP.t()
      def send_result(ip, result) do
        ref = if ip.stream_ref, do: ip.stream_ref, else: ip.ref
        if ip.destination, do: send(ip.destination, {ref, result})
        ip
      end

      @spec send_error_result(IP.t(), any, list, __MODULE__.t()) :: ErrorIP.t()
      def send_error_result(ip, error, stacktrace, state) do
        error_ip = build_error_ip(ip, error, stacktrace, state)
        send_result(error_ip, error_ip)
      end

      @impl true
      def handle_call(:subscribers, _form, state) do
        {:reply, state.subscribers, [], state}
      end

      def handle_call(:__state__, _from, state) do
        {:reply, state, [], state}
      end

      @impl true
      def handle_subscribe(:consumer, subscription_options, from, state) do
        subscribers = [{from, subscription_options} | state.subscribers]
        new_state = %{state | subscribers: subscribers}
        Manager.component_updated(new_state)
        {:automatic, new_state}
      end

      def handle_subscribe(:producer, subscription_options, from, state) do
        subscribed_to = [{from, subscription_options} | state.subscribed_to]
        new_state = %{state | subscribed_to: subscribed_to}
        Manager.component_updated(new_state)
        {:automatic, new_state}
      end

      @impl true
      def handle_cancel(_any, from, state) do
        subscribed_to = Enum.filter(state.subscribed_to, &(&1 != from))
        subscribers = Enum.filter(state.subscribers, &(&1 != from))
        state = %{state | subscribed_to: subscribed_to, subscribers: subscribers}
        Manager.component_updated(state)
        {:noreply, [], state}
      end

      def telemetry_data(ip, state), do: ALF.Components.Basic.telemetry_data(ip, state)

      def read_source_code(module) do
        SourceCode.module_source(module)
      rescue
        error ->
          inspect(error)
      end

      def read_source_code(module, :call) do
        SourceCode.module_source(module)
      rescue
        error ->
          inspect(error)
      end

      def read_source_code(module, function) do
        do_read_source_code(module, function)
      rescue
        error ->
          inspect(error)
      end

      def component_added(component) do
        Manager.component_added(component)
      end

      def history(ip, state) do
        if ip.debug do
          [{{state.name, state.number}, ip.event} | ip.history]
        else
          []
        end
      end

      defp do_read_source_code(module, function) do
        doc = SourceCode.function_doc(module, function)
        source = SourceCode.function_source(module, function)

        if doc do
          "@doc \"#{doc}\"\n#{source}"
        else
          source
        end
      end

      def build_error_ip(ip, error, stacktrace, state) do
        ALF.Components.Basic.build_error_ip(ip, error, stacktrace, state)
      end
    end
  end
end