lib/influx_ex/flux.ex

defmodule InfluxEx.Flux do
  @moduledoc """
  Functions to build Flux queries

  Currently, time-related functionality only supports Flux's duration types. A
  duration is a value such as `15m`, where `15` is the number of units and `m`
  is the unit that follows, which in this case means minutes.

  For more information about duration types see the
  [Flux docs](https://docs.influxdata.com/flux/v0.x/spec/types/#duration-types).

  The `%InfluxEx.Flux{}` struct implements the `String.Chars` protocol which
  allows `to_string/1` to be called on the structure.
  """

  alias InfluxEx.{Bucket, Client}

  @typedoc """
  Option accepted by `aggregate_window/3`

    * `:create_empty` - value rendered as the `createEmpty` argument of the
      `aggregateWindow` call
  """
  @type aggregate_window_opt() :: {:create_empty, boolean()}

  @typedoc """
  Function applied to each window, currently always `:mean`
  """
  @type aggregate_selector() :: :mean

  @typedoc """
  Aggregate window settings stored on a query

  `:create_empty` is any boolean: `aggregate_window/3` stores the value of its
  `:create_empty` option here and falls back to `false` when it is not given.
  """
  @type aggregate_window() :: %{
          every: binary(),
          create_empty: boolean(),
          fn: aggregate_selector()
        }

  @typedoc """
  A flux query under construction

  `:bucket`, `:start`, `:measurement` and `:field` have to be set before the
  query can be turned into a string. `:end`, `:tags`, `:aggregate_window` and
  `:fill_value_use_previous` are optional.
  """
  @type t() :: %__MODULE__{
          bucket: Bucket.name(),
          start: binary() | nil,
          end: binary() | nil,
          measurement: binary() | nil,
          field: binary() | nil,
          tags: map(),
          aggregate_window: aggregate_window() | nil,
          fill_value_use_previous: boolean()
        }

  # Bare atoms default to `nil`. The declaration order is kept as-is.
  defstruct [
    :bucket,
    :start,
    :end,
    :measurement,
    :field,
    tags: %{},
    aggregate_window: nil,
    fill_value_use_previous: false
  ]

  @doc """
  Start a new flux query that reads from `bucket`

  Every other field of the returned query keeps its default value.

  ```elixir
  InfluxEx.Flux.from("my bucket")
  ```
  """
  @spec from(Bucket.name()) :: t()
  def from(bucket), do: struct!(__MODULE__, bucket: bucket)

  @doc """
  Set the time bounds of the query

  `start` is stored as the beginning of the range. The optional `end_t` is
  stored in the `:end` field of the query and defaults to `nil`. Calling
  `range/3` again replaces both values.

  For example, to look at the last 15 minutes of a bucket:

  ```elixir
  "my_bucket"
  |> InfluxEx.Flux.from()
  |> InfluxEx.Flux.range("-15m")
  ```
  """
  @spec range(t(), binary(), binary() | nil) :: t()
  def range(f, start, end_t \\ nil), do: %__MODULE__{f | end: end_t, start: start}

  @doc """
  Choose the field the query reads

  The name is stored on the query and later rendered as a filter on
  `r._field`.

  ```elixir
  "my_bucket"
  |> InfluxEx.Flux.from()
  |> InfluxEx.Flux.range("-15m")
  |> InfluxEx.Flux.measurement("cpu")
  |> InfluxEx.Flux.field("average")
  ```
  """
  @spec field(t(), binary()) :: t()
  def field(f, field), do: %__MODULE__{f | field: field}

  @doc """
  Choose the measurement the query reads

  The name is stored on the query and later rendered as a filter on
  `r._measurement`.

  ```elixir
  "my_bucket"
  |> InfluxEx.Flux.from()
  |> InfluxEx.Flux.range("-15m")
  |> InfluxEx.Flux.measurement("cpu")
  ```
  """
  @spec measurement(t(), binary()) :: t()
  def measurement(f, measurement), do: %__MODULE__{f | measurement: measurement}

  @doc """
  Filter the query on a tag value

  `tag_name` may be an atom or a binary; atoms are converted to their string
  form before being stored. Setting the same tag again replaces the previous
  value.
  """
  @spec tag(t(), atom() | binary(), binary()) :: t()
  def tag(f, tag_name, tag_value) when is_binary(tag_name) do
    %__MODULE__{f | tags: Map.put(f.tags, tag_name, tag_value)}
  end

  def tag(f, tag_name, tag_value) when is_atom(tag_name) do
    name = Atom.to_string(tag_name)
    tag(f, name, tag_value)
  end

  @doc """
  Add an aggregate window

  Stores a window on the query that is rendered as an `aggregateWindow` call.
  `every` is the window duration (for example `"1h"`) and the aggregate
  function is always `:mean`.

  ## Options

    * `:create_empty` - value for the `createEmpty` argument of the rendered
      call. Defaults to `false`.

  ```elixir
  "my bucket"
  |> InfluxEx.Flux.from()
  |> InfluxEx.Flux.range("-1d")
  |> InfluxEx.Flux.measurement("cpu")
  |> InfluxEx.Flux.field("average")
  |> InfluxEx.Flux.aggregate_window("1h")
  ```
  """
  @spec aggregate_window(t(), binary(), [aggregate_window_opt()]) :: t()
  def aggregate_window(f, every, opts \\ []) do
    create_empty = opts[:create_empty] || false
    window = %{every: every, fn: :mean, create_empty: create_empty}

    # Struct-checked update, like the other builder functions, so that passing
    # anything other than a `%InfluxEx.Flux{}` fails instead of returning a map.
    %__MODULE__{f | aggregate_window: window}
  end

  @doc """
  Fill missing values with the previous one

  This is useful when the `:create_empty` option is set to `true` in
  `Flux.aggregate_window/3`.

  ```elixir
  "my bucket"
  |> InfluxEx.Flux.from()
  |> InfluxEx.Flux.range("-1d")
  |> InfluxEx.Flux.measurement("cpu")
  |> InfluxEx.Flux.field("average")
  |> InfluxEx.Flux.aggregate_window("1h", create_empty: true)
  |> InfluxEx.Flux.fill_value_previous()
  ```

  The above query will return tables with data over the last day averaged in one
  hour intervals. Where there are missing data points, the query will fill the
  value with the last data point's value.
  """
  @spec fill_value_previous(t()) :: t()
  def fill_value_previous(f) do
    # Struct-checked update, like the other builder functions, so that passing
    # anything other than a `%InfluxEx.Flux{}` fails instead of returning a map.
    %__MODULE__{f | fill_value_use_previous: true}
  end

  @doc """
  Run the flux query with the given client

  The query is rendered with `to_string/1` and handed, together with `opts`,
  to `InfluxEx.query/3`. Rendering raises `ArgumentError` when a required field
  of the query has not been set.
  """
  @spec run_query(t(), Client.t(), [InfluxEx.query_opt()]) ::
          {:ok, InfluxEx.tables()} | {:error, InfluxEx.error()}
  def run_query(flux_query, client, opts \\ []) do
    InfluxEx.query(client, to_string(flux_query), opts)
  end

  defimpl String.Chars do
    # Renders an `%InfluxEx.Flux{}` as the text of a flux query.
    #
    # Stages are emitted in a fixed order: `from`, `range`, the measurement
    # filter, the field filter, one filter per tag, and then the optional
    # `aggregateWindow` and `fill` stages.
    #
    # Raises `ArgumentError` when `:bucket`, `:measurement`, `:field` or
    # `:start` is not set.
    def to_string(f) do
      case validate(f) do
        :ok ->
          """
          #{from_to_string(f)}
          |> #{range_to_string(f)}
          |> #{measurement_to_string(f)}
          |> #{field_to_string(f)}
          #{tags_to_string(f)}
          """
          |> add_aggregate_window_to_string(f)
          |> add_value_fill(f)

        {:error, missing} ->
          raise ArgumentError, """
          Flux query is missing require fields

          #{inspect(missing)}
          """
      end
    end

    defp from_to_string(f) do
      "from(bucket: #{inspect(f.bucket)})"
    end

    # Only `nil` means "no end time". Any other value is rendered as the `stop`
    # argument so the end given to `InfluxEx.Flux.range/3` is not dropped.
    defp range_to_string(%{start: start, end: nil}) do
      "range(start: #{start})"
    end

    defp range_to_string(%{start: start, end: stop}) do
      "range(start: #{start}, stop: #{stop})"
    end

    defp measurement_to_string(f) do
      "filter(fn: (r) => r._measurement == #{inspect(f.measurement)})"
    end

    defp field_to_string(f) do
      "filter(fn: (r) => r._field == #{inspect(f.field)})"
    end

    # Appends the `aggregateWindow` stage when a window was configured.
    defp add_aggregate_window_to_string(query_string, flux) do
      case flux.aggregate_window do
        nil ->
          query_string

        window ->
          query_string <>
            "\n" <>
            "|> aggregateWindow(every: #{window.every}, fn: #{window.fn}, createEmpty: #{window.create_empty})"
      end
    end

    # Appends the `fill` stage when previous-value filling was requested.
    defp add_value_fill(query_string, flux) do
      if flux.fill_value_use_previous do
        query_string <>
          "\n" <>
          """
          |> fill(column: "_value", usePrevious: true)
          """
      else
        query_string
      end
    end

    # Returns `:ok` when every required field holds a truthy value, otherwise
    # `{:error, missing}`. Missing keys are prepended as they are found, so the
    # list is in reverse order of the required list.
    defp validate(flux) do
      missing =
        Enum.reduce([:bucket, :measurement, :field, :start], [], fn key, acc ->
          if Map.get(flux, key), do: acc, else: [key | acc]
        end)

      case missing do
        [] -> :ok
        _ -> {:error, missing}
      end
    end

    defp tags_to_string(flux) do
      # we should improve the query to filter tags in one filter call when
      # possible
      Enum.map_join(flux.tags, "", fn {tag, value} ->
        "|> filter(fn: (r) => r[#{inspect(tag)}] == #{inspect(value)})\n"
      end)
    end
  end
end