Skip to main content

notebooks/port_arrow_native_benchmark.livemd

# PortArrow vs Native R Backend Benchmark

```elixir
Mix.install([
  {:rx, "~> 0.1"},
  {:kino, "~> 0.19.0"},
  {:benchee, "~> 1.5"},
  {:explorer, "~> 0.11"}
])
```

## Run Order

This notebook compares full Elixir -> R -> Elixir transitions for the external
PortArrow Rscript backend and the experimental native embedded R backend.

For the head-to-head comparison, run PortArrow first, then native in the same
Livebook runtime. The harness switches away from the resettable PortArrow
Rscript process once and then initializes native. Once native has initialized,
embedded R lives inside the Livebook runtime's BEAM process and cannot be
switched back to PortArrow without restarting the runtime.

Native runs require starting Livebook with exactly one native implementation
gate set. Run one of the following commands, not both:

```bash
RX_BUILD_NIF=1 livebook server
RX_BUILD_RUST_NIF=1 livebook server
```

Native runs use `stats::lm` and base R plot capture. On Darwin/macOS, the
native child-BEAM harness covers package/namespace loading plus direct R
child-process probes. The head-to-head benchmark constructs its R `data.frame`
from primitive globals and does not exercise Arrow IPC dataframe routing; the
dataframe engine sections below measure the `Rx.DataFrame` engines separately.

## Backend And Size

```elixir
backend_input =
  Kino.Input.select(
    "R backend",
    [
      port_arrow: "PortArrow separate Rscript process",
      native: "Native embedded NIF"
    ],
    default: :port_arrow
  )
```

```elixir
row_count_input = Kino.Input.number("Rows", default: 100_000)
```

