defmodule Panoramix.Error do
  @moduledoc """
  Exception struct with a `:message` and a `:code` field.

  `Panoramix` builds it for non-200 responses from Druid, putting a
  human-readable description in `:message` and the HTTP status code in
  `:code`.
  """

  defexception message: nil, code: nil

  @type t :: %__MODULE__{}
end
defmodule Panoramix do
@moduledoc """
Post a query to a Druid Broker or request its status.
Use `Panoramix.Query` to build a query.
## Examples
Build a query like this:
```elixir
use Panoramix
q = from "my_datasource",
query_type: "timeseries",
intervals: ["2019-03-01T00:00:00+00:00/2019-03-04T00:00:00+00:00"],
granularity: :day,
filter: dimensions.foo == "bar",
aggregations: [event_count: count(),
unique_id_count: hyperUnique(:user_unique)]
```
And then send it:
```elixir
Panoramix.post_query(q, :default)
```
Where `:default` is a configuration profile pointing to your Druid server.
The default value for the profile argument is `:default`, so if you
only need a single configuration you can omit it:
```elixir
Panoramix.post_query(q)
```
Response example:
```elixir
{:ok,
[
%{
"result" => %{
"event_count" => 7544,
"unique_id_count" => 43.18210933535
},
"timestamp" => "2019-03-01T00:00:00.000Z"
},
%{
"result" => %{
"event_count" => 1051,
"unique_id_count" => 104.02052398847
},
"timestamp" => "2019-03-02T00:00:00.000Z"
},
%{
"result" => %{
"event_count" => 4591,
"unique_id_count" => 79.19885795313
},
"timestamp" => "2019-03-03T00:00:00.000Z"
}
]}
```
To make a nested query, pass a map of the form `%{type: :query, query: inner_query}`
as data source. For example:
```elixir
use Panoramix
inner_query = from "my_datasource",
query_type: "topN",
intervals: ["2019-03-01T00:00:00+00:00/2019-03-04T00:00:00+00:00"],
granularity: :day,
aggregations: [event_count: count()],
dimension: "foo",
metric: "event_count",
threshold: 100
q = from %{type: :query, query: inner_query},
query_type: "timeseries",
intervals: ["2019-03-01T00:00:00+00:00/2019-03-04T00:00:00+00:00"],
granularity: :day,
aggregations: [foo_count: count(),
event_count_sum: longSum(:event_count)],
post_aggregations: [mean_events_per_foo: aggregations.event_count_sum / aggregations.foo_count]
```
To make a join query, pass a map of the form `%{type: :join, left: left, right: right,
joinType: :INNER | :LEFT, rightPrefix: "prefix_", condition: "condition"}`. Both the left
and the right side can be a nested query as above, `%{type: :query, query: inner_query}`,
which will be expanded. Other join sources will be passed to Druid unchanged. For example:
```elixir
use Panoramix
from %{type: :join,
left: "sales",
right: %{type: :lookup, lookup: "store_to_country"},
rightPrefix: "r.",
condition: "store == \\"r.k\\"",
joinType: :INNER},
query_type: "groupBy",
intervals: ["0000/3000"],
granularity: "all",
dimensions: [%{type: "default", outputName: "country", dimension: "r.v"}],
aggregations: [country_revenue: longSum(:revenue)]
```
You can also build a JSON query yourself by passing it as a map to
`post_query`:
```elixir
Panoramix.post_query(%{queryType: "timeBoundary", dataSource: "my_datasource"})
```
To request the status of the Broker, run:
```elixir
Panoramix.status(:default)
```
"""
@moduledoc since: "1.0.0"
@doc """
Sends `query` to the Druid Broker configured under `profile`.

A `Panoramix.Query` struct is serialised with `Panoramix.Query.to_json/1`;
any other value is encoded with `Jason.encode!/1`. The resulting JSON is
POSTed to `/druid/v2` with a `Content-Type: application/json` header.

Returns `{:ok, decoded_response}` on success, or `{:error, error}` when the
HTTP request fails, Druid answers with a non-200 status, or the response
body is not valid JSON.
"""
@spec post_query(Panoramix.Query.t() | map(), atom()) ::
        {:ok, term()}
        | {:error, HTTPoison.Error.t() | Jason.DecodeError.t() | Panoramix.Error.t()}
def post_query(query, profile \\ :default) do
  json_body =
    if match?(%Panoramix.Query{}, query) do
      Panoramix.Query.to_json(query)
    else
      Jason.encode!(query)
    end

  request_and_decode(
    profile,
    :post,
    "/druid/v2",
    json_body,
    [{"Content-Type", "application/json"}]
  )
end
@doc """
Same as `post_query/2`, but returns the decoded response itself and raises
the error instead of returning it in an `{:error, error}` tuple.
"""
@spec post_query!(Panoramix.Query.t() | map(), atom()) :: term()
def post_query!(query, profile \\ :default) do
  outcome = post_query(query, profile)

  case outcome do
    {:error, exception} -> raise exception
    {:ok, decoded} -> decoded
  end
end
@doc """
Requests the status of the Broker configured under `profile`.

Issues a `GET` request to `/status` with an empty body and no extra
headers. Returns `{:ok, decoded_response}` or `{:error, error}`.
"""
@spec status(atom) ::
        {:ok, term()}
        | {:error, HTTPoison.Error.t() | Jason.DecodeError.t() | Panoramix.Error.t()}
def status(profile \\ :default) do
  request_and_decode(profile, :get, "/status", "", [])
end
@doc """
Same as `status/1`, but returns the decoded response itself and raises the
error instead of returning it in an `{:error, error}` tuple.
"""
@spec status!(atom) :: term()
def status!(profile \\ :default) do
  outcome = status(profile)

  case outcome do
    {:error, exception} -> raise exception
    {:ok, decoded} -> decoded
  end
