lib/mobius.ex

defmodule Mobius do
  @moduledoc """
  Localized metrics reporter
  """

  use Supervisor

  alias Mobius.{MetricsTable, Scraper}

  alias Telemetry.Metrics

  @default_args [name: :mobius, persistence_dir: "/data"]

  @type time_unit() :: :second | :minute | :hour | :day

  @typedoc """
  Arguments to Mobius

  * `:name` - the name of the mobius instance (defaults to `:mobius`)
  * `:metrics` - list of telemetry metrics for Mobius to track
  * `:persistence_dir` - the top level directory where mobius will persist
    metric information
  * `:day_count` - number of day-granularity samples to keep
  * `:hour_count` - number of hour-granularity samples to keep
  * `:minute_count` - number of minute-granularity samples to keep
  * `:second_count` - number of second-granularity samples to keep
  """
  @type arg() :: {:name, name()} | {:metrics, [Metrics.t()]} | {:persistence_dir, binary()}

  @typedoc """
  The name of the Mobius instance

  This is used to store data for a particular set of mobius metrics.
  """
  @type name() :: atom()

  @type metric_type() :: :counter | :last_value | :sum

  @type metric_name() :: [atom()]

  @doc """
  Start Mobius
  """
  def start_link(args) do
    Supervisor.start_link(__MODULE__, ensure_args(args), name: __MODULE__.Supervisor)
  end

  @impl Supervisor
  def init(args) do
    mobius_persistence_path = Path.join(args[:persistence_dir], to_string(args[:name]))
    :ok = ensure_mobius_persistence_dir(mobius_persistence_path)

    args =
      args
      |> Keyword.put(:persistence_dir, mobius_persistence_path)
      |> Keyword.put_new(:day_count, 60)
      |> Keyword.put_new(:hour_count, 48)
      |> Keyword.put_new(:minute_count, 120)
      |> Keyword.put_new(:second_count, 120)

    MetricsTable.init(args)

    children = [
      {Mobius.MetricsTable.Monitor, args},
      {Mobius.Registry, args},
      {Mobius.Scraper, args}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  defp ensure_args(args) do
    Keyword.merge(@default_args, args)
  end

  defp ensure_mobius_persistence_dir(persistence_path) do
    case File.mkdir_p(persistence_path) do
      :ok ->
        :ok

      {:error, :eexist} ->
        :ok

      error ->
        error
    end
  end

  @typedoc """
  Options to use when plotting time series metric data

  * `:name` - the name of the Mobius instance you are using. Unless you
    specified this in your configuration you should be safe to allow this
    option to default, which is `:mobius_metrics`.
  * `:last` - display data point that have been captured over the last `x`
    amount of time. Where `x` is either an integer or a tuple of
    `{integer(), time_unit()}`. If you only pass an integer the time unit of
    `:seconds` is assumed. By default Mobius will plot the last 3 minutes of
    data.
  * `:from` - the unix timestamp, in seconds, to start querying from
  * `:to` - the unix timestamp, in seconds, to stop querying at
  * `:type` - for metrics that have different types of measurements, you can pass
    this option to filter which metric type you want to plot
  """
  @type plot_opt() ::
          {:name, Mobius.name()}
          | {:last, integer() | {integer(), time_unit()}}
          | {:from, integer()}
          | {:to, integer()}
          | {:type, metric_type()}

  @doc """
  Plot the metric name to the screen

  If there are tags for the metric you can pass those in the second argument:

  ```elixir
  Mobius.Charts.plot("vm.memory.total", %{some: :tag})
  ```

  By default the plot will display the last 3 minutes of metric history.

  However, you can pass the `:from` and `:to` options to look at a specific
  range of time.

  ```elixir
  Mobius.plot("vm.memory.total", %{}, from: 1630619212, to: 1630619219)
  ```

  You can also plot data over the last `x` amount of time. Where x is an
  integer. When there is no `time_unit()` provided the unit is assumed to be
  `:second`.

  Plotting data over the last 30 seconds:

  ```elixir
  Mobius.plot("vm.memory.total", %{}, last: 30)
  ```

  Plotting data over the last 2 hours:

  ```elixir
  Mobius.plot("vm.memory.total", %{}, last: {2, :hour})
  ```
  """
  @spec plot(binary(), map(), [plot_opt()]) :: :ok
  def plot(metric_name, tags \\ %{}, opts \\ []) do
    parsed_metric_name = parse_metric_name(metric_name)
    scraper_query_opts = query_opts(opts)

    series =
      opts
      |> Keyword.get(:name, :mobius)
      |> Scraper.all(scraper_query_opts)
      |> Enum.flat_map(fn {_timestamp, metrics} ->
        series_for_metric_from_metrics(metrics, parsed_metric_name, tags, opts)
      end)

    {:ok, plot} = Mobius.Asciichart.plot(series, height: 12)

    chart = [
      "\t\t",
      IO.ANSI.yellow(),
      "Metric Name: ",
      metric_name,
      IO.ANSI.reset(),
      ", ",
      IO.ANSI.cyan(),
      "Tags: #{inspect(tags)}",
      IO.ANSI.reset(),
      "\n\n",
      plot
    ]

    IO.puts(chart)
  end

  def query_opts(opts) do
    if opts[:from] do
      Keyword.take(opts, [:from, :to])
    else
      last_ts(opts)
    end
  end

  def last_ts(opts) do
    now = System.system_time(:second)

    ts =
      case opts[:last] do
        nil ->
          now - 180

        {offset, unit} ->
          now - offset * get_unit_offset(unit)

        offset ->
          now - offset
      end

    [from: ts]
  end

  defp get_unit_offset(:second), do: 1
  defp get_unit_offset(:minute), do: 60
  defp get_unit_offset(:hour), do: 3600
  defp get_unit_offset(:day), do: 86400

  @type naming_opt :: :csv_ext | :timestamp
  @type csv_opt() ::
          {:file, String.t()}
          | {:naming, [naming_opt]}
          | {:last, integer() | {integer(), time_unit()}}
          | {:from, integer()}
          | {:to, integer()}

  @doc """
  Produces a CSV of currently collected metrics, optionally writing the CSV to file.
  The CSV looks like: timestamp, name, type, value, tag1, tag2, tag3..., tagN
  If tags are provided, only the metrics matching the tags are output to the CSV.
  If a writable file is given via the :file option, the CSV is written to it. Otherwise it is written to the terminal.
  If the optional :naming option contains :csv_ext, the .csv extension is added to the file name if not already present.
  If the optional :naming option list :timestamp, the file name is prefixed by a timestamp.

  Just as with plotting, metrics to be outputted to CSV can be restricted to a relative time period:

   * `:last` - metrics captured over the last `x`
    amount of time. Where `x` is either an integer or a tuple of
    `{integer(), time_unit()}`. If you only pass an integer the time unit of
    `:seconds` is assumed. By default the last 3 minutes of
    data will be outputted.
  * `:from` - the unix timestamp, in seconds, to start querying from
  * `:to` - the unix timestamp, in seconds, to stop querying at

  If a metric has multiple types, a CSV can filter on one type with option `type: :sum` or `type: :last_value` etc..

  Examples:

  iex> Mobius.to_csv("vm.memory.total", %{})
  # -- writes CSV values to the terminal

  iex> Mobius.to_csv("vm.memory.total", %{}, file: "/data/csv/vm.memory.total")
  # -- writes CSV values to file vm.memory.total

  iex> Mobius.to_csv("vm.memory.total", %{}, file: "/data/csv/vm.memory.total", naming: [:csv_ext, :timestamp])
  # -- writes CSV values to a file like 20210830T174954_vm.memory.total.csv

  iex> Mobius.to_csv("vm.memory.total", %{})
  # -- writes CSV values to the terminal

  iex> Mobius.to_csv("vintage_net_qmi.connection.end.duration", %{ifname: "wwan0", status: :disconnected}, type: :sum, last: {60, :day})
  # -- writes CSV values to the terminal

  """
  @spec to_csv(String.t(), map, [csv_opt]) :: :ok
  def to_csv(metric_name, tags \\ %{}, opts \\ []) do
    parsed_metric_name = parse_metric_name(metric_name)

    rows =
      opts
      |> Keyword.get(:name, :mobius)
      |> Scraper.all(query_opts(opts))
      |> Enum.flat_map(fn {timestamp, metrics} ->
        rows_from_metrics(metrics, parsed_metric_name, tags, timestamp, opts)
      end)

    tag_names = unique_tag_names(rows)

    headers_row =
      ["timestamp", "name", "type", "value"] ++
        for tag_name <- tag_names, do: Atom.to_string(tag_name)

    data_rows = data_rows(rows, metric_name, tag_names)

    csv([headers_row | data_rows], opts)
  end

  defp unique_tag_names(rows) do
    Enum.reduce(rows, MapSet.new(), fn row, set ->
      Enum.reduce(Map.keys(row.tags), set, fn tag_name, acc -> MapSet.put(acc, tag_name) end)
    end)
    |> Enum.sort()
  end

  defp data_rows(rows, metric_name, tag_names) do
    Enum.reduce(rows, [], fn row, acc ->
      tag_values = for tag_name <- tag_names, do: "#{Map.get(row.tags, tag_name, "")}"

      data_row =
        ["#{row.timestamp}", "#{metric_name}", "#{row.type}", "#{row.value}"] ++ tag_values

      acc ++ [data_row]
    end)
  end

  defp csv(all_rows, opts) do
    filepath = csv_file(opts)

    out =
      if filepath do
        with :ok <- File.mkdir_p(Path.dirname(filepath)),
             {:ok, device} <- File.open(filepath, [:write]) do
          device
        else
          {:error, reason} ->
            IO.puts(
              "Failed to open file #{inspect(filepath)} to write CSV: #{inspect(reason)}. Writing to terminal instead."
            )

            :stdio
        end
      else
        :stdio
      end

    Enum.each(all_rows, fn row -> IO.write(out, [Enum.intersperse(row, ","), "\n"]) end)
    :ok
  end

  defp csv_file(opts) do
    case Keyword.get(opts, :file) do
      path when is_binary(path) ->
        dir = Path.dirname(path)

        file =
          path
          |> Path.split()
          |> List.last()
          |> maybe_with_csv_ext(opts)
          |> maybe_with_timestamp(opts)

        final_path = Path.join(dir, file)
        IO.puts("[Mobius] Writing CSV to #{final_path}")
        final_path

      _ ->
        nil
    end
  end

  defp maybe_with_csv_ext(path, opts) do
    add_csv_ext? = :csv_ext in Keyword.get(opts, :naming, [])

    if add_csv_ext? and not String.ends_with?(path, ".csv") do
      path <> ".csv"
    else
      path
    end
  end

  defp maybe_with_timestamp(path, opts) do
    if :timestamp in Keyword.get(opts, :naming, []) do
      [ts | _] = DateTime.utc_now() |> DateTime.to_iso8601(:basic) |> String.split(".")
      ts <> "_" <> path
    else
      path
    end
  end

  defp series_for_metric_from_metrics(metrics, metric_name, tags, opts) do
    type = opts[:type]

    Enum.reduce(metrics, [], fn
      metric, ms ->
        case value_from_metric(metric, metric_name, tags, type) do
          nil ->
            ms

          value ->
            ms ++ [value]
        end
    end)
  end

  defp value_from_metric({metric_name, _type, value, tags}, metric_name, tags, nil) do
    value
  end

  defp value_from_metric({metric_name, type, value, tags}, metric_name, tags, type) do
    value
  end

  defp value_from_metric(_metric, _metric_name, _tags, _type), do: nil

  defp rows_from_metrics(metrics, metric_name, tags, timestamp, opts) do
    required_type = Keyword.get(opts, :type)

    Enum.reduce(metrics, [], fn
      {^metric_name, type, value, metric_tags}, rows ->
        if match?(^tags, metric_tags) and matches_type?(type, required_type) do
          row = %{type: type, value: value, tags: metric_tags, timestamp: timestamp}

          rows ++ [row]
        else
          rows
        end

      _metric, rows ->
        rows
    end)
  end

  defp matches_type?(_type, nil), do: true

  defp matches_type?(type, type), do: true
  defp matches_type?(_, _), do: false

  defp parse_metric_name(metric_name),
    do: metric_name |> String.split(".", trim: true) |> Enum.map(&String.to_existing_atom/1)

  @doc """
  Get the current metric information

  If you configured Mobius to use a different name then you can pass in your
  custom name to ensure Mobius requests the metrics from the right place.
  """
  @spec info(Mobius.name() | nil) :: :ok
  def info(name \\ nil) do
    name = name || :mobius

    name
    |> MetricsTable.get_entries()
    |> Enum.group_by(fn {event_name, _type, _value, meta} -> {event_name, meta} end)
    |> Enum.each(fn {{event_name, meta}, metrics} ->
      reports =
        Enum.map(metrics, fn {_event_name, type, value, _meta} ->
          "#{to_string(type)}: #{inspect(value)}\n"
        end)

      [
        "Metric Name: ",
        Enum.join(event_name, "."),
        "\n",
        "Tags: #{inspect(meta)}\n",
        reports
      ]
      |> IO.puts()
    end)
  end
end