```elixir
defmodule PortArrowNativeBenchmark do
  @default_row_count 100_000
  @benchmark_steps [
    :build_dataset,
    :transfer_dataset_to_r,
    :fit_linear_model,
    :extract_model_summary,
    :print_model_text,
    :capture_regression_plots
  ]

  # Returns the row count used when a caller does not supply one (see build_dataset/1).
  def default_row_count, do: @default_row_count
  # Returns the ordered list of benchmark step names; compare_backend_timings/2
  # iterates this list, so it also fixes the row order of the comparison table.
  def benchmark_steps, do: @benchmark_steps

  @doc """
  Activates the requested Rx backend and returns a map describing the setup.

  * `:port_arrow` - raises if `Rx.backend/0` already reports `:native`;
    otherwise selects PortArrow with the `Rscript` binary.
  * `:native` - requires exactly one of `RX_BUILD_NIF=1` or
    `RX_BUILD_RUST_NIF=1` in the environment, then either reports that native
    is already active or discovers `R_HOME`/`libR` and selects native.
  * anything else raises a `RuntimeError`.
  """
  def init_backend(:port_arrow) do
    if Rx.backend() == :native do
      raise """
      Cannot switch from native back to PortArrow in this Livebook runtime.

      Restart the Livebook runtime, choose PortArrow, and run this notebook again.
      """
    end

    :ok = Rx.use_backend(:port_arrow, r_binary: "Rscript", lib_paths: [])

    %{
      selected: :port_arrow,
      rx_backend: Rx.backend(),
      plots_available?: true,
      note: "PortArrow can be reset because R runs in a separate Rscript process."
    }
  end

  def init_backend(:native) do
    c_gate? = System.get_env("RX_BUILD_NIF") == "1"
    rust_gate? = System.get_env("RX_BUILD_RUST_NIF") == "1"

    # Exactly one implementation gate must be set; both or neither is an error.
    case {c_gate?, rust_gate?} do
      {true, true} ->
        raise "Set exactly one of RX_BUILD_NIF=1 or RX_BUILD_RUST_NIF=1."

      {false, false} ->
        raise """
        Native backend loading is disabled.

        Start Livebook with exactly one native implementation gate:
            RX_BUILD_NIF=1 livebook server
            RX_BUILD_RUST_NIF=1 livebook server
        """

      _exactly_one ->
        :ok
    end

    if Rx.backend() == :native do
      %{
        selected: :native,
        rx_backend: :native,
        plots_available?: true,
        note: "Native was already initialized in this Livebook runtime."
      }
    else
      r_home = discover_r_home!()
      lib_r_path = discover_lib_r_path!(r_home)

      :ok = Rx.use_backend(:native, r_home: r_home, lib_r_path: lib_r_path, lib_paths: [])

      %{
        selected: :native,
        rx_backend: Rx.backend(),
        r_home: r_home,
        lib_r_path: lib_r_path,
        plots_available?: true,
        note: "Native plot capture uses the same Rx.plot/3 PNG path as PortArrow."
      }
    end
  end

  def init_backend(other), do: raise("unknown backend choice: #{inspect(other)}")

  @doc """
  Coerces `value` into an integer row count of at least 10.

  Integers pass through, floats are rounded, and strings are trimmed, stripped
  of `_` separators and parsed as a whole integer. Raises `ArgumentError` for
  any other input, for unparsable strings, and for counts below 10.
  """
  def normalize_row_count(value) do
    case coerce_row_count(value) do
      count when count < 10 ->
        raise ArgumentError, "row count must be at least 10 for linear regression"

      count ->
        count
    end
  end

  # Converts the supported input shapes to an integer; raises on anything else.
  defp coerce_row_count(value) when is_integer(value), do: value
  defp coerce_row_count(value) when is_float(value), do: round(value)

  defp coerce_row_count(value) when is_binary(value) do
    digits = value |> String.trim() |> String.replace("_", "")

    # Only accept strings that parse completely (no trailing characters).
    case Integer.parse(digits) do
      {parsed, ""} -> parsed
      _ -> raise ArgumentError, "row count must be an integer"
    end
  end

  defp coerce_row_count(value) do
    raise ArgumentError, "row count must be an integer, got: #{inspect(value)}"
  end

  @doc """
  Builds a deterministic dataset of `row_count` points.

  `x` is `i / 100.0` for `i` in `1..row_count`; `y` is a linear trend in `x`
  plus fixed sine/cosine and modulo terms, so repeated calls yield the same
  values. Returns `%{row_count: n, x: xs, y: ys}`.
  """
  def build_dataset(row_count \\ @default_row_count) do
    row_count = normalize_row_count(row_count)
    indexes = 1..row_count

    xs = Enum.map(indexes, &(&1 / 100.0))

    ys =
      Enum.map(indexes, fn i ->
        trend = 7.0 + 1.75 * (i / 100.0)
        wave = :math.sin(i / 97.0) * 0.35 + :math.cos(i / 211.0) * 0.15
        band = (rem(i, 29) - 14) * 0.01

        trend + wave + band
      end)

    %{row_count: row_count, x: xs, y: ys}
  end

  @doc """
  Builds an `%Rx.DataFrame{}` with two double columns: `x` (1.0..n) and
  `y = 2.0 * x + 1.0`.
  """
  def build_numeric_dataframe(row_count) do
    n_rows = normalize_row_count(row_count)
    xs = for i <- 1..n_rows, do: i * 1.0
    ys = for x <- xs, do: 2.0 * x + 1.0

    %Rx.DataFrame{
      names: ["x", "y"],
      columns: %{"x" => xs, "y" => ys},
      types: %{"x" => :double, "y" => :double},
      n_rows: n_rows
    }
  end

  @doc """
  Builds an `%Rx.DataFrame{}` with one column per declared type: logical `ok`,
  integer `count`, double `score` and character `label`.
  """
  def build_mixed_dataframe(row_count) do
    n_rows = normalize_row_count(row_count)
    counts = Enum.to_list(1..n_rows)

    columns = %{
      "ok" => for(i <- counts, do: rem(i, 2) == 0),
      "count" => counts,
      "score" => for(i <- counts, do: i * 1.0),
      "label" => for(i <- counts, do: "row-#{i}")
    }

    %Rx.DataFrame{
      names: ["ok", "count", "score", "label"],
      columns: columns,
      types: %{"ok" => :logical, "count" => :integer, "score" => :double, "label" => :character},
      n_rows: n_rows
    }
  end

  @doc """
  Builds the same four-column frame as `build_mixed_dataframe/1`, but every
  tenth row holds an `%Rx.NA{}` of the column's type instead of a value.
  """
  def build_na_dataframe(row_count) do
    n_rows = normalize_row_count(row_count)
    indexes = Enum.to_list(1..n_rows)

    # Builds one column: NA of `type` on every tenth index, `present.(i)` otherwise.
    column = fn type, present ->
      Enum.map(indexes, fn
        i when rem(i, 10) == 0 -> %Rx.NA{type: type}
        i -> present.(i)
      end)
    end

    %Rx.DataFrame{
      names: ["ok", "count", "score", "label"],
      columns: %{
        "ok" => column.(:logical, &(rem(&1, 2) == 0)),
        "count" => column.(:integer, & &1),
        "score" => column.(:double, &(&1 * 1.0)),
        "label" => column.(:character, &"row-#{&1}")
      },
      types: %{"ok" => :logical, "count" => :integer, "score" => :double, "label" => :character},
      n_rows: n_rows
    }
  end

  @doc """
  Builds the dataframe-engine job map and drops the Arrow jobs when Arrow is
  unavailable.

  Returns `%{jobs: jobs, skipped: skipped}` as produced by
  `filter_unavailable_arrow_jobs/2`; `opts` is forwarded to that function.
  Each job is a one-argument function; the "to R" jobs ignore their argument
  and reuse one pre-built mixed dataframe.
  """
  def dataframe_benchmark_plan(row_count, opts \\ []) do
    mixed_frame = build_mixed_dataframe(row_count)

    %{
      "current_manual_to_r" => fn _input -> current_manual_to_r(mixed_frame) end,
      "current_manual_from_r" => &current_manual_from_r/1,
      "arrow_to_r" => fn _input -> arrow_to_r(mixed_frame) end,
      "arrow_from_r" => &arrow_from_r/1,
      "no_arrow_to_r" => fn _input -> no_arrow_to_r(mixed_frame) end,
      "no_arrow_from_r" => &no_arrow_from_r/1,
      "roundtrip_transform" => fn _input -> roundtrip_transform(mixed_frame, engine: :no_arrow) end
    }
    |> filter_unavailable_arrow_jobs(opts)
  end

  @doc "Returns only the `:jobs` map of `dataframe_benchmark_plan/1`."
  def dataframe_benchmark_jobs(row_count) do
    row_count
    |> dataframe_benchmark_plan()
    |> Map.fetch!(:jobs)
  end

  @doc """
  Sends the dataframe's raw column map to R as the global `data` and assembles
  a `data.frame` from it there. Returns the first element of the `Rx.eval/2`
  result.
  """
  def current_manual_to_r(%Rx.DataFrame{} = dataframe) do
    r_code =
      "data.frame(ok = data$ok, count = data$count, score = data$score, label = data$label, stringsAsFactors = FALSE, check.names = FALSE)"

    {r_frame, _globals} = Rx.eval(r_code, %{"data" => dataframe.columns})
    r_frame
  end

  @doc """
  Builds an `n`-row mixed `data.frame` in R, converts it with `as.list`, and
  decodes the result with `Rx.decode/1`.
  """
  def current_manual_from_r(row_count) do
    r_code =
      "as.list(data.frame(ok = rep(c(TRUE, FALSE), length.out = n), count = seq_len(n), score = seq_len(n) * 1.0, label = paste0('row-', seq_len(n)), stringsAsFactors = FALSE, check.names = FALSE))"

    {r_list, _} = Rx.eval(r_code, %{"n" => row_count})
    Rx.decode(r_list)
  end

  @doc "Converts `dataframe` to an R object using the `:arrow` engine."
  def arrow_to_r(%Rx.DataFrame{} = dataframe), do: dataframe_to_r(dataframe, :arrow)

  @doc """
  Builds an `n`-row mixed `data.frame` in R and converts it back with the
  `:arrow` engine.
  """
  def arrow_from_r(row_count), do: dataframe_from_r(row_count, :arrow)

  @doc "Converts `dataframe` to an R object using the `:no_arrow` engine."
  def no_arrow_to_r(dataframe), do: dataframe_to_r(dataframe, :no_arrow)

  @doc """
  Builds an `n`-row mixed `data.frame` in R and converts it back with the
  `:no_arrow` engine.
  """
  def no_arrow_from_r(row_count), do: dataframe_from_r(row_count, :no_arrow)

  # Shared Elixir -> R conversion; crashes with a MatchError unless
  # Rx.DataFrame.to_r/2 returns {:ok, object}.
  defp dataframe_to_r(dataframe, engine) do
    {:ok, object} = Rx.DataFrame.to_r(dataframe, engine: engine)
    object
  end

  # Shared R -> Elixir conversion. Both engines evaluate the exact same R
  # expression, so the per-engine jobs differ only in the from_r engine.
  defp dataframe_from_r(row_count, engine) do
    {object, _globals} =
      Rx.eval(
        "data.frame(ok = rep(c(TRUE, FALSE), length.out = n), count = seq_len(n), score = seq_len(n) * 1.0, label = paste0('row-', seq_len(n)), stringsAsFactors = FALSE, check.names = FALSE)",
        %{"n" => row_count}
      )

    {:ok, dataframe} = Rx.DataFrame.from_r(object, engine: engine)
    dataframe
  end

  @doc """
  Sends `dataframe` to R, doubles `score` and appends `-done` to `label` with
  R's `transform`, and converts the result back.

  `opts` must contain `:engine`; the same engine is used in both directions.
  """
  def roundtrip_transform(dataframe, opts) do
    engine_opts = [engine: Keyword.fetch!(opts, :engine)]
    {:ok, r_frame} = Rx.DataFrame.to_r(dataframe, engine_opts)

    r_code = "transform(df, score = score * 2.0, label = paste0(label, '-done'))"
    {r_transformed, _} = Rx.eval(r_code, %{"df" => r_frame})

    {:ok, roundtripped} = Rx.DataFrame.from_r(r_transformed, engine_opts)
    roundtripped
  end

  @doc """
  Removes the Arrow jobs from `jobs` when Arrow support is unavailable.

  Availability comes from `opts[:arrow_available?]` when given; otherwise it is
  probed by checking that `Explorer.DataFrame` is loadable and the R `arrow`
  package is installed. Returns `%{jobs: kept_jobs, skipped: reasons}`.
  """
  def filter_unavailable_arrow_jobs(jobs, opts \\ []) do
    probe = fn ->
      Code.ensure_loaded?(Explorer.DataFrame) and r_package_available?("arrow")
    end

    skipped =
      if Keyword.get_lazy(opts, :arrow_available?, probe) do
        %{}
      else
        %{
          "arrow_to_r" => :missing_arrow_or_explorer,
          "arrow_from_r" => :missing_arrow_or_explorer
        }
      end

    # Dropping an empty key list leaves `jobs` untouched.
    %{jobs: Map.drop(jobs, Map.keys(skipped)), skipped: skipped}
  end

  # Asks the active R backend whether `package` can be loaded, by evaluating
  # `requireNamespace(pkg, quietly = TRUE)` with `pkg` bound to `package` and
  # decoding the result.
  #
  # Best-effort probe: any exception raised while evaluating or decoding is
  # swallowed and reported as `false`.
  def r_package_available?(package) do
    {result, _globals} =
      Rx.eval("requireNamespace(pkg, quietly = TRUE)", %{"pkg" => package})

    Rx.decode(result)
  rescue
    _error -> false
  end

  @doc """
  Smoke-tests the `:no_arrow` dataframe engine in both directions on the
  PortArrow backend and returns the decoded frame plus the row count R reports.
  """
  def run_dataframe_engine_smoke(row_count) do
    n_rows = normalize_row_count(row_count)
    %{rx_backend: active_backend} = init_backend(:port_arrow)

    decoded_frame = no_arrow_from_r(n_rows)
    r_frame = n_rows |> build_mixed_dataframe() |> no_arrow_to_r()
    {r_row_count, _} = Rx.eval("nrow(df)", %{"df" => r_frame})

    %{
      backend: active_backend,
      row_count: n_rows,
      no_arrow_from_r: decoded_frame,
      no_arrow_to_r_rows: Rx.decode(r_row_count)
    }
  end

  @doc """
  Sends the `x`/`y` vectors to R, builds a `data.frame` there and asserts in R
  that it has `row_count` rows. Returns `%{df: handle, globals: g, row_count: n}`.
  """
  def transfer_dataset_to_r(%{row_count: row_count, x: xs, y: ys}) do
    script = ~S"""
    df <- data.frame(x = data$x, y = data$y)
    stopifnot(nrow(df) == n)
    df
    """

    bindings = %{"data" => %{"x" => xs, "y" => ys}, "n" => row_count}
    {df, globals} = Rx.eval(script, bindings)

    %{df: df, globals: globals, row_count: row_count}
  end

  @doc """
  Fits `stats::lm(y ~ x)` on the transferred data frame and returns
  `%{model: handle, globals: g, row_count: n}`.
  """
  def fit_linear_model(%{df: df, row_count: row_count}) do
    script = ~S"""
    model <- stats::lm(y ~ x, data = df)
    model
    """

    {model, globals} = Rx.eval(script, %{"df" => df})

    %{model: model, globals: globals, row_count: row_count}
  end

  @doc """
  Computes summary statistics for the fitted model in R (intercept, slope,
  R-squared, sigma, observation count, residual mean and standard deviation)
  and decodes the resulting R list with `Rx.decode/1`.
  """
  def extract_model_summary(%{model: model}) do
    script = ~S"""
    model_summary <- summary(model)
    model_residuals <- stats::residuals(model)
    model_coefficients <- stats::coef(model)

    list(
      intercept = unname(model_coefficients[[1]]),
      slope = unname(model_coefficients[[2]]),
      r_squared = unname(model_summary$r.squared),
      sigma = unname(model_summary$sigma),
      n = as.integer(stats::nobs(model)),
      residual_mean = mean(model_residuals),
      residual_sd = stats::sd(model_residuals)
    )
    """

    {r_summary, _globals} = Rx.eval(script, %{"model" => model})
    Rx.decode(r_summary)
  end

  # Renders the fitted model through Rx.print/2 with a width of 100 and a
  # max_print of 4000. Callers in this module take byte_size/1 of the result.
  def print_model_text(%{model: model}) do
    Rx.print(model, width: 100, max_print: 4000)
  end

  @doc """
  Draws two base R plots (data with fitted line, residuals vs fitted) through
  `Rx.plot/3` at 720x480, 96 dpi.

  Returns a map with the status, the backend that was active, the plot count
  and each plot's byte size. Pass `include_plots: true` to also include the
  plots themselves under `:plots`.
  """
  def capture_regression_plots(%{model: model}, %{df: df}, opts \\ []) do
    keep_plots? = Keyword.get(opts, :include_plots, false)
    active_backend = Rx.backend()

    script = ~S"""
    plot(
      df$x,
      df$y,
      main = "Deterministic Elixir data with lm fit",
      xlab = "x",
      ylab = "y",
      pch = 16,
      cex = 0.35,
      col = "#2C7FB8"
    )
    abline(model, col = "#D95F02", lwd = 2)

    plot(
      stats::fitted(model),
      stats::residuals(model),
      main = "Residuals vs fitted",
      xlab = "Fitted",
      ylab = "Residual",
      pch = 16,
      cex = 0.35,
      col = "#4D9221"
    )
    abline(h = 0, col = "#333333", lty = 2)
    """

    plots =
      Rx.plot(script, %{"df" => df, "model" => model}, width: 720, height: 480, res: 96)

    capture = %{
      status: :captured,
      current_backend: active_backend,
      plot_count: length(plots),
      byte_sizes: for(plot <- plots, do: byte_size(plot.data))
    }

    if keep_plots?, do: Map.put(capture, :plots, plots), else: capture
  end

  @doc """
  Runs every pipeline step once on the currently active backend and returns
  the coefficients, summary metrics, printed-model size and plot capture info.
  """
  def run_pipeline_once(row_count) do
    %{row_count: n_rows} = dataset = build_dataset(row_count)
    transferred = transfer_dataset_to_r(dataset)
    fitted = fit_linear_model(transferred)
    stats = extract_model_summary(fitted)
    printed = print_model_text(fitted)
    plot_capture = capture_regression_plots(fitted, transferred)

    metric_keys = ["r_squared", "sigma", "n", "residual_mean", "residual_sd"]

    %{
      backend: Rx.backend(),
      row_count: n_rows,
      coefficients: %{intercept: stats["intercept"], slope: stats["slope"]},
      metrics: Map.take(stats, metric_keys),
      model_text_bytes: byte_size(printed),
      plot_capture: plot_capture
    }
  end

  @doc """
  Benchmarks PortArrow first and native second, then compares them per step.

  Raises if native is already the active backend, because the PortArrow half
  can then no longer run. `opts` is forwarded to `benchmark_backend_steps/3`.
  """
  def run_head_to_head(row_count, opts \\ []) do
    case Rx.backend() do
      :native ->
        raise """
        Cannot start the head-to-head benchmark after native has initialized.

        Restart the Livebook runtime so the harness can run PortArrow first, then native.
        """

      _other ->
        :ok
    end

    n_rows = normalize_row_count(row_count)

    # Order matters: PortArrow must be measured before native is initialized.
    port_arrow_result = benchmark_backend_steps(:port_arrow, n_rows, opts)
    native_result = benchmark_backend_steps(:native, n_rows, opts)

    %{
      row_count: n_rows,
      port_arrow: port_arrow_result,
      native: native_result,
      comparison: compare_backend_timings(port_arrow_result, native_result)
    }
  end

  # Initializes `backend` and times each pipeline step in order, feeding the
  # result of one step into the next.
  #
  # Options (each validated by positive_integer_option/3 where noted):
  #   * :repetitions      - runs per non-plot step (positive integer, default 3)
  #   * :include_plot?    - whether to time plot capture (default true)
  #   * :plot_repetitions - runs of the plot step (positive integer, default 1)
  #
  # Returns a map with the setup info, per-step timings keyed by step name, and
  # a :sample holding the values produced by the last run of each step.
  def benchmark_backend_steps(backend, row_count, opts \\ []) do
    row_count = normalize_row_count(row_count)
    repetitions = positive_integer_option(opts, :repetitions, 3)
    include_plot? = Keyword.get(opts, :include_plot?, true)
    plot_repetitions = positive_integer_option(opts, :plot_repetitions, 1)

    # Backend initialization happens before any timed step.
    setup = init_backend(backend)

    {build_timing, dataset} =
      measure_step(:build_dataset, repetitions, fn ->
        build_dataset(row_count)
      end)

    {transfer_timing, transferred} =
      measure_step(:transfer_dataset_to_r, repetitions, fn ->
        transfer_dataset_to_r(dataset)
      end)

    {fit_timing, fitted} =
      measure_step(:fit_linear_model, repetitions, fn ->
        fit_linear_model(transferred)
      end)

    {summary_timing, summary} =
      measure_step(:extract_model_summary, repetitions, fn ->
        extract_model_summary(fitted)
      end)

    {print_timing, model_text} =
      measure_step(:print_model_text, repetitions, fn ->
        print_model_text(fitted)
      end)

    # Yields a :skipped timing instead of a measurement when include_plot? is false.
    {plot_timing, plot_capture} =
      measure_plot_step(include_plot?, plot_repetitions, fitted, transferred)

    %{
      backend: backend,
      current_backend: Rx.backend(),
      setup: setup,
      row_count: row_count,
      step_order: @benchmark_steps,
      timings: %{
        build_dataset: build_timing,
        transfer_dataset_to_r: transfer_timing,
        fit_linear_model: fit_timing,
        extract_model_summary: summary_timing,
        print_model_text: print_timing,
        capture_regression_plots: plot_timing
      },
      sample: %{
        coefficients: %{
          intercept: summary["intercept"],
          slope: summary["slope"]
        },
        metrics: Map.take(summary, ["r_squared", "sigma", "n", "residual_mean", "residual_sd"]),
        model_text_bytes: byte_size(model_text),
        plot_capture: plot_capture
      }
    }
  end

  @doc """
  Produces one comparison row per benchmark step, in `@benchmark_steps` order.

  Each row carries `:step`, `:port_arrow_ms`, `:native_ms` and
  `:native_vs_port_arrow` (native divided by PortArrow, or `:not_available`).
  Rows for a skipped native step also carry `:native_status` and
  `:native_reason`.
  """
  def compare_backend_timings(port_arrow_result, native_result) do
    for step <- @benchmark_steps do
      port_arrow_timing = get_in(port_arrow_result, [:timings, step]) || %{}
      native_timing = get_in(native_result, [:timings, step]) || %{}

      compare_step(step, port_arrow_timing, native_timing)
    end
  end

  # Builds the comparison row for a single step from the two timing maps.
  defp compare_step(step, port_arrow_timing, native_timing) do
    port_arrow_ms = successful_average_ms(port_arrow_timing)
    native_ms = successful_average_ms(native_timing)
    row = %{step: step, port_arrow_ms: port_arrow_ms}

    cond do
      native_timing[:status] == :skipped ->
        Map.merge(row, %{
          native_ms: nil,
          native_vs_port_arrow: :not_available,
          native_status: :skipped,
          native_reason: native_timing[:reason]
        })

      # A ratio is only defined for two numeric averages and a non-zero divisor.
      is_number(port_arrow_ms) and is_number(native_ms) and port_arrow_ms > 0 ->
        Map.merge(row, %{native_ms: native_ms, native_vs_port_arrow: native_ms / port_arrow_ms})

      true ->
        Map.merge(row, %{native_ms: native_ms, native_vs_port_arrow: :not_available})
    end
  end

  @doc """
  Renders the head-to-head result as HTML, wrapped in `Kino.HTML` when that
  module can be loaded and returned as a plain string otherwise.
  """
  def present_head_to_head(head_to_head_result) do
    markup = comparison_html(head_to_head_result)

    case Code.ensure_loaded?(Kino.HTML) do
      true -> Kino.HTML.new(markup)
      false -> markup
    end
  end

  # Renders a head-to-head result as a self-contained HTML fragment: an inline
  # <style> block, one sample card per backend, and a table with one row per
  # entry in `comparison`.
  #
  # Only matches when `:comparison` is a list. The sample cards are looked up
  # through backend_sample/2, so a backend without sample data renders an
  # "n/a"-style card rather than failing.
  def comparison_html(%{comparison: comparison} = head_to_head_result) when is_list(comparison) do
    # Scale for the timing bars: the largest numeric timing across all rows,
    # or 1.0 when no row has a numeric timing.
    max_ms =
      comparison
      |> Enum.flat_map(fn row -> [row[:port_arrow_ms], row[:native_ms]] end)
      |> Enum.filter(&is_number/1)
      |> case do
        [] -> 1.0
        values -> Enum.max(values)
      end

    """
    <section class="rx-benchmark-summary">
      <style>
        .rx-benchmark-summary {
          font-family: Inter, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
          color: #172026;
          line-height: 1.4;
        }
        .rx-benchmark-summary h3 {
          margin: 0 0 12px 0;
          font-size: 1.25rem;
        }
        .rx-benchmark-summary .cards {
          display: grid;
          grid-template-columns: repeat(auto-fit, minmax(210px, 1fr));
          gap: 12px;
          margin: 0 0 16px 0;
        }
        .rx-benchmark-summary .card {
          border: 1px solid #d5dde5;
          border-radius: 6px;
          padding: 10px 12px;
          background: #fbfcfd;
        }
        .rx-benchmark-summary .card h4 {
          margin: 0 0 8px 0;
          font-size: 0.95rem;
        }
        .rx-benchmark-summary .metric {
          display: grid;
          grid-template-columns: minmax(72px, 1fr) auto;
          gap: 8px;
          font-size: 0.88rem;
        }
        .rx-benchmark-summary table {
          width: 100%;
          border-collapse: collapse;
          font-size: 0.9rem;
        }
        .rx-benchmark-summary th,
        .rx-benchmark-summary td {
          border-bottom: 1px solid #e2e8ee;
          padding: 8px 6px;
          text-align: left;
          vertical-align: middle;
        }
        .rx-benchmark-summary th {
          background: #eef3f6;
          color: #27343d;
          font-weight: 650;
        }
        .rx-benchmark-summary .bars {
          display: grid;
          gap: 4px;
          min-width: 190px;
        }
        .rx-benchmark-summary .bar-track {
          display: grid;
          grid-template-columns: 68px 1fr 74px;
          align-items: center;
          gap: 6px;
        }
        .rx-benchmark-summary .track {
          height: 10px;
          background: #edf1f4;
          border-radius: 4px;
          overflow: hidden;
        }
        .rx-benchmark-summary .bar-port-arrow {
          display: block;
          height: 10px;
          background: #2c7fb8;
        }
        .rx-benchmark-summary .bar-native {
          display: block;
          height: 10px;
          background: #4d9221;
        }
        .rx-benchmark-summary .muted {
          color: #66737d;
        }
        .rx-benchmark-summary .winner {
          font-weight: 650;
        }
      </style>
      <h3>Head-to-head summary</h3>
      <div class="cards">
        #{sample_card("PortArrow", backend_sample(head_to_head_result, :port_arrow))}
        #{sample_card("Native", backend_sample(head_to_head_result, :native))}
      </div>
      <table>
        <thead>
          <tr>
            <th>Step</th>
            <th>Timing bars</th>
            <th>PortArrow</th>
            <th>Native</th>
            <th>Result</th>
          </tr>
        </thead>
        <tbody>
          #{Enum.map_join(comparison, "\n", &comparison_table_row(&1, max_ms))}
        </tbody>
      </table>
    </section>
    """
  end

  # Renders one <tr> for a comparison row: step label, the pair of timing bars
  # scaled against `max_ms`, both formatted timings, and the result cell.
  # Keys are read with `row[...]`, so a missing key yields nil ("n/a") rather
  # than raising.
  defp comparison_table_row(row, max_ms) do
    step = row[:step]
    port_arrow_ms = row[:port_arrow_ms]
    native_ms = row[:native_ms]
    result = comparison_result(row)

    """
    <tr>
      <td>#{step_label(step)}</td>
      <td>#{bar_group(port_arrow_ms, native_ms, max_ms)}</td>
      <td>#{format_ms(port_arrow_ms)}</td>
      <td>#{format_ms(native_ms)}</td>
      <td>#{result}</td>
    </tr>
    """
  end

  # Renders the two stacked timing bars (PortArrow on top, native below) for
  # one step, both scaled against the same `max_ms`.
  defp bar_group(port_arrow_ms, native_ms, max_ms) do
    """
    <div class="bars">
      #{bar_line("Port", "bar-port-arrow", port_arrow_ms, max_ms)}
      #{bar_line("Native", "bar-native", native_ms, max_ms)}
    </div>
    """
  end

  # Renders a single labelled timing bar.
  #
  # A nil timing renders an empty track with "n/a". Otherwise the bar width is
  # the timing's share of `max_ms` as a percentage, clamped to 1..100 and
  # rounded to two decimals.
  defp bar_line(label, _class, nil, _max_ms) do
    """
    <div class="bar-track">
      <span class="muted">#{label}</span>
      <span class="track"></span>
      <span class="muted">n/a</span>
    </div>
    """
  end

  defp bar_line(label, class, value, max_ms) do
    # When every timing is zero the scale is zero too; dividing would raise
    # ArithmeticError, so fall back to the minimum bar width instead.
    percent = if max_ms > 0, do: value / max_ms * 100, else: 0.0
    width = percent |> max(1.0) |> min(100.0) |> Float.round(2)

    """
    <div class="bar-track">
      <span class="muted">#{label}</span>
      <span class="track"><span class="#{class}" style="width: #{width}%"></span></span>
      <span>#{format_ms(value)}</span>
    </div>
    """
  end

  # Renders the "Result" cell for one comparison row. Clauses are tried in
  # order:
  #   1. skipped native step   -> the (escaped) skip reason
  #   2. numeric ratio < 1.0   -> "Native faster" with the speedup factor
  #   3. numeric ratio > 1.0   -> "PortArrow faster" with the speedup factor
  #   4. any other numeric ratio -> "Similar (1.0x)"
  #   5. everything else       -> "not available"
  #
  # Clauses 2 and 3 additionally require both timings to be numbers and the
  # divisor to be positive; a numeric ratio that fails those guards falls
  # through to clause 4.
  defp comparison_result(%{native_status: :skipped, native_reason: reason}) do
    "<span class=\"muted\">#{html_escape(reason)}</span>"
  end

  defp comparison_result(%{native_vs_port_arrow: ratio, port_arrow_ms: port_arrow_ms, native_ms: native_ms})
       when is_number(ratio) and ratio < 1.0 and is_number(port_arrow_ms) and is_number(native_ms) and
              native_ms > 0 do
    "<span class=\"winner\">Native faster</span> <span class=\"muted\">(#{format_speedup(port_arrow_ms / native_ms)} faster)</span>"
  end

  defp comparison_result(%{native_vs_port_arrow: ratio, port_arrow_ms: port_arrow_ms, native_ms: native_ms})
       when is_number(ratio) and ratio > 1.0 and is_number(port_arrow_ms) and is_number(native_ms) and
              port_arrow_ms > 0 do
    "<span class=\"winner\">PortArrow faster</span> <span class=\"muted\">(#{format_speedup(native_ms / port_arrow_ms)} faster)</span>"
  end

  defp comparison_result(%{native_vs_port_arrow: ratio}) when is_number(ratio) do
    "<span class=\"winner\">Similar</span> <span class=\"muted\">(1.0x)</span>"
  end

  defp comparison_result(_row), do: "<span class=\"muted\">not available</span>"

  # Renders the summary card for one backend. A nil sample renders a
  # placeholder card; otherwise the card shows n, slope, intercept, R-squared
  # and sigma.
  defp sample_card(name, nil) do
    """
    <article class="card">
      <h4>#{html_escape(name)}</h4>
      <div class="muted">No sample data available</div>
    </article>
    """
  end

  defp sample_card(name, sample) do
    # Coefficients are keyed by atom, metrics by string, matching the :sample
    # map built in benchmark_backend_steps/3.
    coefficients = Map.get(sample, :coefficients, %{})
    metrics = Map.get(sample, :metrics, %{})

    """
    <article class="card">
      <h4>#{html_escape(name)}</h4>
      <div class="metric"><span>n</span><strong>#{format_integer(metrics["n"])}</strong></div>
      <div class="metric"><span>slope</span><strong>#{format_number(coefficients[:slope])}</strong></div>
      <div class="metric"><span>intercept</span><strong>#{format_number(coefficients[:intercept])}</strong></div>
      <div class="metric"><span>R<sup>2</sup></span><strong>#{format_number(metrics["r_squared"])}</strong></div>
      <div class="metric"><span>sigma</span><strong>#{format_number(metrics["sigma"])}</strong></div>
    </article>
    """
  end

  # Looks up the sample map for a backend. Two result shapes are accepted: the
  # nested one returned by run_head_to_head/2 (`result.port_arrow.sample`) and
  # a flat one with `:port_arrow_sample` / `:native_sample` keys. The nested
  # shape wins when both are present; nil is returned when neither matches.
  defp backend_sample(%{port_arrow: %{sample: sample}}, :port_arrow), do: sample
  defp backend_sample(%{native: %{sample: sample}}, :native), do: sample
  defp backend_sample(%{port_arrow_sample: sample}, :port_arrow), do: sample
  defp backend_sample(%{native_sample: sample}, :native), do: sample
  defp backend_sample(_result, _backend), do: nil

  # Maps a benchmark step atom to its table label; unknown steps are shown
  # via inspect/1.
  defp step_label(step) do
    case step do
      :build_dataset -> "Build Elixir dataset"
      :transfer_dataset_to_r -> "Elixir -> R transfer"
      :fit_linear_model -> "R lm fit"
      :extract_model_summary -> "R -> Elixir summary"
      :print_model_text -> "R print text"
      :capture_regression_plots -> "R plot capture"
      unknown -> inspect(unknown)
    end
  end

  # Formats a millisecond timing for display; nil becomes "n/a".
  defp format_ms(nil), do: "n/a"
  defp format_ms(millis), do: format_number(millis) <> " ms"

  # Formats a speedup factor with two decimals and an "x" suffix, e.g. "2.50x".
  defp format_speedup(value) when is_number(value) do
    # Multiplying by 1.0 turns integers into floats for float_to_binary/2.
    factor = :erlang.float_to_binary(value * 1.0, decimals: 2)
    factor <> "x"
  end

  # Formats a count for display: nil becomes "n/a"; numbers are rounded to the
  # nearest integer (a no-op for integers) and printed without decimals.
  defp format_integer(nil), do: "n/a"
  defp format_integer(value) when is_number(value), do: Integer.to_string(round(value))

  # Formats a number for display. nil becomes "n/a", integers print as-is, and
  # floats get fewer decimals the larger their magnitude.
  defp format_number(nil), do: "n/a"
  defp format_number(value) when is_integer(value), do: Integer.to_string(value)

  defp format_number(value) when is_float(value) do
    :erlang.float_to_binary(value, decimals: float_decimals(abs(value)))
  end

  # Decimal places by magnitude: >= 1000 -> 1, >= 10 -> 2, otherwise 4.
  defp float_decimals(magnitude) when magnitude >= 1000, do: 1
  defp float_decimals(magnitude) when magnitude >= 10, do: 2
  defp float_decimals(_magnitude), do: 4

  # Escapes the five HTML-significant characters in `value` (converted with
  # to_string/1 first).
  defp html_escape(value) do
    # "&" must be replaced first so the entities added afterwards are not
    # escaped a second time.
    replacements = [
      {"&", "&amp;"},
      {"<", "&lt;"},
      {">", "&gt;"},
      {"\"", "&quot;"},
      {"'", "&#39;"}
    ]

    Enum.reduce(replacements, to_string(value), fn {raw, entity}, escaped ->
      String.replace(escaped, raw, entity)
    end)
  end

  @doc """
  Runs the dataset build, transfer and model fit once and returns their
  results so the Benchee jobs can reuse them.
  """
  def prepare_benchmark(row_count) do
    %{row_count: n_rows} = dataset = build_dataset(row_count)
    transferred = transfer_dataset_to_r(dataset)

    %{
      row_count: n_rows,
      dataset: dataset,
      transferred: transferred,
      fitted: fit_linear_model(transferred)
    }
  end

  @doc """
  Builds the Benchee job map from prepared benchmark state.

  Every job takes the Benchee input as its single argument; only the dataset
  build uses it, the other jobs reuse the prepared state. The plot capture job
  is added only when the active backend is `:port_arrow` or `:native`.
  """
  def benchmark_jobs(%{dataset: dataset, transferred: transferred, fitted: fitted}) do
    base_jobs = %{
      "elixir build_dataset" => &build_dataset/1,
      "elixir to R transfer plus data.frame" => fn _input -> transfer_dataset_to_r(dataset) end,
      "R lm fit on transferred data" => fn _input -> fit_linear_model(transferred) end,
      "R to Elixir summary extraction" => fn _input -> extract_model_summary(fitted) end,
      "R print text extraction" => fn _input -> print_model_text(fitted) end
    }

    case Rx.backend() do
      backend when backend in [:port_arrow, :native] ->
        plot_job = fn _input -> capture_regression_plots(fitted, transferred) end
        Map.put(base_jobs, "R plot capture", plot_job)

      _other ->
        base_jobs
    end
  end

  @doc """
  Returns the Benchee options shared by every suite in this notebook, with a
  single input named `"<row_count> rows"`.
  """
  def benchmark_options(row_count) do
    durations = [warmup: 1, time: 3, memory_time: 1, reduction_time: 1]
    input_label = "#{row_count} rows"

    durations ++ [inputs: %{input_label => row_count}, pre_check: true]
  end

  @doc """
  Reports whether the plot capture job is part of the Benchee suite for the
  currently active backend.
  """
  def plot_capture_benchmark_status do
    backend = Rx.backend()

    if backend in [:port_arrow, :native] do
      %{status: :included, reason: "#{backend} supports Rx.plot/3 PNG capture"}
    else
      %{status: :skipped, current_backend: backend}
    end
  end

  # Times the plot capture step, or reports it as skipped when plotting is
  # disabled. Returns {timing, capture}; in the skipped case the capture also
  # records the active backend, while the timing does not.
  defp measure_plot_step(false, _plot_repetitions, _fitted, _transferred) do
    timing = %{status: :skipped, reason: "plot capture disabled by include_plot?: false"}

    {timing, Map.put(timing, :current_backend, Rx.backend())}
  end

  defp measure_plot_step(true, plot_repetitions, fitted, transferred) do
    capture = fn -> capture_regression_plots(fitted, transferred) end
    measure_step(:capture_regression_plots, plot_repetitions, capture)
  end

  # Runs `fun` `repetitions` times, timing each run with the monotonic clock.
  # Returns {timing, result}: timing holds the average/min/max in milliseconds
  # (rounded to 3 decimals) and result is the value of the final run.
  defp measure_step(_step, repetitions, fun) do
    samples =
      for _iteration <- 1..repetitions do
        started_at = System.monotonic_time(:nanosecond)
        result = fun.()
        elapsed_ns = System.monotonic_time(:nanosecond) - started_at

        {Float.round(elapsed_ns / 1_000_000, 3), result}
      end

    {elapsed_ms, results} = Enum.unzip(samples)

    timing = %{
      status: :ok,
      repetitions: repetitions,
      average_ms: average(elapsed_ms),
      min_ms: Enum.min(elapsed_ms),
      max_ms: Enum.max(elapsed_ms)
    }

    {timing, List.last(results)}
  end

  # Arithmetic mean of a non-empty list of numbers, rounded to 3 decimals.
  defp average(values) do
    mean = Enum.sum(values) / length(values)
    Float.round(mean, 3)
  end

  # Returns the average timing of a successful measurement, or nil for
  # skipped, missing or malformed timings.
  defp successful_average_ms(timing) do
    case timing do
      %{status: :ok, average_ms: average_ms} when is_number(average_ms) -> average_ms
      _ -> nil
    end
  end

  # Reads `key` from `opts` (falling back to `default`) and raises
  # ArgumentError unless the value is a positive integer.
  defp positive_integer_option(opts, key, default) do
    case Keyword.get(opts, key, default) do
      value when is_integer(value) and value > 0 ->
        value

      value ->
        raise ArgumentError, "#{key} must be a positive integer, got: #{inspect(value)}"
    end
  end

  # Discovers R's home directory by running `R RHOME` and trimming its output.
  #
  # Raises a RuntimeError when the command exits with a non-zero status
  # (stderr is merged into the reported output) or when running the command
  # raises an ErlangError.
  defp discover_r_home! do
    case System.cmd("R", ["RHOME"], stderr_to_stdout: true) do
      {r_home, 0} ->
        String.trim(r_home)

      {output, status} ->
        raise "Could not discover R_HOME with `R RHOME`; status #{status}: #{output}"
    end
  rescue
    # Only ErlangError is rescued here, so the RuntimeError raised above for a
    # non-zero exit status propagates unchanged.
    error in ErlangError ->
      raise "Could not run `R RHOME`: #{Exception.message(error)}"
  end

  # Returns the first existing shared library among `<r_home>/lib/libR.so` and
  # `<r_home>/lib/libR.dylib`; raises a RuntimeError listing both paths when
  # neither exists.
  defp discover_lib_r_path!(r_home) do
    candidates = for file <- ["libR.so", "libR.dylib"], do: Path.join([r_home, "lib", file])

    case Enum.find(candidates, &File.exists?/1) do
      nil ->
        checked = Enum.map_join(candidates, "\n", &"    #{&1}")

        raise """
        Could not find libR.so or libR.dylib.

        Checked:
        #{checked}
        """

      lib_r_path ->
        lib_r_path
    end
  end
end
```

