# lib/components/stage.ex

defmodule ALF.Components.Stage do
  use ALF.Components.Basic

  # Stage component state: the attributes returned by `Basic.common_attributes/0`
  # plus the stage-specific fields listed below.
  defstruct Basic.common_attributes() ++
              [
                # Component type tag.
                type: :stage,
                # Module and function applied to every event as `function(event, opts)`.
                module: nil,
                function: nil,
                # Changed by +1 / -1 through `inc_count/1` and `dec_count/1`.
                count: 1,
                # Recorded together with the component name in each IP's history entry.
                number: 0,
                # NOTE(review): not read or written in this module; presumably identifies
                # the group of copies created for one stage. Confirm against the builder.
                stage_set_ref: nil,
                # Passed through `init_opts/2` in `init/1`, then handed to the stage function.
                opts: [],
                # Filled in `init/1` with the result of `read_source_code/2`.
                source_code: nil
              ]

  alias ALF.{Manager.Streamer, DoneStatement, DSLError}

  # Option keys that `validate_options/2` accepts for a stage declaration.
  @dsl_options [:opts, :count, :name]

  @doc """
  Starts a GenStage process for the given stage struct.

  The struct is handed to `init/1` as the initial state. The return value is whatever
  `GenStage.start_link/2` returns.
  """
  def start_link(%__MODULE__{} = state), do: GenStage.start_link(__MODULE__, state)

  # GenStage callback. Completes the state inside the freshly started process and
  # registers it as a :producer_consumer subscribed to `state.subscribe_to`.
  def init(state) do
    # Same evaluation order as the struct update: options first, then the source code.
    prepared_opts = init_opts(state.module, state.opts)
    source = read_source_code(state.module, state.function)

    state = %{
      state
      | pid: self(),
        opts: prepared_opts,
        source_code: source,
        subscribers: []
    }

    {:producer_consumer, state, subscribe_to: state.subscribe_to}
  end

  @doc """
  Synchronously asks the stage process stored in `pid` to increase its `count` by one.

  Returns the reply to the `:inc_count` call, which is the updated stage state.
  """
  def inc_count(%__MODULE__{pid: pid}) do
    GenStage.call(pid, :inc_count)
  end
  @doc """
  Synchronously asks the stage process stored in `pid` to decrease its `count` by one.

  Returns the reply to the `:dec_count` call, which is the updated stage state.
  """
  def dec_count(%__MODULE__{pid: pid}) do
    GenStage.call(pid, :dec_count)
  end

  # GenStage callback for the synchronous counter requests sent by `inc_count/1` and
  # `dec_count/1`. Each clause adjusts `count`, replies with the updated state and
  # emits no events (the empty list in the reply tuple).
  def handle_call(:inc_count, _from, state) do
    updated = Map.update!(state, :count, &(&1 + 1))
    {:reply, updated, [], updated}
  end

  def handle_call(:dec_count, _from, state) do
    updated = Map.update!(state, :count, &(&1 - 1))
    {:reply, updated, [], updated}
  end

  # GenStage callback. Runs the stage function on the incoming IP and decides what to
  # emit downstream:
  #   * an IP that is not done is forwarded;
  #   * an IP marked `done!: true` or an ErrorIP is not forwarded (nothing is emitted).
  # Both clauses match a one-element batch only.
  # NOTE(review): presumably the subscription limits demand to one event; confirm.
  #
  # Telemetry-enabled variant: the work is wrapped in a `[:alf, :component]` span whose
  # start metadata is built from the incoming IP and stop metadata from the result.
  def handle_events([%IP{} = ip], _from, %__MODULE__{telemetry_enabled: true} = state) do
    :telemetry.span([:alf, :component], telemetry_data(ip, state), fn ->
      processed = process_ip(ip, state)

      outgoing =
        case processed do
          %IP{done!: false} -> [processed]
          %IP{done!: true} -> []
          %ErrorIP{} -> []
        end

      {{:noreply, outgoing, state}, telemetry_data(processed, state)}
    end)
  end

  # Variant without telemetry: same emission rules, no span.
  def handle_events([%IP{} = ip], _from, %__MODULE__{telemetry_enabled: false} = state) do
    outgoing =
      case process_ip(ip, state) do
        %IP{done!: false} = processed -> [processed]
        %IP{done!: true} -> []
        %ErrorIP{} -> []
      end

    {:noreply, outgoing, state}
  end

  @doc """
  Validates the arguments of a stage declaration.

  `atom` is the stage identifier and must be an atom. `options` is a keyword list whose
  keys must all be among `:opts`, `:count` and `:name`.

  Returns `nil` when everything is valid. Raises `DSLError` when `atom` is not an atom
  or when `options` contains any other key.
  """
  def validate_options(atom, options) do
    wrong_options = Keyword.keys(options) -- @dsl_options

    if not is_atom(atom) do
      raise DSLError, "Stage must be an atom: #{inspect(atom)}"
    end

    # Compare with the empty list instead of calling `Enum.any?/1`: that function tests
    # element truthiness, so unknown keys such as `nil:` or `false:` went unreported.
    if wrong_options != [] do
      raise DSLError,
            "Wrong options for the #{atom} stage: #{inspect(wrong_options)}. " <>
              "Available options are #{inspect(@dsl_options)}"
    end
  end

  # Applies the stage function to the IP's event and returns the resulting packet.
  #
  # The IP first gets a history entry `{{name, number}, event}` prepended. Then:
  #   * on success the IP is returned with the new event;
  #   * when the function raised a DoneStatement, the IP takes the statement's event,
  #     is marked `done!: true`, is reported to the Streamer, and is returned;
  #   * on any other failure the result of `send_error_result/4` is returned.
  defp process_ip(ip, state) do
    history_entry = {{state.name, state.number}, ip.event}
    ip = %{ip | history: [history_entry | ip.history]}
    mfa = {state.module, state.function, state.opts}

    case try_apply(ip.event, mfa) do
      {:ok, new_event} ->
        %{ip | event: new_event}

      {:error, %DoneStatement{event: event}, _stacktrace} ->
        done_ip = %{ip | event: event, done!: true}
        Streamer.cast_result_ready(done_ip.manager_name, done_ip)
        done_ip

      {:error, error, stacktrace} ->
        send_error_result(ip, error, stacktrace, state)
    end
  end

  # Calls `module.function(event, opts)` and normalises the outcome into a tuple:
  #   {:ok, result}                    the call returned normally
  #   {:error, exception, stacktrace}  the call raised an exception
  #   {:error, kind, value}            the call was caught by `catch` (e.g. a throw)
  # NOTE(review): in the last shape the caught value occupies the position that
  # `process_ip/2` binds as the stacktrace. Kept as is; confirm this is intended.
  defp try_apply(event, {module, function, opts}) do
    {:ok, apply(module, function, [event, opts])}
  rescue
    exception ->
      {:error, exception, __STACKTRACE__}
  catch
    kind, payload ->
      {:error, kind, payload}
  end
end