defmodule ClickHouse do
@moduledoc """
A ClickHouse client.
This currently represents an early work in progress.
"""
use Supervisor
alias ClickHouse.{Client, Query, Result, Stream, Telemetry}
@start_opts KeywordValidator.schema!(
name: [
is: :atom,
required: true,
default: :default,
doc: "The name of the client."
],
interface: [
is: :mod,
required: true,
default: ClickHouse.Interface.HTTP,
doc: "The network interface to use for queries."
],
formats: [
is: {:list, :mod},
required: true,
default: [
ClickHouse.Format.JSONCompactEachRow,
ClickHouse.Format.RowBinary,
ClickHouse.Format.TSV,
ClickHouse.Format.TSVWithNames,
ClickHouse.Format.TSVWithNamesAndTypes,
ClickHouse.Format.Values
],
doc: "A list of formats available for encoding/decoding."
]
)
################################
# Types
################################
@typedoc """
A ClickHouse client.
"""
@type client :: atom() | ClickHouse.Client.t()
@typedoc """
A query statement.
"""
@type statement :: binary() | struct()
@typedoc """
Parameters used with queries.
Optional data types can be provided in a tuple format.
"""
@type params :: list() | {data_types(), list()}
@typedoc """
A list of data types.
"""
@type data_types :: list(data_type())
@typedoc """
The data types available.
"""
@type data_type ::
:i64
| :i32
| :i16
| :i8
| :u64
| :u32
| :u16
| :u8
| :f64
| :f32
| :string
| :uuid
| :date
| :datetime
| {:datetime64, integer()}
| {:fixed_string, integer()}
| {:enum8, %{(String.t() | atom()) => integer()}}
| {:enum16, %{(String.t() | atom()) => integer()}}
| {:array, data_type()}
| {:low_cardinality, data_type()}
| {:nullable, data_type()}
| {{:simple_aggregate_function, atom()}, data_type()}
@typedoc """
Options used for `child_spec/1` and `start_link/1`
"""
@type start_option ::
{:name, atom()}
| {:interface, ClickHouse.Interface.t()}
| {:formats, [ClickHouse.Format.t()]}
@typedoc """
Various representations of ClickHouse-related errors.
"""
@type error ::
ClickHouse.ConnectionError.t()
| ClickHouse.CoordinationError.t()
| ClickHouse.DatabaseError.t()
| ClickHouse.ParsingError.t()
| ClickHouse.QueryError.t()
| ClickHouse.StreamError.t()
| ClickHouse.SystemError.t()
################################
# Public API
################################
@doc """
Starts a ClickHouse client.
## Options
#{KeywordValidator.docs(@start_opts)}
## Extra Options
Any additional options passed will be given to the client interface.
"""
@spec start_link([start_option()]) :: Supervisor.on_start()
def start_link(opts \\ []) do
opts_keys = Keyword.keys(@start_opts.schema)
{opts, extra_opts} = Keyword.split(opts, opts_keys)
opts = KeywordValidator.validate!(opts, @start_opts)
Supervisor.start_link(__MODULE__, {opts, extra_opts})
end
@doc """
Prepares and executes a query using a ClickHouse client.
"""
@spec query(client(), statement(), params(), keyword()) ::
{:ok, ClickHouse.Result.t()} | {:error, error()}
def query(client, statement, params \\ [], opts \\ [])
def query(%Client{} = client, statement, params, opts) do
query = prepare(client, statement, params)
execute(client, query, opts)
end
def query(client, statement, params, opts) when is_atom(client) do
client = fetch_client!(client)
query(client, statement, params, opts)
end
@doc """
Prepares a query for a ClickHouse client.
"""
@spec prepare(client(), statement(), params()) :: ClickHouse.Query.t()
def prepare(client, statement, params \\ [])
def prepare(%Client{} = client, statement, params) do
Telemetry.span(:prepare, %{client: client.name}, fn ->
result = Query.prepare(client, statement, params)
{result, %{client: client.name}}
end)
end
def prepare(client, statement, params) when is_atom(client) do
client = fetch_client!(client)
prepare(client, statement, params)
end
@doc """
Executes a query using a ClickHouse client.
"""
@spec execute(client(), ClickHouse.Query.t(), opts :: keyword()) ::
{:ok, ClickHouse.Result.t()} | {:error, error()}
def execute(client, query, opts \\ [])
def execute(%Client{} = client, query, opts) do
Telemetry.span(:execute, %{client: client.name}, fn ->
result =
case client.interface.execute(client, query, opts) do
{:ok, result} ->
result = Result.decode(result)
{:ok, result}
{:error, error} ->
Telemetry.error(:execute, error, %{client: client.name})
{:error, error}
end
{result, %{client: client.name}}
end)
end
def execute(client, query, opts) when is_atom(client) do
client = fetch_client!(client)
execute(client, query, opts)
end
@doc """
Creates a new query stream using a ClickHouse client.
"""
@spec stream!(client(), statement(), params(), opts :: keyword()) :: ClickHouse.Stream.t()
def stream!(client, query, params \\ [], opts \\ [])
def stream!(%Client{} = client, statement, params, opts) do
query = prepare(client, statement, params)
Stream.new(client, query, opts)
end
def stream!(client, statement, params, opts) when is_atom(client) do
client = fetch_client!(client)
stream!(client, statement, params, opts)
end
################################
# Supervisor Callbacks
################################
@impl Supervisor
def init({opts, extra_opts}) do
client = init_client(opts)
children = [
interface(opts, extra_opts)
]
Telemetry.span(:init, %{client: client.name}, fn ->
result = Supervisor.init(children, strategy: :one_for_one)
{result, %{client: client.name}}
end)
end
################################
# Private API
################################
defp init_client(opts) do
client = Client.new(opts)
:ok = :persistent_term.put({__MODULE__, client.name}, client)
client
end
defp fetch_client!(name) do
:persistent_term.get({__MODULE__, name})
rescue
ArgumentError ->
# credo:disable-for-next-line
raise ArgumentError, """
No ClickHouse client #{inspect(name)} available.
"""
end
defp interface(opts, extra_opts) do
client = Keyword.fetch!(opts, :name)
interface = Keyword.fetch!(opts, :interface)
interface_opts = Keyword.put(extra_opts, :name, client)
{interface, interface_opts}
end
end