```elixir
row_count = PortArrowNativeBenchmark.normalize_row_count(Kino.Input.read(row_count_input))
```

## Dataframe Engine Benchmarks #1

These jobs compare the current manual map/vector pattern, Arrow-backed
`Rx.DataFrame`, and no-Arrow `Rx.DataFrame`. Arrow usually wins for larger frames;
no-Arrow wins on portability and setup because it does not require the R `arrow`
package.

```elixir
PortArrowNativeBenchmark.r_package_available?("arrow")
```

```elixir
Rx.backend()
```

```elixir
# Build the dataframe-engine jobs for the current backend; Arrow jobs are
# dropped (and listed under :skipped) when Arrow support is unavailable.
dataframe_plan = PortArrowNativeBenchmark.dataframe_benchmark_plan(row_count)
dataframe_jobs = dataframe_plan.jobs

# Summary of what the next cell will benchmark.
%{
  backend: Rx.backend(),
  jobs: Map.keys(dataframe_jobs) |> Enum.sort(),
  skipped: dataframe_plan.skipped
}
```

```elixir
Benchee.run(dataframe_jobs, PortArrowNativeBenchmark.benchmark_options(row_count))
```

## Head-To-Head Benchmark

```elixir
# Times every step on PortArrow, then on native. Raises if native is already
# the active backend in this runtime.
head_to_head =
  PortArrowNativeBenchmark.run_head_to_head(row_count,
    repetitions: 3,
    include_plot?: true,
    plot_repetitions: 1
  )

# Show the per-step comparison alongside one sample result per backend.
%{
  comparison: head_to_head.comparison,
  port_arrow_sample: head_to_head.port_arrow.sample,
  native_sample: head_to_head.native.sample
}
```

