lib/components/unplug.ex

defmodule ALF.Components.Unplug do
  use ALF.Components.Basic

  defstruct Basic.common_attributes() ++
              [
                type: :unplug,
                module: nil,
                opts: [],
                source_code: nil
              ]

  def start_link(%__MODULE__{} = state) do
    GenStage.start_link(__MODULE__, state)
  end

  def init(state) do
    state = %{
      state
      | pid: self(),
        opts: init_opts(state.module, state.opts),
        source_code: read_source_code(state.module, :unplug)
    }

    {:producer_consumer, state, subscribe_to: state.subscribe_to}
  end

  def handle_events([%IP{} = ip], _from, %__MODULE__{telemetry_enabled: true} = state) do
    :telemetry.span(
      [:alf, :component],
      telemetry_data(ip, state),
      fn ->
        case process_ip(ip, state) do
          %IP{} = ip ->
            {{:noreply, [ip], state}, telemetry_data(ip, state)}

          %ErrorIP{} = ip ->
            {{:noreply, [], state}, telemetry_data(ip, state)}
        end
      end
    )
  end

  def handle_events([%IP{} = ip], _from, %__MODULE__{telemetry_enabled: false} = state) do
    case process_ip(ip, state) do
      %IP{} = ip ->
        {:noreply, [ip], state}

      %ErrorIP{} ->
        {:noreply, [], state}
    end
  end

  defp process_ip(ip, state) do
    ip = %{ip | history: [{state.name, ip.event} | ip.history]}

    prev_event = Map.fetch!(ip.plugs, state.name)
    ip_plugs = Map.delete(ip.plugs, state.name)
    ip = %{ip | plugs: ip_plugs}

    case call_unplug_function(state.module, ip.event, prev_event, state.opts) do
      {:error, error, stacktrace} ->
        send_error_result(ip, error, stacktrace, state)

      new_datum ->
        %{ip | event: new_datum}
    end
  end

  defp call_unplug_function(module, event, prev_event, opts) do
    apply(module, :unplug, [event, prev_event, opts])
  rescue
    error ->
      {:error, error, __STACKTRACE__}
  end
end