lib/reactor/middleware/telemetry.ex

defmodule Reactor.Middleware.Telemetry do
  @moduledoc """
  A Reactor middleware that emits telemetry events.

  The following events are emitted:

  * `[:reactor, :run, :start]`
  * `[:reactor, :run, :stop]`
  * `[:reactor, :step, :run, :start]`
  * `[:reactor, :step, :run, :stop]`
  * `[:reactor, :step, :process, :start]`
  * `[:reactor, :step, :process, :stop]`
  * `[:reactor, :step, :compensate, :start]`
  * `[:reactor, :step, :compensate, :stop]`
  * `[:reactor, :step, :undo, :start]`
  * `[:reactor, :step, :undo, :stop]`
  """

  use Reactor.Middleware

  @doc false
  @impl true
  def init(context) do
    start = System.monotonic_time()

    metadata = context.__reactor__

    :telemetry.execute(
      [:reactor, :run, :start],
      %{system_time: System.system_time()},
      metadata
    )

    context =
      context
      |> Map.put(__MODULE__, %{
        start_time: start,
        metadata: metadata
      })

    {:ok, context}
  end

  @doc false
  @impl true
  def complete(result, %{__MODULE__ => %{start_time: start_time, metadata: metadata}}) do
    end_time = System.monotonic_time()
    duration = end_time - start_time

    metadata =
      metadata
      |> Map.merge(%{
        status: :ok,
        result: result
      })

    :telemetry.execute(
      [:reactor, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )

    {:ok, result}
  end

  @doc false
  @impl true
  def error(error_or_errors, %{__MODULE__ => %{start_time: start_time, metadata: metadata}}) do
    end_time = System.monotonic_time()
    duration = end_time - start_time

    metadata =
      metadata
      |> Map.merge(%{
        status: :error,
        errors: error_or_errors
      })

    :telemetry.execute(
      [:reactor, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )

    :ok
  end

  @doc false
  @impl true
  def halt(%{__MODULE__ => %{start_time: start_time, metadata: metadata}} = context) do
    end_time = System.monotonic_time()
    duration = end_time - start_time

    metadata =
      metadata
      |> Map.merge(%{
        status: :halt
      })

    :telemetry.execute(
      [:reactor, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )

    {:ok, context}
  end

  @doc false
  @impl true
  def event({:process_start, pid}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        pid: pid
      })

    start_time = System.monotonic_time()
    Process.put({__MODULE__, :process_start_time}, start_time)

    :telemetry.execute(
      [:reactor, :step, :process, :start],
      %{system_time: System.system_time()},
      metadata
    )
  end

  def event({:process_terminate, pid}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        pid: pid
      })

    start_time = Process.delete({__MODULE__, :process_start_time})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :process, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:run_start, arguments}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        arguments: arguments
      })

    start_time = System.monotonic_time()
    Process.put({__MODULE__, :step_start_time, step.name}, start_time)

    :telemetry.execute(
      [:reactor, :step, :run, :start],
      %{system_time: System.system_time()},
      metadata
    )
  end

  def event({:run_complete, result}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: result,
        status: :ok
      })

    start_time = Process.delete({__MODULE__, :step_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:run_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: errors,
        status: :error
      })

    start_time = Process.delete({__MODULE__, :step_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:run_halt, value}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: value,
        status: :halt
      })

    start_time = Process.delete({__MODULE__, :step_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:run_retry, value}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: value,
        status: :retry
      })

    start_time = Process.delete({__MODULE__, :step_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event(:run_retry, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: nil,
        status: :retry
      })

    start_time = Process.delete({__MODULE__, :step_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :run, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:compensate_start, reason}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: reason,
        status: :compensate
      })

    start_time = System.monotonic_time()
    Process.put({__MODULE__, :compensate_start_time, step.name}, start_time)

    :telemetry.execute(
      [:reactor, :step, :compensate, :start],
      %{system_time: System.system_time()},
      metadata
    )
  end

  def event(:compensate_retry, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: nil,
        status: :retry
      })

    start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :compensate, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event(:compensate_complete, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: nil,
        status: :ok
      })

    start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :compensate, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:compensate_retry, value}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: value,
        status: :retry
      })

    start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :compensate, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:compensate_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: errors,
        status: :error
      })

    start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :compensate, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:compensate_continue, result}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: result,
        status: :ok
      })

    start_time = Process.delete({__MODULE__, :compensate_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :compensate, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event(:undo_start, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: nil,
        status: :undo
      })

    start_time = System.monotonic_time()
    Process.put({__MODULE__, :undo_start_time, step.name}, start_time)

    :telemetry.execute(
      [:reactor, :step, :undo, :start],
      %{system_time: System.system_time()},
      metadata
    )
  end

  def event({:undo_error, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: errors,
        status: :error
      })

    start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :undo, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event({:undo_retry, errors}, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: errors,
        status: :retry
      })

    start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :undo, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event(:undo_retry, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: nil,
        status: :retry
      })

    start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :undo, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end

  def event(:undo_complete, step, %{__MODULE__ => %{metadata: metadata}}) do
    metadata =
      metadata
      |> Map.merge(%{
        step: step,
        result: nil,
        status: :ok
      })

    start_time = Process.delete({__MODULE__, :undo_start_time, step.name})
    end_time = System.monotonic_time()
    duration = end_time - start_time

    :telemetry.execute(
      [:reactor, :step, :undo, :stop],
      %{
        system_time: System.system_time(),
        duration: duration
      },
      metadata
    )
  end
end