```elixir
PortArrowNativeBenchmark.present_head_to_head(head_to_head)
```

Run this section for the sequential comparison. It starts with PortArrow,
measures each step, switches once to native, measures the matching native steps,
and returns per-step `port_arrow_ms`, `native_ms`, and `native_vs_port_arrow`
values. Native plot capture is shown as a normal timed step when
`include_plot?: true`.

## Dataframe Engine Benchmarks #2

These jobs compare the current manual map/vector pattern, Arrow-backed
`Rx.DataFrame`, and no-Arrow `Rx.DataFrame`. Arrow usually wins for larger frames;
no-Arrow wins on portability and setup because it does not require the R `arrow`
package.

```elixir
Rx.backend()
```

```elixir
# Rebuild the dataframe-engine jobs against whichever backend is active now;
# the Rx.backend() value in the summary below shows which one that is.
dataframe_plan = PortArrowNativeBenchmark.dataframe_benchmark_plan(row_count)
dataframe_jobs = dataframe_plan.jobs

# Summary of what the next cell will benchmark.
%{
  backend: Rx.backend(),
  jobs: Map.keys(dataframe_jobs) |> Enum.sort(),
  skipped: dataframe_plan.skipped
}
```

```elixir
Benchee.run(dataframe_jobs, PortArrowNativeBenchmark.benchmark_options(row_count))
```

