lib/components/recomposer.ex

defmodule ALF.Components.Recomposer do
  use ALF.Components.Basic

  # Component state. Fields shared with other components come from
  # `Basic.common_attributes/0`; the remaining ones are declared here.
  defstruct Basic.common_attributes() ++
              [
                type: :recomposer,
                # Module and function name of the user callback; it is invoked as
                # `module.function(event, collected_events, opts)`.
                module: nil,
                function: nil,
                # Replaced with the result of `init_opts/2` when the component starts.
                opts: [],
                # Filled from `read_source_code/2` at start unless already set.
                source_code: nil,
                # Not referenced by any function visible in this module.
                collected_ips: [],
                # Async mode only: map of `stream_ref => [IP]` holding the IPs kept
                # back for each stream between `handle_events/3` calls.
                new_collected_ips: %{}
              ]

  alias ALF.{DSLError, IP, ErrorIP}

  # Option keys accepted by `validate_options/2`; any other key raises a `DSLError`.
  @dsl_options [:name, :opts]

  @doc """
  Starts the recomposer as a linked GenStage process.

  The given struct is handed to `init/1` as the initial state.
  """
  @spec start_link(t()) :: GenServer.on_start()
  def start_link(%__MODULE__{} = state), do: GenStage.start_link(__MODULE__, state)

  # GenStage callback used in asynchronous mode. Records the stage pid,
  # initialises the user options through `init_opts/2`, fills in the source code
  # when it has not been provided, and registers the stage as a
  # producer-consumer subscribed to `state.subscribe_to`.
  @impl true
  def init(state) do
    opts = init_opts(state.module, state.opts)
    source_code = state.source_code || read_source_code(state.module, state.function)
    prepared = %{state | pid: self(), opts: opts, source_code: source_code}

    {:producer_consumer, prepared, subscribe_to: prepared.subscribe_to}
  end

  @doc """
  Prepares the component state for synchronous execution.

  No process is started, so `pid` is set to a fresh reference (it is later used
  as part of the process-dictionary key in `sync_process/2`). Options and source
  code are initialised the same way as in `init/1`, and the `telemetry_enabled`
  flag is stored in the state. Returns the updated struct.
  """
  def init_sync(state, telemetry_enabled) do
    opts = init_opts(state.module, state.opts)
    source_code = state.source_code || read_source_code(state.module, state.function)

    %{
      state
      | pid: make_ref(),
        opts: opts,
        source_code: source_code,
        telemetry_enabled: telemetry_enabled
    }
  end

  # GenStage callback. Accepts exactly one IP per call (the event list must be a
  # single-element list) and runs it through the recomposer function.
  #
  # With telemetry enabled the work is wrapped in an `[:alf, :component]` span:
  # the start metadata is built from the incoming IP, the stop metadata from the
  # outcome (`nil`, the new IP, or the error IP) and the updated state.
  @impl true
  def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry_enabled: true} = state) do
    :telemetry.span([:alf, :component], telemetry_data(ip, state), fn ->
      {outcome, emitted, next_state} = recompose_and_emit(ip, state)
      {{:noreply, emitted, next_state}, telemetry_data(outcome, next_state)}
    end)
  end

  def handle_events([%ALF.IP{} = ip], _from, %__MODULE__{telemetry_enabled: false} = state) do
    {_outcome, emitted, next_state} = recompose_and_emit(ip, state)
    {:noreply, emitted, next_state}
  end

  # Runs `process_ip/2` and pairs its outcome with the list of events to emit
  # downstream: only a freshly built IP is emitted; `nil` (event collected) and
  # error IPs emit nothing.
  defp recompose_and_emit(ip, state) do
    case process_ip(ip, state) do
      {nil, next_state} -> {nil, [], next_state}
      {%IP{} = produced, next_state} -> {produced, [produced], next_state}
      {%ErrorIP{} = failed, next_state} -> {failed, [], next_state}
    end
  end

  # Applies the user function to the incoming IP together with the events
  # collected so far for the IP's stream, and returns `{outcome, new_state}`.
  #
  # The user function result selects the branch:
  #   * `:continue`       - the IP is appended to the collected IPs; outcome `nil`.
  #   * `{nil, events}`   - nothing is produced; `events` replace the collected IPs.
  #   * `{event, events}` - a new IP carrying `event` is produced and `events`
  #                         replace the collected IPs.
  #   * any other `event` - a new IP carrying `event` is produced and the
  #                         collected IPs for the stream are cleared.
  # When the function raises, throws or exits, the outcome is whatever
  # `send_error_result/4` returns and the state is left unchanged.
  defp process_ip(current_ip, state) do
    stream_ref = current_ip.stream_ref
    pending_ips = Map.get(state.new_collected_ips, stream_ref, [])
    collected_data = Enum.map(pending_ips, & &1.event)

    case call_function(
           state.module,
           state.function,
           current_ip.event,
           collected_data,
           state.opts
         ) do
      {:ok, :continue} ->
        send_result(current_ip, :destroyed)

        {nil, store_collected(state, stream_ref, pending_ips ++ [current_ip])}

      {:ok, {nil, events}} ->
        send_result(current_ip, :destroyed)

        history = [{state.name, current_ip.event} | current_ip.history]
        collected = Enum.map(events, &build_ip(&1, current_ip, history))

        {nil, store_collected(state, stream_ref, collected)}

      {:ok, {event, events}} ->
        ip = build_ip(event, current_ip, [{state.name, current_ip.event} | current_ip.history])

        send_result(ip, :created_recomposer)

        # Leftover events are derived from the newly produced IP, not the incoming one.
        history = [{state.name, ip.event} | ip.history]
        collected = Enum.map(events, &build_ip(&1, ip, history))

        {ip, store_collected(state, stream_ref, collected)}

      {:ok, event} ->
        ip = build_ip(event, current_ip, [{state.name, current_ip.event} | current_ip.history])

        send_result(ip, :created_recomposer)

        {ip, store_collected(state, stream_ref, [])}

      {:error, error, stacktrace} ->
        error_ip = send_error_result(current_ip, error, stacktrace, state)
        {error_ip, state}
    end
  end

  # Stores the collected IPs of a stream in the state. An empty list removes the
  # key instead of storing `[]`: lookups already default to `[]`, and keeping an
  # entry per finished stream would make the map grow for the lifetime of the
  # process.
  defp store_collected(state, stream_ref, []) do
    %{state | new_collected_ips: Map.delete(state.new_collected_ips, stream_ref)}
  end

  defp store_collected(state, stream_ref, ips) do
    %{state | new_collected_ips: Map.put(state.new_collected_ips, stream_ref, ips)}
  end

  @doc """
  Processes one IP synchronously in the calling process.

  Returns what `do_sync_process/2` yields: `nil` when the event was only
  collected, a new IP when the recomposer function produced an event, or the
  result of `send_error_result/4` when the function failed. With telemetry
  enabled the call is wrapped in an `[:alf, :component]` span whose stop
  metadata is built from that result.
  """
  def sync_process(ip, %__MODULE__{telemetry_enabled: false} = state),
    do: do_sync_process(ip, state)

  def sync_process(ip, %__MODULE__{telemetry_enabled: true} = state) do
    start_metadata = telemetry_data(ip, state)

    :telemetry.span([:alf, :component], start_metadata, fn ->
      result = do_sync_process(ip, state)
      {result, telemetry_data(result, state)}
    end)
  end

  # Synchronous counterpart of `process_ip/2`. The IPs collected for a stream are
  # kept in the calling process' dictionary under `{state.pid, stream_ref}`
  # instead of in the component state.
  #
  # Returns `nil` when the event was only collected (`:continue` or
  # `{nil, events}`), a new IP when the user function produced an event, or the
  # result of `send_error_result/4` when the function failed.
  defp do_sync_process(ip, state) do
    dict_key = {state.pid, ip.stream_ref}
    collected_ips = get_from_process_dict(dict_key)
    collected_data = Enum.map(collected_ips, & &1.event)

    case call_function(
           state.module,
           state.function,
           ip.event,
           collected_data,
           state.opts
         ) do
      {:ok, :continue} ->
        put_to_process_dict(dict_key, collected_ips ++ [ip])
        nil

      {:ok, {nil, events}} ->
        history = [{state.name, ip.event} | ip.history]
        remember_collected(dict_key, Enum.map(events, &build_ip(&1, ip, history)))
        nil

      {:ok, {event, events}} ->
        new_ip = build_ip(event, ip, [{state.name, ip.event} | ip.history])

        # Leftover events are derived from the newly produced IP, not the incoming one.
        history = [{state.name, new_ip.event} | new_ip.history]
        remember_collected(dict_key, Enum.map(events, &build_ip(&1, new_ip, history)))
        new_ip

      {:ok, event} ->
        remember_collected(dict_key, [])
        build_ip(event, ip, [{state.name, ip.event} | ip.history])

      {:error, error, stacktrace} ->
        send_error_result(ip, error, stacktrace, state)
    end
  end

  # Saves the collected IPs for a stream. An empty list deletes the key rather
  # than storing `[]`: reads already default to `[]`, and leaving one entry per
  # finished stream would let the process dictionary grow without bound.
  defp remember_collected(dict_key, []), do: Process.delete(dict_key)
  defp remember_collected(dict_key, ips), do: put_to_process_dict(dict_key, ips)

  # Builds a new IP for `event`, flagged as `recomposed: true`. Routing fields
  # (stream_ref, ref, destination, manager_name, sync_path) are copied from the
  # source `ip`; `event` is used both as the current and the initial event, and
  # `history` is stored as given.
  defp build_ip(event, ip, history) do
    %IP{
      event: event,
      init_event: event,
      history: history,
      recomposed: true,
      stream_ref: ip.stream_ref,
      ref: ip.ref,
      destination: ip.destination,
      manager_name: ip.manager_name,
      sync_path: ip.sync_path
    }
  end

  @doc """
  Validates the arguments given to the recomposer in the DSL.

  Raises `ALF.DSLError` when `name` is not an atom or when `options` contains a
  key outside of the supported set (`:name`, `:opts`). Returns `nil` when
  everything is valid.
  """
  @spec validate_options(term(), keyword()) :: nil
  def validate_options(name, options) do
    wrong_options = Keyword.keys(options) -- @dsl_options

    if not is_atom(name) do
      raise DSLError, "Recomposer name must be an atom: #{inspect(name)}"
    end

    # Check for emptiness explicitly: `Enum.any?/1` tests element truthiness, so
    # an unknown option key of `nil` or `false` would slip through unnoticed.
    if wrong_options != [] do
      raise DSLError,
            "Wrong options for the #{name} recomposer: #{inspect(wrong_options)}. " <>
              "Available options are #{inspect(@dsl_options)}"
    end
  end

  # Invokes `module.function(event, collected_data, opts)` and normalises the
  # outcome:
  #   * `{:ok, result}` on a normal return,
  #   * `{:error, exception, stacktrace}` when the call raises,
  #   * `{:error, kind, value}` when it throws or exits.
  # NOTE(review): in the throw/exit case the third element is the thrown value
  # or exit reason, not a stacktrace, although callers treat that position as
  # the stacktrace - confirm this is intended before changing it.
  defp call_function(module, function, event, collected_data, opts)
       when is_atom(module) and is_atom(function) do
    try do
      result = apply(module, function, [event, collected_data, opts])
      {:ok, result}
    rescue
      exception -> {:error, exception, __STACKTRACE__}
    catch
      kind, payload -> {:error, kind, payload}
    end
  end

  # Reads the value stored under `key` in the current process' dictionary,
  # falling back to an empty list when the key is absent.
  defp get_from_process_dict(key) do
    Process.get(key, [])
  end

  # Stores `ips` under `key` in the current process' dictionary and returns the
  # value previously stored there (`nil` when there was none).
  defp put_to_process_dict(key, ips) do
    Process.put(key, ips)
  end
end