end
# Performs an HTTP request against the broker configured under `profile`
# and decodes the JSON response body.
#
# The profile is looked up in the `:broker_profiles` entry of the
# `:panoramix` application environment, and its `:base_url` is prepended to
# `url_path`.
#
# Returns `{:ok, decoded}` when the request succeeds, the status code is 200
# and the body is valid JSON. Otherwise the `{:error, _}` tuple of the step
# that failed is returned unchanged.
#
# Raises `ArgumentError` when the profile is unknown or has no `:base_url`.
defp request_and_decode(profile, method, url_path, body, headers) do
  broker_profiles = Application.get_env(:panoramix, :broker_profiles)

  broker_profile =
    broker_profiles[profile] ||
      raise ArgumentError, "no broker profile with name #{profile}"

  # Fail with a descriptive message instead of letting `nil <> url_path`
  # raise an opaque ArgumentError from the `<>` operator.
  base_url =
    broker_profile[:base_url] ||
      raise ArgumentError, "broker profile #{profile} has no :base_url configured"

  url = base_url <> url_path
  options = http_options(url, broker_profile)

  # A non-matching step short-circuits and its error tuple is returned as-is.
  with {:ok, http_response} <- HTTPoison.request(method, url, body, headers, options),
       {:ok, response_body} <- maybe_handle_druid_error(http_response) do
    Jason.decode(response_body)
  end
end
# Assembles the option list passed to `HTTPoison.request/5`: SSL settings
# first, then authentication, then timeouts.
defp http_options(url, broker_profile) do
  Enum.concat([
    ssl_options(url, broker_profile),
    auth_options(broker_profile),
    timeout_options()
  ])
end
# Returns the `:ssl` option for URLs starting with "https://", and an empty
# list for any other URL. Peer verification is enabled with a depth of 10,
# followed by whatever CA certificate options the profile provides.
defp ssl_options(url, broker_profile) do
  if String.starts_with?(url, "https://") do
    verification = [verify: :verify_peer, depth: 10]
    [ssl: verification ++ cacert_options(broker_profile)]
  else
    []
  end
end
# Builds the CA certificate part of the SSL options from the profile.
#
# `:cacertfile` takes precedence over `:cacert`; when neither is set the
# result is an empty list.
defp cacert_options(broker_profile) do
  case {broker_profile[:cacertfile], broker_profile[:cacert]} do
    {path, _pem} when path not in [nil, false] ->
      # The CA certificate lives in a file.
      [cacertfile: path]

    {_path, pem} when pem not in [nil, false] ->
      # The CA certificate is a PEM-encoded string; it has to be converted
      # to DER. Only unencrypted certificate entries are kept.
      der_certs =
        pem
        |> :public_key.pem_decode()
        |> Enum.flat_map(fn
          {:Certificate, der, :not_encrypted} -> [der]
          _other -> []
        end)

      [cacerts: der_certs]

    _neither ->
      # No CA certificate specified.
      []
  end
end
# Returns hackney basic-auth options when the profile has a truthy
# `:http_username`, and an empty list otherwise. The password is taken from
# `:http_password` as-is, so it is `nil` when absent.
defp auth_options(broker_profile) do
  case broker_profile[:http_username] do
    absent when absent in [nil, false] ->
      []

    username ->
      [hackney: [basic_auth: {username, broker_profile[:http_password]}]]
  end
end
# Returns the receive timeout option, read from the `:request_timeout` key
# of the `:panoramix` application environment. Falls back to 120_000
# (120 seconds) when the key is not set.
defp timeout_options do
  [recv_timeout: Application.get_env(:panoramix, :request_timeout, 120_000)]
end
# Turns an HTTP response into `{:ok, body}` for status 200, or into
# `{:error, %Panoramix.Error{}}` for any other status code.
defp maybe_handle_druid_error(%HTTPoison.Response{status_code: 200} = response) do
  {:ok, response.body}
end

defp maybe_handle_druid_error(%HTTPoison.Response{} = response) do
  %HTTPoison.Response{status_code: code, body: raw_body} = response

  detail =
    case Jason.decode(raw_body) do
      {:ok, %{"error" => _} = fields} ->
        # Usually Druid answers with a JSON object holding "error",
        # "errorMessage", "errorClass" and "host". Some of them may be
        # null; those are left out of the message.
        ["error", "errorMessage", "errorClass", "host"]
        |> Enum.filter(fn name -> fields[name] end)
        |> Enum.map_join(" ", fn name -> "#{name}: #{fields[name]}" end)

      _ ->
        "undecodable error: " <> raw_body
    end

  {:error, %Panoramix.Error{message: "Druid error (code #{code}): " <> detail, code: code}}
end
@doc ~S"""
Format a date or a datetime into a format that Druid expects.

A `DateTime` is formatted with the Timex directive `{ISO:Extended}`, a
`Date` with `{ISOdate}`.

## Examples

    iex> Panoramix.format_time! ~D[2018-07-20]
    "2018-07-20"

    iex> Panoramix.format_time!(
    ...> Timex.to_datetime({{2018,07,20},{1,2,3}}))
    "2018-07-20T01:02:03+00:00"
"""
@spec format_time!(DateTime.t() | Date.t()) :: String.t()
def format_time!(%kind{} = value) when kind in [DateTime, Date] do
  directive =
    case kind do
      DateTime -> "{ISO:Extended}"
      Date -> "{ISOdate}"
    end

  Timex.format!(value, directive)
end
@doc """
Invoked by `use Panoramix`: imports `from/2` from `Panoramix.Query` into the
calling module, and nothing else.
"""
defmacro __using__(_opts) do
  quote(do: import(Panoramix.Query, only: [from: 2]))
end
end