## Optional Single-Backend Benchee Benchmarks

Run these cells instead of the head-to-head cell when you want a longer Benchee
suite for one selected backend. If native has already initialized, restart the
Livebook runtime before selecting PortArrow again.

```elixir
# Read the backend chosen in the "R backend" select input and activate it.
# init_backend/1 raises when PortArrow is selected after native has been
# initialized in this runtime.
backend = Kino.Input.read(backend_input)

%{
  backend_setup: PortArrowNativeBenchmark.init_backend(backend),
  selected_row_count: row_count
}
```

## Smoke Run

Run the whole pipeline once before benchmarking. This sends deterministic
Elixir vectors into R, constructs a base R `data.frame`, fits `stats::lm`,
extracts summary metrics back to Elixir, prints the model text back to Elixir,
and captures base R plots on the selected backend.

## Benchee Benchmarks

The transfer benchmark reuses a deterministic Elixir dataset. The model-fitting,
summary, print, and plot benchmarks reuse already transferred R handles where
that is the behavior being measured.

```elixir
# Build, transfer and fit once so the Benchee jobs can reuse the dataset and
# the R handles instead of recreating them on every iteration.
benchmark_state = PortArrowNativeBenchmark.prepare_benchmark(row_count)
benchmark_jobs = PortArrowNativeBenchmark.benchmark_jobs(benchmark_state)

# Summary of what the next cell will benchmark on the active backend.
%{
  backend: Rx.backend(),
  jobs: Map.keys(benchmark_jobs) |> Enum.sort(),
  plot_capture: PortArrowNativeBenchmark.plot_capture_benchmark_status()
}
```

```elixir
Benchee.run(
  benchmark_jobs,
  PortArrowNativeBenchmark.benchmark_options(row_count)
)
```