lib/broadway_dashboard.ex

defmodule BroadwayDashboard do
  use Phoenix.LiveDashboard.PageBuilder, refresher?: false

  # The moduledoc is extracted from the README at compile time. Declaring the
  # README as an external resource makes Mix recompile this module whenever the
  # README changes, so the generated documentation does not go stale.
  @external_resource "README.md"

  @moduledoc "README.md"
             |> File.read!()
             |> String.split("<!-- MDOC !-->")
             |> Enum.fetch!(1)

  alias BroadwayDashboard.{Metrics, PipelineGraph}

  # We check the Broadway version installed on remote nodes.
  # This should match mix.exs.
  @minimum_broadway_version "1.0.0"

  @disabled_link "https://hexdocs.pm/broadway_dashboard"
  @page_title "Broadway pipelines"

  # Builds the page session. Without an explicit `:pipelines` option the page
  # falls back to `:auto_discover`, which lists running pipelines at mount time.
  @impl true
  def init(opts) do
    pipelines = opts[:pipelines] || :auto_discover

    {:ok, %{pipelines: pipelines}, application: :broadway}
  end

  # An explicitly empty pipeline list disables the menu entry (linking to the
  # docs instead); any other configuration enables the page.
  @impl true
  def menu_link(%{pipelines: []}, _capabilities) do
    {:disabled, @page_title, @disabled_link}
  end

  def menu_link(%{pipelines: _pipelines}, _capabilities) do
    {:ok, @page_title}
  end

  # Resolves the pipeline configuration into a non-empty list of pipeline names.
  # Returns `{:ok, pipelines}` or `{:error, reason}` where `reason` is one of the
  # atoms/tuples understood by `error_message/1`.
  defp pipelines_or_auto_discover([], _node), do: {:error, :no_pipelines_available}

  defp pipelines_or_auto_discover(pipelines, _node) when is_list(pipelines),
    do: {:ok, pipelines}

  defp pipelines_or_auto_discover(:auto_discover, node) do
    # Auto-discovery needs `Broadway.all_running/0` on the remote node, so the
    # Broadway version is validated before calling it.
    with :ok <- check_broadway_version(node) do
      running_pipelines(node)
    end
  end

  defp pipelines_or_auto_discover(_unknown_config, _node),
    do: {:error, :no_pipelines_available}

  # Asks the (possibly remote) node for its running Broadway pipelines.
  defp running_pipelines(node) do
    case :rpc.call(node, Broadway, :all_running, []) do
      [] ->
        {:error, :no_pipelines_available}

      pipelines when is_list(pipelines) ->
        {:ok, pipelines}

      {:badrpc, _error} ->
        {:error, :cannot_list_running_pipelines}
    end
  end

  # Mounts the page for the pipeline selected through the `nav` param.
  # When no valid pipeline is selected, redirects to the first available one.
  # Any failure is stored in the `:error` assign and rendered by `render_error/1`.
  @impl true
  def mount(params, %{pipelines: pipelines}, socket) do
    case pipelines_or_auto_discover(pipelines, socket.assigns.page.node) do
      {:ok, pipelines} ->
        socket = assign(socket, :pipelines, pipelines)
        pipeline = nav_pipeline(params, pipelines)

        cond do
          pipeline ->
            node = socket.assigns.page.node

            with :ok <- check_socket_connection(socket),
                 :ok <- check_broadway_version(node),
                 {:ok, initial_payload} <- Metrics.listen(node, self(), pipeline) do
              stats = %{
                successful: initial_payload.successful,
                failed: initial_payload.failed,
                throughput_successful: 0,
                throughput_failed: 0
              }

              layers = PipelineGraph.build_layers(initial_payload.topology_workload)

              {:ok, assign(socket, pipeline: pipeline, stats: stats, layers: layers)}
            else
              {:error, error} ->
                {:ok, assign(socket, pipeline: nil, error: error)}
            end

          true ->
            # `pipelines` is non-empty here, so `hd/1` is safe.
            nav = pipelines |> hd() |> inspect()
            to = live_dashboard_path(socket, socket.assigns.page, nav: nav)
            {:ok, push_redirect(socket, to: to)}
        end

      {:error, error} ->
        {:ok, assign(socket, pipeline: nil, error: error)}
    end
  end

  # Finds the pipeline whose `inspect/1` form equals the `nav` param.
  # Returns `nil` when `nav` is missing, empty, or matches no pipeline.
  defp nav_pipeline(params, pipelines) do
    nav = params["nav"]
    nav = if nav && nav != "", do: nav
    nav && Enum.find(pipelines, fn name -> inspect(name) == nav end)
  end

  # Metrics are only subscribed to from a connected (websocket) mount.
  defp check_socket_connection(socket) do
    if connected?(socket) do
      :ok
    else
      {:error, :connection_is_not_available}
    end
  end

  # Applies a metrics update for the currently displayed pipeline. Throughput is
  # the difference between the new counters and the previously stored ones.
  # Updates for other pipelines (e.g. sent before navigating away) are ignored.
  @impl true
  def handle_info({:update_pipeline, payload}, socket) do
    if socket.assigns.pipeline == payload.pipeline do
      previous_stats = socket.assigns.stats

      stats = %{
        successful: payload.successful,
        failed: payload.failed,
        throughput_successful: payload.successful - previous_stats.successful,
        throughput_failed: payload.failed - previous_stats.failed
      }

      layers = PipelineGraph.build_layers(payload.topology_workload)

      {:noreply, assign(socket, stats: stats, layers: layers)}
    else
      {:noreply, socket}
    end
  end

  # Ignore any other message instead of crashing the page with a
  # FunctionClauseError.
  def handle_info(_message, socket) do
    {:noreply, socket}
  end

  # Renders either the error card or a nav bar with one item per pipeline.
  @impl true
  def render(assigns) do
    if assigns[:error] do
      render_error(assigns)
    else
      items =
        for name <- assigns.pipelines do
          name = inspect(name)

          {name, "navigate", name: name, render: fn -> render_pipeline_or_error(assigns) end}
        end

      nav_bar(items: items, page: assigns[:page])
    end
  end

  # Renders the pipeline nav bar; each item's `:render` function builds its content.
  defp nav_bar(opts) do
    assigns = Map.new(opts)

    ~H"""
    <.live_nav_bar id="broadway_navbar" page={@page}>
      <:item name={name} method={method} :for={{name, method, item} <- @items}>
        <%= item[:render].() %>
      </:item>
    </.live_nav_bar>
    """
  end

  defp render_pipeline_or_error(assigns) do
    if assigns[:error] do
      render_error(assigns)
    else
      render_pipeline(assigns)
    end
  end

  defp render_pipeline(assigns) do
    ~H"""
    <.row>
      <:col>
        <.pipeline_throughput_row stats={@stats} />
        <.pipeline_graph_row layers={@layers} />
      </:col>
    </.row>
    """
  end

  # The message is derived from `@error` inside the template, so LiveView change
  # tracking follows `:error` instead of an untracked key added with `Map.put/3`.
  defp render_error(assigns) do
    ~H"""
    <.row>
      <:col>
        <.card><%= error_message(@error) %></.card>
      </:col>
    </.row>
    """
  end

  # Maps an error reason to a human-readable message.
  defp error_message(:connection_is_not_available),
    do: "Dashboard is not connected yet."

  defp error_message(:pipeline_not_found),
    do: "This pipeline is not available for this node."

  defp error_message(:pipeline_is_not_running),
    do: "This pipeline is not running on this node."

  defp error_message(:broadway_is_not_available),
    do: "Broadway is not available on remote node."

  defp error_message(:version_is_not_enough),
    do:
      "Broadway is outdated on remote node. Minimum version required is #{@minimum_broadway_version}"

  defp error_message(:no_pipelines_available),
    do: "There is no pipeline running on this node."

  defp error_message(:cannot_list_running_pipelines),
    do: "Could not list running pipelines at remote node. Please try again later."

  defp error_message(:not_able_to_start_remotely),
    do: "Could not start the metrics server remotely. Please try again later."

  defp error_message({:badrpc, _}),
    do: "Could not send request to node. Try again later."

  # Fallback so an unexpected reason still renders instead of crashing the page.
  defp error_message(other),
    do: "An unexpected error occurred: #{inspect(other)}"

  defp pipeline_throughput_row(assigns) do
    ~H"""
    <.row>
      <:col>
        <.row>
          <:col>
            <.card title="Throughput" hint="Messages p/ second." inner_title="successful"><%= @stats.throughput_successful %></.card>
          </:col>
          <:col>
            <.card inner_title="failed"><%= @stats.throughput_failed %></.card>
          </:col>
          <:col>
            <.card inner_title="total"><%= @stats.throughput_successful + @stats.throughput_failed %></.card>
          </:col>
        </.row>
      </:col>
      <:col>
        <.row>
          <:col>
            <.card title="All time" hint="Messages since start." inner_title="successful"><%= @stats.successful %></.card>
          </:col>
          <:col>
            <.card inner_title="failed"><%= @stats.failed %></.card>
          </:col>
          <:col>
            <.card inner_title="total"><%= @stats.successful + @stats.failed %></.card>
          </:col>
        </.row>
      </:col>
    </.row>
    """
  end

  @hint """
  Each stage of Broadway is represented here by a circle.
  A greener circle means that the stage is most of the time "free".
  When the color changes to red it means that the process is doing
  its work.
  You may want to play with the configuration of your pipeline to
  find the sweet spot between a high throughput and a lower number of
  processes in red.
  """

  defp pipeline_graph_row(assigns) do
    assigns = Map.put(assigns, :hint, @hint)

    ~H"""
    <.row>
      <:col>
        <.live_layered_graph layers={@layers} id="pipeline" title="Pipeline" hint={@hint} background={&background/1} format_detail={&format_detail/1} />
      </:col>
    </.row>
    """
  end

  # Nodes whose data is a plain string (not a workload map) are drawn in gray.
  defp background(node_data) when is_binary(node_data) do
    "gray"
  end

  defp background(node_data) do
    # This calculation defines the Hue portion of the HSL color function,
    # where hue 0 is red and hue 120 is green.
    # See: https://developer.mozilla.org/en-US/docs/Web/CSS/color_value#hsl_colors
    # `detail` is displayed as a percentage (see format_detail/1), so an idle
    # stage (0) maps to hue 100 (green) and a fully busy one (100) to hue 0 (red).
    hue = 100 - node_data.detail

    "hsl(#{hue}, 80%, 35%)"
  end

  defp format_detail(node_data) do
    "#{node_data.detail}%"
  end

  # Ensures Broadway is loaded on `node` with at least @minimum_broadway_version.
  # Returns `:ok`, `{:error, :version_is_not_enough}`,
  # `{:error, :broadway_is_not_available}` or `{:error, {:badrpc, reason}}`.
  defp check_broadway_version(node) do
    case :rpc.call(node, Application, :spec, [:broadway, :vsn]) do
      {:badrpc, _reason} = error ->
        {:error, error}

      vsn when is_list(vsn) ->
        # `Application.spec/2` returns the version as a charlist.
        if Version.compare(to_string(vsn), @minimum_broadway_version) in [:gt, :eq] do
          :ok
        else
          {:error, :version_is_not_enough}
        end

      nil ->
        {:error, :broadway_is_not_available}
    end
  end
end