lib/panoramix/query.ex

defmodule Panoramix.Query do
  @moduledoc """
  Provides functions for building Druid query requests.
  """
  defstruct [query_type: nil, data_source: nil, intervals: nil, granularity: nil,
             aggregations: nil, post_aggregations: nil, filter: nil,
             dimension: nil, dimensions: nil, metric: nil, threshold: nil, context: nil,
             to_include: nil, merge: nil, analysis_types: nil, limit_spec: nil,
             bound: nil, virtual_columns: nil, limit: nil, search_dimensions: nil,
             query: nil, sort: nil]

  # A query has type Panoramix.query.t()
  @type t :: %__MODULE__{}

  @doc """
  Use `from` macro to build Druid queries. See [Druid documentation](http://druid.io/docs/latest/querying/querying.html) to learn about
  available fields and general query object structure.

  ## Example

    ```elixir
      iex(1)> use Panoramix
      Panoramix.Query
      iex(2)> q = from "my_datasource",
      ...(2)>       query_type: "timeseries",
      ...(2)>       intervals: ["2019-03-01T00:00:00+00:00/2019-03-04T00:00:00+00:00"],
      ...(2)>       granularity: :day,
      ...(2)>       filter: dimensions.foo == "bar",
      ...(2)>        aggregations: [event_count: count(),
      ...(2)>                       unique_id_count: hyperUnique(:user_unique)]
      %Panoramix.Query{
      aggregations: [
        %{name: :event_count, type: "count"},
        %{fieldName: :user_unique, name: :unique_id_count, type: :hyperUnique}
      ],
      analysis_types: nil,
      bound: nil,
      context: %{priority: 0, timeout: 120000},
      data_source: "my_datasource",
      dimension: nil,
      dimensions: nil,
      filter: %{dimension: "foo", type: "selector", value: "bar"},
      granularity: :day,
      intervals: ["2019-03-01T00:00:00+00:00/2019-03-04T00:00:00+00:00"],
      limit: nil,
      limit_spec: nil,
      merge: nil,
      metric: nil,
      post_aggregations: nil,
      query: nil,
      query_type: "timeseries",
      search_dimensions: nil,
      sort: nil,
      threshold: nil,
      to_include: nil,
      virtual_columns: nil
      }
    ```

  Some HLL aggregation names are capitalized and therefore won't play well with the macro. For such cases
  use their aliases as a workaround:
  `hllSketchBuild`, `hllSketchMerge`, `hllSketchEstimate`, `hllSketchUnion`, `hllSketchToString`.

  The aggregation aliases will be replaced with original names when building a query.

  ## Example

    ```elixir
      iex(1)> use Panoramix
      Panoramix.Query
      iex(2)> query = from "my_datasource",
      ...(2)>       query_type: "timeseries",
      ...(2)>       intervals: ["2018-05-29T00:00:00+00:00/2018-06-05T00:00:00+00:00"],
      ...(2)>       granularity: :day,
      ...(2)>       aggregations: [event_count: count(),
      ...(2)>                     unique_ids: hllSketchMerge(:user_unique, round: true)]
      %Panoramix.Query{
        aggregations: [
          %{name: :event_count, type: "count"},
          %{
            fieldName: :user_unique,
            name: :unique_ids,
            round: true,
            type: "HLLSketchMerge"
          }
        ],
        ...
      }
    ```

  """
  @doc since: "1.0.0"
  defmacro from(source, kw) do
    # Supply default "context" parameters (timeout, priority) so that
    # we always have some to work with. If these have already been supplied
    # in kw then defaults will be overwritten.
    query_fields = [context: default_context()] ++ List.foldl(kw, [], &build_query/2)
    quote generated: true, bind_quoted: [source: source, query_fields: query_fields] do
      query =
        case source do
          %Panoramix.Query{} ->
            # Are we extending an existing query?
            source
          _ ->
            # Are we creating a new query from scratch, given some kind of datasource?
            %Panoramix.Query{data_source: Panoramix.Query.datasource(source)}
        end
      Map.merge(query, Map.new query_fields)
    end
  end

  @doc nil
  # exported only so that the `from` macro can call it.
  def datasource(datasource) when is_binary(datasource) do
    # We're using a named datasource as the source for the query
    datasource
  end
  def datasource(%{type: :query, query: nested_query} = datasource) do
    # The datasource is a nested query. Let's convert it to JSON if needed
    nested_query_json =
      case nested_query do
        %Panoramix.Query{} ->
          to_map(nested_query)
        _ ->
          # Assume it's already JSON-shaped
          nested_query
      end
    %{datasource | query: nested_query_json}
  end
  def datasource(%{type: :join, left: left, right: right} = datasource) do
    # A join between two datasources.
    # A named datasource and a recursive join can only appear on the
    # left side, but let's let Druid enforce that.
    left_datasource = datasource(left)
    right_datasource = datasource(right)
    %{datasource | left: left_datasource, right: right_datasource}
  end
  def datasource(%{type: type} = datasource) when is_atom(type) do
    # Some other type of datasource. Let's include it literally.
    datasource
  end

  defp default_context() do
    quote generated: true do
      # Let's add a timeout in the query "context", as we need to
      # tell Druid to cancel the query if it takes too long.
      # We're going to close the HTTP connection on our end, so
      # there is no point in Druid keeping processing.
      timeout = Application.get_env(:panoramix, :request_timeout, 120_000)
      # Also set the configured priority.  0 is what Druid picks if you
      # don't specify a priority, so that seems to be a sensible default.
      priority = Application.get_env(:panoramix, :query_priority, 0)
      %{timeout: timeout, priority: priority}
    end
  end

  defp build_query({field, value}, query_fields)
  when field in [:granularity, :dimension, :dimensions, :metric, :query_type,
                 :threshold, :merge, :analysis_types, :limit_spec,
                 :limit, :search_dimensions, :query, :sort] do
    # For these fields, we just include the value verbatim.
    [{field, value}] ++ query_fields
  end
  defp build_query({:bound, bound}, query_fields) do
    [bound:
     quote generated: true, bind_quoted: [bound: bound] do
       value = String.Chars.to_string bound
       unless value in ["maxTime", "minTime"] do
         raise ArgumentError, "invalid bound value '#{value}', expected 'maxTime' or 'minTime'"
       end
       value
     end
    ] ++ query_fields
  end
  defp build_query({:intervals, intervals}, query_fields) do
    [intervals: build_intervals(intervals)] ++ query_fields
  end
  defp build_query({:aggregations, aggregations}, query_fields) do
    [aggregations: build_aggregations(aggregations)] ++ query_fields
  end
  defp build_query({:post_aggregations, post_aggregations}, query_fields) do
    [post_aggregations: build_post_aggregations(post_aggregations)] ++ query_fields
  end
  defp build_query({:filter, filter}, query_fields) do
    [filter: build_filter(filter)] ++ query_fields
  end
  defp build_query({:to_include, to_include}, query_fields) do
    [to_include:
     quote do
         case unquote(to_include) do
           :all ->
             %{type: "all"}
           :none ->
             %{type: "none"}
           list when is_list(list) ->
             %{type: "list", columns: list}
         end
     end] ++ query_fields
  end
  defp build_query({:virtual_columns, virtual_columns}, query_fields) do
    [virtual_columns: build_virtual_columns(virtual_columns)] ++ query_fields
  end
  defp build_query({:context, context}, query_fields) do
    [context: build_context(context)] ++ query_fields
  end
  defp build_query({unknown, _}, _query_fields) do
    raise ArgumentError, "Unknown query field #{inspect unknown}"
  end

  defp build_intervals(intervals) do
    # mark as "generated" to avoid warnings about unreachable case
    # clauses when interval is a constant
    quote generated: true, bind_quoted: [intervals: intervals] do
      Enum.map intervals, fn
        interval_string when is_binary(interval_string) ->
          # Already a string - pass it on unchanged
          interval_string
        {from, to} ->
          Panoramix.format_time!(from) <> "/" <> Panoramix.format_time!(to)
      end
    end
  end

  defp build_aggregations(aggregations) do
    Enum.map aggregations, &build_aggregation/1
  end

  defp build_aggregation({name, {:count, _, []}}) do
    quote do: %{type: "count", name: unquote name}
  end
  defp build_aggregation({name, {:when, _, [aggregation, filter]}}) do
    # XXX: is it correct to put the name on the "inner" aggregation,
    # instead of the filtered one?
    quote generated: true, bind_quoted: [
      filter: build_filter(filter),
      aggregator: build_aggregation({name, aggregation})]
      do
      case filter do
        nil ->
          # There is no filter - just use the plain aggregator
          aggregator
        _ ->
          %{type: "filtered",
            filter: filter,
            aggregator: aggregator}
      end
    end
  end
  defp build_aggregation({name, {aggregation_type, _, [field_name]}}) do
    # e.g. hyperUnique(:user_unique)
    normalized_aggregation_type = normalize_aggregation_type_name(aggregation_type)
    quote do: %{type: unquote(normalized_aggregation_type),
        name: unquote(name),
        fieldName: unquote(field_name)}
  end
  defp build_aggregation({name, {aggregation_type, _, [field_name, keywords]}}) do
    # e.g. hyperUnique(:user_unique, round: true)
    normalized_aggregation_type = normalize_aggregation_type_name(aggregation_type)
    quote generated: true, bind_quoted: [
      aggregation_type: normalized_aggregation_type,
      name: name,
      field_name: field_name,
      keywords: keywords]
      do
      Map.merge(
        %{type: aggregation_type,
          name: name,
          fieldName: field_name},
        Map.new(keywords))
    end
  end

  # Some capitalized aggregation names need normalizing. See docs for more info.
  defp normalize_aggregation_type_name(:hllSketchBuild), do:
    "HLLSketchBuild"
  defp normalize_aggregation_type_name(:hllSketchMerge), do:
    "HLLSketchMerge"
  defp normalize_aggregation_type_name(:hllSketchEstimate), do:
    "HLLSketchEstimate"
  defp normalize_aggregation_type_name(:hllSketchEstimateWithBounds), do:
    "HLLSketchEstimateWithBounds"
  defp normalize_aggregation_type_name(:hllSketchUnion), do:
    "HLLSketchUnion"
  defp normalize_aggregation_type_name(:hllSketchToString), do:
    "HLLSketchToString"
  defp normalize_aggregation_type_name(name), do:
    name

  defp build_post_aggregations(post_aggregations) do
    Enum.map post_aggregations,
    fn {name, post_aggregation} ->
      pa = build_post_aggregation(post_aggregation)
      quote do
        Map.put(unquote(pa), :name, unquote(name))
      end
    end
  end

  defp build_post_aggregation({arith_op, _, [a, b]})
  when arith_op in [:+, :-, :*, :/] do
    pa1 = build_post_aggregation(a)
    pa2 = build_post_aggregation(b)
    quote do
      %{type: "arithmetic",
        fn: unquote(arith_op),
        fields: [unquote(pa1), unquote(pa2)]}
    end
  end
  defp build_post_aggregation({{:., _, [{:aggregations, _, _}, aggregation]}, _, _}) do
    # aggregations.foo
    quote do
      %{type: "fieldAccess",
        fieldName: unquote(aggregation)}
    end
  end
  defp build_post_aggregation({{:., _, [Access, :get]}, _, [{:aggregations, _, _}, aggregation]}) do
    # aggregations["foo"]
    quote do
      %{type: "fieldAccess",
        fieldName: unquote(aggregation)}
    end
  end
  defp build_post_aggregation(constant) when is_number(constant) do
    quote do
      %{type: "constant",
        value: unquote(constant)}
    end
  end
  defp build_post_aggregation({:expression, _, [expression]}) do
    {:%{}, [], [{:type, "expression"}, {:expression, expression}]}
  end
  defp build_post_aggregation({post_aggregator, _, [field | options]})
  when post_aggregator in [:hllSketchToString, :hllSketchEstimateWithBounds, :hllSketchEstimate] do
    field_ref = build_post_aggregation(field)
    post_aggregation_field_accessor(post_aggregator, :field, field_ref, options)
  end
  defp build_post_aggregation({:hllSketchUnion, _, [fields | options]}) do
    pa_list = for field <- fields, do: build_post_aggregation(field)
    post_aggregation_field_accessor(:hllSketchUnion, :fields, pa_list, options)
  end
  defp build_post_aggregation({post_aggregator, _, [fields]})
  when post_aggregator in [:doubleGreatest, :longGreatest, :doubleLeast, :longLeast] do
    pa_list = for field <- fields, do: build_post_aggregation(field)
    post_aggregation_field_accessor(post_aggregator, :fields, pa_list)
  end
  defp build_post_aggregation({post_aggregator, _, [field_name | options]}) do
    # This is for all post-aggregators that use a "fieldName" parameter,
    # and optionally a bunch of extra parameters.
    post_aggregation_field_accessor(post_aggregator, :fieldName, field_name, options)
  end

  def post_aggregation_field_accessor(type_name, accessor_name, accessor, options \\ []) do
    type_name = normalize_aggregation_type_name(type_name)
    options = List.first(options) || []
    {:%{}, [], [{:type, type_name}, {accessor_name, accessor} | options]}
  end

  defp build_filter({:== = operator, _, [a, b]}) do
    build_eq_filter(operator, a, b)
  end
  defp build_filter({:!= = operator, _, [a, b]}) do
    eq_filter = build_eq_filter(operator, a, b)
    {:%{}, [], [type: "not", field: eq_filter]}
  end
  defp build_filter({:and, _, [a, b]}) do
    filter_a = build_filter(a)
    filter_b = build_filter(b)
    quote generated: true do
      case {unquote(filter_a), unquote(filter_b)} do
        {nil, nil} ->
          # No filter AND no filter: that's "no filter"
          nil
        {nil, filter} ->
          # No filter AND filter: just one filter
          filter
        {filter, nil} ->
          # Likewise
          filter
        # If either or both filter is an AND already, merge them together
        {filter_a_unquoted, filter_b_unquoted} ->
          # Need to handle both atom and string keys
          a_is_and = unquote(atom_or_string_value(quote(do: filter_a_unquoted), :type)) == "and"
          b_is_and = unquote(atom_or_string_value(quote(do: filter_b_unquoted), :type)) == "and"
          filter_a_fields = unquote(atom_or_string_value(quote(do: filter_a_unquoted), :fields))
          filter_b_fields = unquote(atom_or_string_value(quote(do: filter_b_unquoted), :fields))
          case {a_is_and, b_is_and} do
            {true, true} ->
              %{type: "and", fields: filter_a_fields ++ filter_b_fields}
            {true, false} ->
              %{type: "and", fields: filter_a_fields ++ [filter_b_unquoted]}
            {false, true} ->
              %{type: "and", fields: [filter_a_unquoted] ++ filter_b_fields}
            {false, false} ->
              %{type: "and", fields: [filter_a_unquoted, filter_b_unquoted]}
          end
      end
    end
  end
  defp build_filter({:or, _, [a, b]}) do
    filter_a = build_filter(a)
    filter_b = build_filter(b)
    quote generated: true do
      # It's not meaningful to use 'or' with the empty filter,
      # since the empty filter already allows anything.
      case {unquote(filter_a), unquote(filter_b)} do
        {nil, _} ->
          raise "left operand to 'or' must not be nil"
        {_, nil} ->
          raise "right operand to 'or' must not be nil"
        {filter_a_unquoted, filter_b_unquoted} ->
          %{type: "or", fields: [filter_a_unquoted, filter_b_unquoted]}
      end
    end
  end
  defp build_filter({:not, _, [a]}) do
    filter = build_filter(a)
    quote generated: true do
      # It's not meaningful to use 'not' with the empty filter,
      # since "not everything" would allow "nothing".
      case unquote(filter) do
        nil ->
          raise "operand to 'not' must not be nil"
        filter_unquoted ->
          %{type: "not", field: filter_unquoted}
      end
    end
  end
  # Let's handle the 'in' operator.  First, let's handle
  # dimensions.foo in intervals([a, b])
  # (where 'foo' will usually be '__time', a special dimension for
  # the event timestamp)
  defp build_filter({:in, _, [a, {:intervals, _, [intervals]}]}) do
    dimension = dimension_or_extraction_fn(a)
    unless dimension do
      raise "left operand of 'in' must be a dimension"
    end
    {:%{}, [], [
        type: "interval",
        intervals: build_intervals(intervals)] ++
      # allow extraction function
      Map.to_list(dimension)}
  end
  # Now handle
  # dimensions.foo in ["123", "456"]
  defp build_filter({:in, _, [a, values]}) do
    dimension = dimension_or_extraction_fn(a)
    unless dimension do
      raise "left operand of 'in' must be a dimension"
    end
    {:%{}, [], [
        type: "in",
        values: values] ++
      # allow extraction function
      Map.to_list(dimension)}
  end
  defp build_filter({lt1, _, [{lt2, _, [a, b]}, c]})
    when lt1 in [:<, :<=] and lt2 in [:<, :<=] do
    # 1 < dimensions.foo < 10, or
    # 1 <= dimensions.foo <= 10
    #
    # Note that operator precedence and associativity gives:
    # ((a < b) < c)
    # so lt2 is actually the one that appears first in the
    # source code.
    lower_strict = (lt2 == :<)
    upper_strict = (lt1 == :<)
    dimension = dimension_or_extraction_fn(b)
    unless dimension do
      raise "middle operand in bound filter must be a dimension"
    end
    base = {:%{}, [], [type: "bound", lowerStrict: lower_strict, upperStrict: upper_strict] ++
      # allow extraction function
      Map.to_list(dimension)}
    # Need 'generated: true' here to avoid compiler warnings for
    # our case expression in case a and c are literal constants.
    quote generated: true do
      # Need to convert bounds to strings, and select sorting order.
      # Let's go for "numeric" when both are numbers, "lexicographic"
      # when both are strings, and crash otherwise.
      # TODO: do we need "alphanumeric" and "strlen"?
      {lower, upper, ordering} =
        case {unquote(a), unquote(c)} do
          {l, u} when is_integer(l) and is_integer(u) ->
            {Integer.to_string(l), Integer.to_string(u), "numeric"}
          {l, u} when is_float(l) and is_float(u) ->
            {Float.to_string(l), Float.to_string(u), "numeric"}
          {l, u} when is_binary(l) and is_binary(u) ->
            {l, u, "lexicographic"}
        end
      Map.merge(unquote(base),
        %{lower: lower,
          upper: upper,
          ordering: ordering})
    end
  end
  defp build_filter({:expression, _, [expression]}) do
    # A math expression, as described in http://druid.io/docs/0.12.1/misc/math-expr
    # We're expecting a string that we're passing on to Druid
    quote bind_quoted: [expression: expression] do
      %{type: "expression",
        expression: expression}
    end
  end
  defp build_filter({:^, _, [expression]}) do
    # We're recycling the ^ operator to incorporate an already created
    # filter into a filter expression.
    quote generated: true, bind_quoted: [expression: expression] do
      case expression do
        %{type: _} = filter ->
          # Looks like a filter!
          filter
        %{"type" => _} = filter ->
          # Same, but the keys are strings, not atoms
          filter
        nil ->
          # nil is a valid filter as well
          nil
      end
    end
  end

  defp build_eq_filter(operator, a, b) do
    dimension_a = dimension_or_extraction_fn(a)
    dimension_b = dimension_or_extraction_fn(b)
    case {dimension_a, dimension_b} do
      {nil, _} ->
        raise "left operand of #{operator} must be a dimension"
      {_, nil} ->
        # Compare a dimension to a value
        {:%{}, [], [
            type: "selector",
            value: b] ++
          # dimension_a is either just a dimension, or a dimension
          # plus an extraction function
          Map.to_list(dimension_a)}
      {_, _} ->
        # Compare two dimensions
        dimension_spec_a = to_dimension_spec(dimension_a)
        dimension_spec_b = to_dimension_spec(dimension_b)
        quote do: %{type: "columnComparison",
                    dimensions: [unquote(dimension_spec_a),
                                 unquote(dimension_spec_b)]}
    end
  end

  defp atom_or_string_value(map, key_atom) do
    # Given a macro fragment that evaluates to a map, and an atom,
    # return a macro fragment that returns the value of that atom
    # in the map, or the value of the corresponding string in the map,
    # or nil if neither is present in the map.
    var = Macro.unique_var(:x, __MODULE__)
    key_string = Atom.to_string(key_atom)
    {:case, [generated: true], [
        map,
        [do: [
          {:->, [generated: true], [[{:%{}, [], [{key_atom, var}]}], var]},
          {:->, [generated: true], [[{:%{}, [], [{key_string, var}]}], var]},
          {:->, [generated: true], [[{:%{}, [], []}], nil]}]]]}
  end

  # TODO: handle more extraction functions
  defp dimension_or_extraction_fn({{:., _, [{:dimensions, _, _}, dimension]}, _, _}) do
    # dimensions.foo
    %{dimension: Atom.to_string(dimension)}
  end
  defp dimension_or_extraction_fn({{:., _, [Access, :get]}, _, [{:dimensions, _, _}, dimension]}) do
    # dimensions["foo"]
    %{dimension: dimension}
  end
  defp dimension_or_extraction_fn({:lookup, _, args}) do
    case args do
      [lookup_name | maybe_opts] ->
        opts = case maybe_opts do
                 [] -> []
                 [opts] -> opts
               end
        %{extractionFn: {:%{}, [],
                         [{"type", "registeredLookup"},
                          {"lookup", lookup_name}] ++ opts}}
      _ ->
        raise ArgumentError, "Expected lookup name as argument to lookup"
    end
  end
  defp dimension_or_extraction_fn({:|>, _, [left, right]}) do
    left = dimension_or_extraction_fn(left)
    right = dimension_or_extraction_fn(right)
    case {left, right} do
      {%{dimension: dimension, extractionFn: left_extraction_fn}, %{extractionFn: right_extraction_fn}} ->
        # There are extraction functions on both sides of the operator
        # - let's combine them into a cascade extraction function.
        %{dimension: dimension,
          extractionFn: {:%{}, [],
                         [{"type", "cascade"},
                          {"extractionFns", [left_extraction_fn, right_extraction_fn]}]}}
      {%{dimension: dimension}, %{extractionFn: extraction_fn}} ->
        %{dimension: dimension, extractionFn: extraction_fn}
    end
  end
  defp dimension_or_extraction_fn(_) do
    nil
  end

  defp to_dimension_spec(%{dimension: dimension, extractionFn: extraction_fn}) do
    # Do we need to set outputName here?
    {:%{}, [], [type: "extraction",
                dimension: dimension,
                extractionFn: extraction_fn]}
  end
  defp to_dimension_spec(%{dimension: dimension}) do
    dimension
  end

  defp build_virtual_columns(virtual_columns) do
    Enum.map virtual_columns, &build_virtual_column/1
  end

  defp build_virtual_column({name, {:expression, _, [expression, output_type]}}) do
    quote generated: true, bind_quoted: [
      name: name,
      expression: expression,
      output_type: output_type
    ] do
      output_type = String.upcase(String.Chars.to_string(output_type))
      unless output_type in ["LONG", "FLOAT", "DOUBLE", "STRING"] do
        raise ArgumentError, "Unexpected output type #{output_type}, expected one of :long, :float, :double, :string"
      end
      %{"type" => "expression",
        "name" => name,
        "outputType" => output_type,
        "expression" => expression}
    end
  end
  defp build_virtual_column({_name, {:expression, _, args}}) do
    raise ArgumentError, "Expected 2 arguments to 'expression' in virtual column, expression and output type; " <>
      "got #{length args}"
  end

  defp build_context(context) do
    quote generated: true, bind_quoted: [context: context, default_context: default_context()] do
      Map.merge(default_context, context)
    end
  end

  @doc """
  Convert a Panoramix.Query struct into a map ready to be converted to JSON.
  """
  def to_map(%Panoramix.Query{} = query) do
    unless query.query_type do
      raise "query type not specified"
    end
    [queryType: query.query_type,
     dataSource: query.data_source,
     intervals: query.intervals,
     granularity: query.granularity,
     aggregations: query.aggregations,
     postAggregations: query.post_aggregations,
     filter: query.filter,
     dimension: query.dimension,
     dimensions: query.dimensions,
     metric: query.metric,
     threshold: query.threshold,
     context: query.context,
     toInclude: query.to_include,
     merge: query.merge,
     analysisTypes: query.analysis_types,
     limitSpec: query.limit_spec,
     bound: query.bound,
     virtualColumns: query.virtual_columns,
     limit: query.limit,
     searchDimensions: query.search_dimensions,
     query: query.query,
     sort: query.sort,
    ]
    |> Enum.reject(fn {_, v} -> is_nil(v) end)
    |> Enum.into(%{})
  end

  @doc """
  Convert a Panoramix.Query struct into its JSON representation.
  """
  def to_json(query) do
    query
    |> to_map()
    |> Jason.encode!
  end
end