lib/instream/connection.ex

defmodule Instream.Connection do
  @moduledoc """
  Defines a connection to an InfluxDB instance.

  ## Connection Definition

      defmodule MyConnection do
        use Instream.Connection, otp_app: :my_app
      end

  This connection will fetch its configuration from the application environment
  as defined by `:otp_app`. As an alternative you can define the configuration
  in the module definition itself:

      defmodule MyConnection do
        use Instream.Connection,
          config: [
            version: :v1,
            host: "my.influxdb.host",
            scheme: "http"
          ]
      end

  Both inline and `:otp_app` configuration can be mixed. In this case the
  application configuration will overwrite any inline values.

  For more information on how to configure your connection please refer to
  the documentation of `Instream.Connection.Config`.

  ## Connection Configuration

  There are some configuration values that should be checked/changed to
  get your connection up and running:

  - `:auth`: the authentication method and credentials
  - `:host`: the hostname of the server (defaults to `"localhost"`)
  - `:port`: the port of the server (defaults to `8086`)
  - `:version`: the InfluxDB server version you are using (`:v1` or `:v2`)

  Some additional configuration options/requirements depend
  on the used version:

  - `:org`: InfluxDB v2.x organization
  - `:bucket`: InfluxDB v2.x bucket
  - `:database`: InfluxDB v1.x database

  ### InfluxDB v2.x Compatibility Endpoint (InfluxQL Queries)

  If you are using InfluxQL queries with a `:v2` connection you need to
  set the `:database` configuration to a pre-mapped database.

  Please refer to the official InfluxDB DBRP mapping documentation for details.
  """

  alias Instream.Encoder.Line
  alias Instream.Log

  @type log_entry ::
          Log.PingEntry.t()
          | Log.QueryEntry.t()
          | Log.StatusEntry.t()
          | Log.WriteEntry.t()
  @type precision ::
          :hour | :minute | :second | :millisecond | :microsecond | :nanosecond | :rfc3339

  @type e_version_mismatch :: {:error, :version_mismatch}

  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts], location: :keep do
      alias Instream.Connection
      alias Instream.Connection.Config
      alias Instream.Connection.QueryRunnerV1
      alias Instream.Connection.QueryRunnerV2
      alias Instream.Connection.Supervisor

      @behaviour Connection

      @otp_app opts[:otp_app]
      @config opts[:config] || []

      @impl Connection
      def child_spec(_) do
        %{
          id: __MODULE__,
          start: {Supervisor, :start_link, [__MODULE__]}
        }
      end

      @impl Connection
      def config(key \\ nil), do: Config.get(@otp_app, __MODULE__, key, @config)

      @impl Connection
      def ping(opts \\ []) do
        case config(:version) do
          :v2 -> QueryRunnerV2.ping(opts, __MODULE__)
          _ -> QueryRunnerV1.ping(opts, __MODULE__)
        end
      end

      @impl Connection
      def query(query, opts \\ []) do
        case config(:version) do
          :v2 -> QueryRunnerV2.read(query, opts, __MODULE__)
          _ -> QueryRunnerV1.read(query, opts, __MODULE__)
        end
      end

      @impl Connection
      def status(opts \\ []) do
        case config(:version) do
          :v2 -> {:error, :version_mismatch}
          _ -> QueryRunnerV1.status(opts, __MODULE__)
        end
      end

      @impl Connection
      def version(opts \\ []) do
        case config(:version) do
          :v2 -> QueryRunnerV2.version(opts, __MODULE__)
          _ -> QueryRunnerV1.version(opts, __MODULE__)
        end
      end

      @impl Connection
      def write(points, opts \\ [])

      def write(point, opts) when is_map(point), do: write([point], opts)

      def write(points, opts) when is_list(points) do
        case config(:version) do
          :v2 -> QueryRunnerV2.write(points, opts, __MODULE__)
          _ -> QueryRunnerV1.write(points, opts, __MODULE__)
        end
      end
    end
  end

  @doc """
  Returns a supervisable connection child_spec.
  """
  @callback child_spec(_ignored :: term) :: Supervisor.child_spec()

  @doc """
  Returns the connection configuration.
  """
  @callback config(key :: atom | nil) :: Keyword.t() | term

  @doc """
  Pings the connection server.
  """
  @callback ping(opts :: Keyword.t()) :: :pong | :error

  @doc """
  Executes a reading query.

  Options:

  - `database`: use a database differing from the connection config for reading
  - `method`: whether to use a `:get` or `:post` request
  - `org`: use an organization differing from the connection config for reading
  - `precision`: return data with a "precision" other than `:rfc3339`
  """
  @callback query(query :: String.t(), opts :: Keyword.t()) :: any

  @doc """
  Checks the status of the connection server.

  *Only available with InfluxDB v1.x connections.*
  """
  @callback status(opts :: Keyword.t()) :: :ok | :error | e_version_mismatch

  @doc """
  Determines the version of the connection server.

  If the version if undetectable (no header returned) it will be
  reported as `"unknown"`. If the host is unreachable or an error occurred
  the response will be `:error`.
  """
  @callback version(opts :: Keyword.t()) :: String.t() | :error

  @doc """
  Executes a writing query.

  Usable options depend on the writer module configured.
  """
  @callback write(payload :: Line.point() | [Line.point()], opts :: Keyword.t()) :: any
end