lib/instream/series.ex

defmodule Instream.Series do
  @moduledoc """
  ## Series Definition

  Series definitions can be used to have a fixed structured usable for
  reading and writing data to an InfluxDB server:

      defmodule MySeries.CPULoad do
        use Instream.Series

        series do
          measurement "cpu_load"

          tag :host, default: "www"
          tag :core

          field :value, default: 100
          field :value_desc
        end
      end

  The macros `tag/2` and `field/2` both accept a keyword tuple with a
  `:default` entry. This value will be pre-assigned when using the data
  struct with all other fields or tags being set to `nil`.

  ### Structs

  Each of your series definitions will register three separate structs.

  Based on the aforementioned `MySeries.CPULoad` you will have access
  to the following structs:

      %MySeries.CPULoad{
        fields: %MySeries.CPULoad.Fields{value: 100, value_desc: nil},
        tags: %MySeries.CPULoad.Tags{host: "www", core: nil},
        timestamp: nil
      }

  `:timestamp` is expected to be either a
  unix nanosecond or an RFC3339 timestamp.

  ### Compile-Time Series Validation

  Defining a series triggers a validation function during compilation.

  This validation for example prevents the usage of a field and tag sharing
  the same name. Some internal keys like `:time` will also raise an
  `ArgumentError` during compilation.

  You can deactivate this compile time validation by passing
  `skip_validation: true` in your series module:

      defmodule MySeries.ConflictButAccepted do
        use Instream.Series, skip_validation: true

        series do
          tag :conflict
          field :conflict
        end
      end

  Validations performed:

  - having `use Instream.Series` requires also calling `series do .. end`
  - a measurement must be defined
  - at least one field must be defined
  - fields and tags must not share a name
  - the names `:_field`, `:_measurement`, and `:time` are not allowed to be
    used for fields or tags

  ## Reading Series Points (Hydration)

  Whenever you want to convert a plain map or a query result into a specific
  series you can use the built-in hydration methods:

      MySeries.from_map(%{
        timestamp: 1_234_567_890,
        some_tag: "hydrate",
        some_field: 123
      })

      ~S(SELECT * FROM "my_measurement")
      |> MyConnection.query()
      |> MySeries.from_result()

  The timestamp itself is kept "as is" for integer values, timestamps in
  RFC3339 format (e.g. `"1970-01-01T01:00:00.000+01:00"`) will be converted
  to `:nanosecond` integer values.

  Please be aware that when using an `OTP` release prior to `21.0` the time
  will be truncated to `:microsecond` precision due to
  `:calendar.rfc3339_to_system_time/2` not being available and
  `DateTime.from_iso8601/1` only supporting microseconds.

  ### Hydrating Results with Multiple Fields (InfluxDB v2.x Pivoting)

  If you are connecting to an InfluxDB v2.x instance and have more than one
  field in your series you will need to "pivot" your query results to be able
  to fully hydrate your structs:

      MyConnection.query(~s[
        from(bucket: "\#{MyConnection.config(:bucket)}")
        |> range(start: -5m)
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
      ])

  Without using the `pivot()` function in your `:flux` query you will receive
  one result struct for each field (other fields containing default values)
  instead of one struct with all fields.

  See https://www.influxdata.com/blog/how-to-pivot-data-flux-columnar-data/ for
  more details and examples.

  ## Writing Series Points

  You can then use your series module to assemble a data point (one at a time)
  for writing:

      data = %MySeries{}
      data = %{data | fields: %{data.fields | value: 17}}
      data = %{data | tags: %{data.tags | bar: "bar", foo: "foo"}}

  And then write one or many at once:

      MyConnection.write(point)
      MyConnection.write([point_1, point_2, point_3])

  If you want to pass an explicit timestamp you can use the key `:timestamp`:

      data = %MySeries{}
      data = %{data | timestamp: 1_439_587_926_000_000_000}

  The timestamp is (by default) expected to be a nanosecond unix timestamp.
  To use different precision (for all points in this write operation!) you can
  change this value by modifying your write call:

      data = %MySeries{}
      data = %{data | timestamp: 1_439_587_926}

      MyConnection.write(data, precision: :second)

  Supported precision types are:

  - `:hour`
  - `:minute`
  - `:second`
  - `:millisecond`
  - `:microsecond`
  - `:nanosecond`
  - `:rfc3339`

  Please be aware that the UDP protocol writer (`Instream.Writer.UDP`) does
  not support custom timestamp precisions. All UDP timestamps are implicitly
  expected to already be at nanosecond precision.
  """

  alias Instream.Series.Hydrator
  alias Instream.Series.Validator

  defmacro __using__(opts) do
    quote location: :keep do
      unless unquote(opts[:skip_validation]) do
        @after_compile unquote(__MODULE__)
      end

      import unquote(__MODULE__), only: [series: 1]
    end
  end

  defmacro __after_compile__(%{module: module}, _bytecode) do
    Validator.proper_series?(module)
  end

  @doc """
  Defines the series.
  """
  defmacro series(do: block) do
    quote location: :keep do
      @behaviour unquote(__MODULE__)

      @measurement nil

      Module.register_attribute(__MODULE__, :fields_raw, accumulate: true)
      Module.register_attribute(__MODULE__, :tags_raw, accumulate: true)

      try do
        # scoped import
        import unquote(__MODULE__)
        unquote(block)
      after
        :ok
      end

      @fields_struct Enum.sort(@fields_raw, &unquote(__MODULE__).__sort_fields__/2)
      @tags_struct Enum.sort(@tags_raw, &unquote(__MODULE__).__sort_tags__/2)

      @impl unquote(__MODULE__)
      def __meta__(:fields), do: Keyword.keys(@fields_struct)
      def __meta__(:measurement), do: @measurement
      def __meta__(:tags), do: Keyword.keys(@tags_struct)

      Module.eval_quoted(__ENV__, [
        unquote(__MODULE__).__struct_fields__(@fields_struct),
        unquote(__MODULE__).__struct_tags__(@tags_struct)
      ])

      Module.eval_quoted(__ENV__, [
        unquote(__MODULE__).__struct__(__MODULE__)
      ])

      @impl unquote(__MODULE__)
      def from_map(data), do: Hydrator.from_map(__MODULE__, data)

      @impl unquote(__MODULE__)
      def from_result(data), do: Hydrator.from_result(__MODULE__, data)
    end
  end

  @doc """
  Provides additional metadata for a series.

  ## Available Information

  - `:fields`: the fields in the series
  - `:measurement`: the measurement of the series
  - `:tags`: the available tags defining the series
  """
  @callback __meta__(:field | :measurement | :tags) :: any

  @doc """
  Creates a series dataset from any given map.

  Keys not defined in the series are silently dropped.
  """
  @callback from_map(map) :: struct

  @doc """
  Creates a list of series datasets from a query result.

  Keys not defined in the series are silently dropped.
  """
  @callback from_result(map | [map]) :: [struct]

  @doc """
  Defines a field in the series.
  """
  defmacro field(name, opts \\ []) do
    quote do
      @fields_raw {unquote(name), unquote(opts[:default])}
    end
  end

  @doc """
  Defines the measurement of the series.
  """
  defmacro measurement(name) do
    quote do
      @measurement unquote(name)
    end
  end

  @doc """
  Defines a tag in the series.
  """
  defmacro tag(name, opts \\ []) do
    quote do
      @tags_raw {unquote(name), unquote(opts[:default])}
    end
  end

  @doc false
  def __sort_fields__({left, _}, {right, _}), do: left < right

  @doc false
  def __sort_tags__({left, _}, {right, _}), do: left < right

  @doc false
  def __struct__(series) do
    quote do
      @type t :: %unquote(series){
              fields: %unquote(series).Fields{},
              tags: %unquote(series).Tags{},
              timestamp: non_neg_integer | binary | nil
            }

      defstruct fields: %unquote(series).Fields{},
                tags: %unquote(series).Tags{},
                timestamp: nil
    end
  end

  @doc false
  def __struct_fields__(fields) do
    quote do
      defmodule Fields do
        @moduledoc false

        defstruct unquote(Macro.escape(fields))
      end
    end
  end

  @doc false
  def __struct_tags__(tags) do
    quote do
      defmodule Tags do
        @moduledoc false

        defstruct unquote(Macro.escape(tags))
      end
    end
  end
end