# lib/arke_postgres/query.ex

# Copyright 2023 Arkemis S.r.l.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

defmodule ArkePostgres.Query do
  import Ecto.Query
  alias Arke.Utils.DatetimeHandler, as: DatetimeHandler

  @record_fields [:id, :arke_id, :data, :metadata, :inserted_at, :updated_at]

  @doc """
  Builds the Ecto query for `arke_query` and the given `action`.

  The starting query is chosen by `base_query/2`; the filters, orders, offset
  and limit carried by `arke_query` are then applied, in that order.
  """
  def generate_query(
        %{filters: filters, orders: orders, offset: offset, limit: limit} = arke_query,
        action
      ) do
    filtered = arke_query |> base_query(action) |> handle_filters(filters)
    ordered = handle_orders(filtered, orders)

    ordered
    |> handle_offset(offset)
    |> handle_limit(limit)
  end

  @doc """
  Runs `query` in the requested mode.

    * `:raw` - returns the result of `Ecto.Adapters.SQL.to_sql/3` for the generated query
    * `:all` - fetches every record and turns each one into a unit via `init_unit/3`
    * `:one` - fetches a single record and turns it into a unit (`nil` when no record)
    * `:count` - returns whatever `Repo.one/2` yields for the count query
    * `:pseudo_query` - returns the generated Ecto query without executing it

  Database calls use `query.project` as the `:prefix` option.
  """
  def execute(query, :raw) do
    Ecto.Adapters.SQL.to_sql(:all, ArkePostgres.Repo, generate_query(query, :raw))
  end

  def execute(query, :all) do
    records = ArkePostgres.Repo.all(generate_query(query, :all), prefix: query.project)
    generate_units(records, query.arke, query.project)
  end

  def execute(query, :one) do
    query
    |> generate_query(:one)
    |> ArkePostgres.Repo.one(prefix: query.project)
    |> init_unit(query.arke, query.project)
  end

  def execute(query, :count) do
    ecto_query = generate_query(query, :count)
    ArkePostgres.Repo.one(ecto_query, prefix: query.project)
  end

  def execute(query, :pseudo_query) do
    generate_query(query, :pseudo_query)
  end

  @doc """
  Returns the dynamic column expression for `parameter`.

  Dispatches on `parameter.data.persistence`: `"arke_parameter"` values are
  resolved by `get_arke_column/1`, `"table_column"` values by
  `get_table_column/1`. Any other persistence has no matching clause.
  """
  def get_column(%{data: %{persistence: "arke_parameter"}} = parameter) do
    get_arke_column(parameter)
  end

  def get_column(%{data: %{persistence: "table_column"}} = parameter) do
    get_table_column(parameter)
  end

  @doc """
  Drops the `"project"` key from `metadata` when its value is `"arke_system"`.

  When `project_id` is `:arke_system` the metadata is returned untouched.
  For any other project the `"project"` entry is removed only if it equals
  `"arke_system"`; otherwise `metadata` is returned as is.
  """
  def remove_arke_system(metadata, :arke_system), do: metadata

  def remove_arke_system(metadata, _project_id) do
    if Map.get(metadata, "project") == "arke_system" do
      Map.delete(metadata, "project")
    else
      metadata
    end
  end

  @doc """
  Merges, for every entry of `params`, the metadata found in `units` under the
  entry's `child_id` on top of the entry's own metadata.

  Values coming from `units` win on key conflicts. The merged metadata is then
  passed through `remove_arke_system/2` and stored back under `:metadata`.
  """
  def merge_unit_metadata(params, units, project_id) do
    for param <- params do
      unit_metadata = Map.get(units, param.child_id, %{})
      merged = Map.merge(param.metadata, unit_metadata)
      Map.put(param, :metadata, remove_arke_system(merged, project_id))
    end
  end

  @doc """
  Loads from the `project_id` schema the data needed to build the managers.

  Reads the `"parameter"` and `"group"` rows of `arke_link` and the
  `arke_unit` rows whose `arke_id` is in `Arke.Utils.DefaultData.get_arke_id/0`,
  then returns `{parameters, arke_list, groups}` as parsed by
  `parse_parameters/2`, `parse_arke_list/2` and `parse_groups/2`.
  """
  def get_manager_units(project_id) do
    # Minimal arke description, used only to select the arke_link columns.
    arke_link = %{
      id: :arke_link,
      data: %{
        parameters: [%{id: :type}, %{id: :child_id}, %{id: :parent_id}, %{id: :metadata}]
      }
    }

    links_query = from(q in table_query(arke_link, nil), where: q.type in ["parameter", "group"])
    links = ArkePostgres.Repo.all(links_query, prefix: project_id)

    parameter_links = Enum.filter(links, &(&1.type == "parameter"))
    group_links = Enum.filter(links, &(&1.type == "group"))

    parameters_id = Arke.Utils.DefaultData.get_parameters_id()
    list_arke_id = Arke.Utils.DefaultData.get_arke_id()

    units_query = from(q in base_query(), where: q.arke_id in ^list_arke_id)
    unit_list = ArkePostgres.Repo.all(units_query, prefix: project_id)

    # Unit metadata indexed by unit id, merged onto the parameter links.
    units_map = Map.new(unit_list, fn unit -> {unit.id, unit.metadata} end)
    parameter_links = merge_unit_metadata(parameter_links, units_map, project_id)

    parameter_units = Enum.filter(unit_list, &(&1.arke_id in parameters_id))
    arke_units = Enum.filter(unit_list, &(&1.arke_id == "arke"))
    group_units = Enum.filter(unit_list, &(&1.arke_id == "group"))

    parameters = parse_parameters(parameter_units, project_id)
    arke_list = parse_arke_list(arke_units, parameter_links)
    groups = parse_groups(group_units, group_links)

    {parameters, arke_list, groups}
  end

  @doc """
  Returns every `arke_unit` record of the `"arke_system"` schema whose
  `arke_id` is `"arke_project"`.
  """
  def get_project_record() do
    from(q in base_query(), where: q.arke_id == "arke_project")
    |> ArkePostgres.Repo.all(prefix: "arke_system")
  end

  # Turns the raw "arke" records into maps: every entry of `unit.data` becomes
  # an atom key holding its "value", `:id` and `:metadata` are added, and the
  # parameter links whose `parent_id` is the arke id are prepended to an
  # existing `:parameters` value.
  # NOTE(review): when `:parameters` is absent from the data the key is set to
  # `[]` and the linked parameters are not included - confirm this is intended.
  # The resulting list is in reverse order with respect to `arke_list`.
  defp parse_arke_list(arke_list, parameter_links) do
    # todo: remove the string to atom when everything would become string
    arke_list
    |> Enum.map(fn %{id: id, metadata: metadata} = unit ->
      linked_params =
        parameter_links
        |> Enum.filter(fn link -> link.parent_id == id end)
        |> Enum.map(fn link ->
          %{
            id: String.to_atom(link.child_id),
            metadata: Map.new(link.metadata, fn {key, val} -> {String.to_atom(key), val} end)
          }
        end)
        |> Enum.reverse()

      unit.data
      |> Map.new(fn {k, db_data} -> {String.to_atom(k), db_data["value"]} end)
      |> Map.put(:id, id)
      |> Map.put(:metadata, metadata)
      |> Map.update(:parameters, [], fn current -> linked_params ++ current end)
    end)
    |> Enum.reverse()
  end

  # Turns the raw parameter records into maps: every entry of `unit.data`
  # becomes an atom key holding its "value"; `:id`, `:type` (the record's
  # `arke_id`) and the cleaned `:metadata` are then added.
  # The resulting list is in reverse order with respect to `parameter_list`.
  defp parse_parameters(parameter_list, project_id) do
    parameter_list
    |> Enum.map(fn %{id: id, arke_id: arke_id, metadata: metadata} = unit ->
      parsed_metadata = remove_arke_system(metadata, project_id)

      unit.data
      |> Map.new(fn {k, db_data} -> {String.to_atom(k), db_data["value"]} end)
      |> Map.put(:id, id)
      |> Map.put(:type, arke_id)
      |> Map.put(:metadata, parsed_metadata)
    end)
    |> Enum.reverse()
  end

  # Turns the raw "group" records into maps: every entry of `unit.data`
  # becomes an atom key holding its "value", `:id` and `:metadata` are added,
  # and each key listed in an existing `:arke_list` is replaced by the matching
  # group link (`%{id: ..., metadata: ...}`) when one exists, otherwise kept.
  # Both the group list and each rebuilt `:arke_list` come out reversed with
  # respect to their input.
  defp parse_groups(groups, group_links) do
    groups
    |> Enum.map(fn %{id: id, metadata: metadata} = unit ->
      linked_arke =
        group_links
        |> Enum.filter(fn link -> link.parent_id == id end)
        |> Enum.map(fn link -> %{id: String.to_atom(link.child_id), metadata: link.metadata} end)
        |> Enum.reverse()

      unit.data
      |> Map.new(fn {k, db_data} -> {String.to_atom(k), db_data["value"]} end)
      |> Map.put(:id, id)
      |> Map.put(:metadata, metadata)
      |> Map.update(:arke_list, [], fn db_arke_list ->
        db_arke_list
        |> Enum.map(fn key ->
          # Falls back to the bare key when no link matches it.
          Enum.find(linked_arke, key, fn %{id: arke_id, metadata: _metadata} ->
            to_string(arke_id) == key
          end)
        end)
        |> Enum.reverse()
      end)
    end)
    |> Enum.reverse()
  end


  ######################################################################################################################
  # PRIVATE FUNCTIONS ##################################################################################################
  ######################################################################################################################
  # Picks the starting query for an arke query:
  #   * arke of type "table"  -> query on the arke's own table
  #   * no link               -> query on "arke_unit"
  #   * link on a single unit -> recursive link query starting from that unit
  #   * link on a unit list   -> recursive link query starting from every unit
  defp base_query(%{arke: %{data: %{type: "table"}, id: _id} = arke}, action),
    do: table_query(arke, action)

  defp base_query(%{link: nil}, action), do: arke_query(action)

  defp base_query(
         %{
           link: %{unit: %{id: link_id}, depth: depth, direction: direction, type: type},
           project: project
         },
         action
       ),
       do: get_nodes(project, action, [to_string(link_id)], depth, direction, type)

  defp base_query(
         %{
           link: %{unit: unit_list, depth: depth, direction: direction, type: type},
           project: project
         },
         action
       )
       when is_list(unit_list) do
    unit_ids = Enum.map(unit_list, fn unit -> to_string(unit.id) end)
    get_nodes(project, action, unit_ids, depth, direction, type)
  end

  # Plain select of the record fields from "arke_unit".
  defp base_query() do
    from("arke_unit", select: ^@record_fields)
  end

  # Query on "arke_unit": a row count for `:count`, the record fields for any
  # other action.
  defp arke_query(:count) do
    from("arke_unit", select: count("*"))
  end

  defp arke_query(_action) do
    from("arke_unit", select: ^@record_fields)
  end

  # Builds a select on the table named after the arke id, selecting one column
  # per parameter id. The action is currently ignored.
  # The field list is reversed with respect to `data.parameters`, as the
  # original prepend-based accumulation produced.
  defp table_query(%{id: id, data: data}, _action) do
    table_name = Atom.to_string(id)

    fields =
      data.parameters
      |> Enum.map(fn %{id: parameter_id} -> parameter_id end)
      |> Enum.reverse()

    from(table_name, select: ^fields)
  end

  # Resolves the arke to use for a record.
  #   * no arke given (nil): looked up through ArkeManager from the record's
  #     `arke_id` (atoms are used directly, anything else goes through
  #     `String.to_existing_atom/1`)
  #   * arke given as atom or binary id: looked up through ArkeManager
  #   * anything else (including records without `arke_id`): `arke` is
  #     returned unchanged
  defp get_arke(project, %{arke_id: arke_id}, nil) when is_atom(arke_id),
    do: Arke.Boundary.ArkeManager.get(arke_id, project)

  defp get_arke(project, %{arke_id: arke_id}, nil),
    do: Arke.Boundary.ArkeManager.get(String.to_existing_atom(arke_id), project)

  defp get_arke(project, %{arke_id: _arke_id}, arke) when is_atom(arke),
    do: Arke.Boundary.ArkeManager.get(arke, project)

  defp get_arke(project, %{arke_id: _arke_id}, arke) when is_binary(arke),
    do: Arke.Boundary.ArkeManager.get(String.to_existing_atom(arke), project)

  defp get_arke(_project, _record, arke), do: arke

  # Converts every record of `data` into a unit, preserving the input order.
  # Uses Enum.map/2 rather than appending to an accumulator, which was
  # quadratic in the number of records.
  defp generate_units(data, arke, project) do
    Enum.map(data, fn record -> init_unit(record, arke, project) end)
  end

  @doc """
  Builds a unit from a database `record`.

  Returns `nil` for a `nil` record. Otherwise the arke is resolved with
  `get_arke/3`, one entry per arke parameter is read from the record's `:data`
  (missing parameters get `nil`), `project` is added to the metadata under
  `:project`, and the result is handed to `Arke.Core.Unit.load/2`. Keys of the
  record itself take precedence over parameter values with the same name.
  """
  def init_unit(nil, _arke, _project), do: nil

  def init_unit(record, arke, project) do
    arke = get_arke(project, record, arke)
    {metadata, record} = Map.pop(record, :metadata, %{})
    {stored_data, record} = Map.pop(record, :data, %{})

    parameter_values =
      Map.new(arke.data.parameters, fn parameter ->
        {parameter.id, Map.get(stored_data, Atom.to_string(parameter.id))}
      end)

    record_with_metadata = Map.put(record, :metadata, Map.put(metadata, :project, project))
    unit_attrs = Map.merge(parameter_values, record_with_metadata)

    Arke.Core.Unit.load(arke, unit_attrs)
  end

  @doc """
  Adds one `where` clause to `query` for every filter in `filters`.

  Each filter's base filters are combined with the filter's `logic` and the
  resulting clause is negated when the filter's `negate` flag is `true`.
  """
  def handle_filters(query, filters) do
    Enum.reduce(filters, query, fn filter, acc_query ->
      %{logic: logic, negate: negate, base_filters: base_filters} = filter

      clause =
        logic
        |> handle_condition(base_filters)
        |> handle_negate_condition(negate)

      from(q in acc_query, where: ^clause)
    end)
  end

  # Folds the base filters into a single dynamic clause joined with `logic`.
  # A nil value or the `:isnull` operator produces a null check; every other
  # case is delegated to `filter_query_by_operator/4`. Each condition is
  # negated individually according to its own `negate` flag.
  # Returns nil when `base_filters` is empty.
  defp handle_condition(logic, base_filters) do
    Enum.reduce(
      base_filters,
      nil,
      fn %{parameter: parameter, operator: operator, value: value, negate: negate}, clause ->
        column = get_column(parameter)
        value = get_value(parameter, value)

        condition =
          if is_nil(value) or operator == :isnull do
            get_nil_query(parameter, column)
          else
            filter_query_by_operator(parameter, column, value, operator)
          end

        condition
        |> handle_negate_condition(negate)
        |> add_condition_to_clause(clause, logic)
      end
    )
  end

  # Wraps `condition` in a SQL NOT when the flag is true, returns it unchanged
  # when the flag is false.
  defp handle_negate_condition(condition, true) do
    dynamic([q], not (^condition))
  end

  defp handle_negate_condition(condition, false) do
    condition
  end

  # Appends `condition` to the clause built so far: the first condition starts
  # the clause, later ones are joined with AND / OR depending on the logic.
  defp add_condition_to_clause(condition, nil, _logic) do
    dynamic([q], ^condition)
  end

  defp add_condition_to_clause(condition, clause, :and) do
    dynamic([q], ^clause and ^condition)
  end

  defp add_condition_to_clause(condition, clause, :or) do
    dynamic([q], ^clause or ^condition)
  end

  # Applies the orders to the query. The `{direction, column}` pairs are
  # emitted in reverse order with respect to `orders`.
  # NOTE(review): the reversal comes from the original prepend-based
  # accumulation - confirm it matches how `orders` is built by the caller.
  defp handle_orders(query, orders) do
    order_by =
      orders
      |> Enum.map(fn %{parameter: parameter, direction: direction} ->
        {direction, get_column(parameter)}
      end)
      |> Enum.reverse()

    from(q in query, order_by: ^order_by)
  end

  # Adds an OFFSET to the query unless the offset is nil.
  defp handle_offset(query, nil), do: query

  defp handle_offset(query, offset) do
    from(q in query, offset: ^offset)
  end

  # Adds a LIMIT to the query unless the limit is nil.
  defp handle_limit(query, nil), do: query

  defp handle_limit(query, limit) do
    from(q in query, limit: ^limit)
  end

  # Column expression for a parameter stored as a real table column: the
  # column is named after the parameter id.
  defp get_table_column(%{id: id}) do
    dynamic([q], fragment("?", field(q, ^id)))
  end

  # Column expression for a parameter stored inside the `data` JSON column.
  # The value is read as `data -> <parameter id> ->> 'value'` and cast to a SQL
  # type chosen from the parameter:
  #   * `multiple: true`                   -> jsonb (checked first)
  #   * :string, :atom, :link, :dynamic    -> text
  #   * :boolean / :datetime / :date / :time / :integer / :float
  #                                        -> boolean / timestamp / date / time / integer / float
  #   * :dict, :list                       -> JSON
  # Any other `arke_id` has no matching clause.
  defp get_arke_column(%{id: id, data: %{multiple: true}}) do
    dynamic([q], fragment("(? -> ? ->> 'value')::jsonb", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: type})
       when type in [:string, :atom, :link, :dynamic] do
    dynamic([q], fragment("(? -> ? ->> 'value')::text", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: :boolean}) do
    dynamic([q], fragment("(? -> ? ->> 'value')::boolean", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: :datetime}) do
    dynamic(
      [q],
      fragment("(? -> ? ->> 'value')::timestamp", field(q, :data), ^Atom.to_string(id))
    )
  end

  defp get_arke_column(%{id: id, arke_id: :date}) do
    dynamic([q], fragment("(? -> ? ->> 'value')::date", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: :time}) do
    dynamic([q], fragment("(? -> ? ->> 'value')::time", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: :integer}) do
    dynamic([q], fragment("(? -> ? ->> 'value')::integer", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: :float}) do
    dynamic([q], fragment("(? -> ? ->> 'value')::float", field(q, :data), ^Atom.to_string(id)))
  end

  defp get_arke_column(%{id: id, arke_id: type}) when type in [:dict, :list] do
    dynamic([q], fragment("(? -> ? ->> 'value')::JSON", field(q, :data), ^Atom.to_string(id)))
  end

  # Normalises a filter value according to the parameter type (`arke_id`).
  #   * nil, lists and maps are returned untouched, whatever the parameter
  #   * non-boolean atoms are converted to strings and processed again
  #   * :string   -> binaries as is, anything else through `Kernel.inspect/1`
  #   * :integer  -> numbers as is, binaries through `Integer.parse/1`
  #   * :float    -> numbers as is, binaries through `Float.parse/1`
  #   * :boolean  -> true/"true"/"True"/1/"1" and false/"false"/"False"/0/"0"
  #   * :datetime / :date / :time -> parsed with DatetimeHandler
  #   * :dict, :list, :link, :dynamic -> returned untouched
  # Raises a RuntimeError ("Parameter(<id>) value not valid") when the value
  # cannot be converted or the parameter type is not handled.
  defp get_value(_parameter, nil), do: nil
  defp get_value(_parameter, value) when is_list(value) or is_map(value), do: value

  defp get_value(parameter, value) when is_atom(value) and not is_boolean(value),
    do: get_value(parameter, Atom.to_string(value))

  defp get_value(%{id: _id, arke_id: :string}, value) when is_binary(value), do: value
  defp get_value(%{id: _id, arke_id: :string}, value), do: Kernel.inspect(value)

  defp get_value(%{id: _id, arke_id: :integer}, value) when is_number(value), do: value

  defp get_value(%{id: id, arke_id: :integer}, value) when is_binary(value) do
    case Integer.parse(value) do
      {parsed, _remainder} -> parsed
      _ -> raise("Parameter(#{id}) value not valid")
    end
  end

  # NOTE(review): this clause is shadowed by the generic `is_list` clause above
  # and is never selected; kept in place so the behaviour does not change.
  # Confirm whether list values of integer parameters should be validated.
  defp get_value(%{id: _id, arke_id: :integer} = parameter, value) when is_list(value),
    do: parse_number_list(parameter, value, fn v -> is_integer(v) end)

  defp get_value(%{id: _id, arke_id: :float}, value) when is_number(value), do: value

  defp get_value(%{id: id, arke_id: :float}, value) when is_binary(value) do
    case Float.parse(value) do
      {parsed, _remainder} -> parsed
      _ -> raise("Parameter(#{id}) value not valid")
    end
  end

  # NOTE(review): shadowed by the generic `is_list` clause above, same as the
  # integer list clause.
  defp get_value(%{id: _id, arke_id: :float} = parameter, value) when is_list(value),
    do: parse_number_list(parameter, value, fn v -> is_number(v) end)

  defp get_value(%{id: _id, arke_id: :boolean}, value)
       when value in [true, "true", "True", 1, "1"],
       do: true

  defp get_value(%{id: _id, arke_id: :boolean}, value)
       when value in [false, "false", "False", 0, "0"],
       do: false

  defp get_value(%{id: id, arke_id: :boolean}, _value),
    do: raise("Parameter(#{id}) value not valid")

  defp get_value(%{id: id, arke_id: :datetime}, value) do
    case DatetimeHandler.parse_datetime(value) do
      {:ok, parsed_datetime} -> parsed_datetime
      {:error, _msg} -> raise("Parameter(#{id}) value not valid")
    end
  end

  defp get_value(%{id: id, arke_id: :date}, value) do
    case DatetimeHandler.parse_date(value) do
      {:ok, parsed_date} -> parsed_date
      {:error, _msg} -> raise("Parameter(#{id}) value not valid")
    end
  end

  defp get_value(%{id: id, arke_id: :time}, value) do
    case DatetimeHandler.parse_time(value) do
      {:ok, parsed_time} -> parsed_time
      {:error, _msg} -> raise("Parameter(#{id}) value not valid")
    end
  end

  defp get_value(%{id: _id, arke_id: type}, value)
       when type in [:dict, :list, :link, :dynamic],
       do: value

  defp get_value(%{id: id}, _value), do: raise("Parameter(#{id}) value not valid")

  # Normalises every element of a list through `get_value/2`.
  # The list is accepted when all its elements are binaries or when all of
  # them satisfy `func`; otherwise a RuntimeError is raised.
  defp parse_number_list(parameter, value, func) do
    if Enum.all?(value, &is_binary/1) or Enum.all?(value, fn item -> func.(item) end) do
      Enum.map(value, fn item -> get_value(parameter, item) end)
    else
      raise("Parameter(#{parameter.id}) value not valid")
    end
  end

  # Null check for a parameter: the column expression must be NULL and the
  # `data` column must contain the parameter id as a key (`data ? <id>`).
  defp get_nil_query(%{id: id}, column) do
    dynamic([q], fragment("? IS NULL AND (data \\? ?)", ^column, ^Atom.to_string(id)))
  end

  # Builds the dynamic condition comparing `column` with `value` for the given
  # operator. `:eq` on a `multiple: true` parameter uses `jsonb_exists`; the
  # (i)contains / (i)startswith / (i)endswith operators build LIKE / ILIKE
  # patterns by concatenating "%" with `value`; unknown operators fall back to
  # equality.
  defp filter_query_by_operator(%{data: %{multiple: true}}, column, value, :eq),
    do: dynamic([q], fragment("jsonb_exists(?, ?)", ^column, ^value))

  defp filter_query_by_operator(_parameter, column, value, :eq),
    do: dynamic([q], ^column == ^value)

  defp filter_query_by_operator(_parameter, column, value, :contains),
    do: dynamic([q], like(^column, fragment("?", ^("%" <> value <> "%"))))

  defp filter_query_by_operator(_parameter, column, value, :icontains),
    do: dynamic([q], ilike(^column, fragment("?", ^("%" <> value <> "%"))))

  defp filter_query_by_operator(_parameter, column, value, :endswith),
    do: dynamic([q], like(^column, fragment("?", ^("%" <> value))))

  defp filter_query_by_operator(_parameter, column, value, :iendswith),
    do: dynamic([q], ilike(^column, fragment("?", ^("%" <> value))))

  defp filter_query_by_operator(_parameter, column, value, :startswith),
    do: dynamic([q], like(^column, fragment("?", ^(value <> "%"))))

  defp filter_query_by_operator(_parameter, column, value, :istartswith),
    do: dynamic([q], ilike(^column, fragment("?", ^(value <> "%"))))

  defp filter_query_by_operator(_parameter, column, value, :lte),
    do: dynamic([q], ^column <= ^value)

  defp filter_query_by_operator(_parameter, column, value, :lt),
    do: dynamic([q], ^column < ^value)

  defp filter_query_by_operator(_parameter, column, value, :gt),
    do: dynamic([q], ^column > ^value)

  defp filter_query_by_operator(_parameter, column, value, :gte),
    do: dynamic([q], ^column >= ^value)

  defp filter_query_by_operator(_parameter, column, value, :in),
    do: dynamic([q], ^column in ^value)

  defp filter_query_by_operator(_parameter, column, value, _operator),
    do: dynamic([q], ^column == ^value)

  # defp filter_query_by_operator(query, key, value, "between"), do: from q in query, where: column_table(q, ^key) == ^value

  ######################################################################################################################
  # ARKE LINK ##########################################################################################################
  ######################################################################################################################

  # Recursive CTE walking the `arke_link` table of a schema.
  # The anchor member selects, at depth 0, the links whose chosen column is in
  # the given list of unit ids and records that column as `starting_unit`; the
  # recursive member joins `arke_link` with the rows found so far while
  # `depth` is below the requested limit.
  # The 13 `?` placeholders are filled positionally by the `fragment` calls in
  # `get_link_query/7` (schema name, link/tree column names, unit id list and
  # depth), so their order must stay in sync with those calls.
  @raw_cte_query """
  (
    WITH RECURSIVE tree(depth, parent_id, type, child_id, metadata, starting_unit) AS (
      SELECT 0, parent_id, type, child_id, metadata, ? FROM ?.arke_link WHERE ? = ANY(?)
      UNION SELECT
        depth + 1,
        ?.arke_link.parent_id,
        ?.arke_link.type,
        ?.arke_link.child_id,
        ?.arke_link.metadata,
        tree.starting_unit
      FROM
        ?.arke_link JOIN tree
        ON ?.arke_link.? = tree.?
      WHERE
       depth < ?
    )
    SELECT * FROM tree ORDER BY depth
  )
  """

  @doc """
  Builds the query returning the units linked to `unit_id`.

  `unit_id` may be a single id or a list of ids (a single id is wrapped in a
  list). `direction` (`:child` or `:parent`) selects which side of the link is
  followed, `depth` bounds the recursion and a non-nil `type` restricts the
  result to links of that type. With `action` set to `:count` the query
  returns a count instead of the records.
  """
  def get_nodes(project, action, unit_id, depth, direction, type) when is_list(unit_id) do
    schema = get_project(project)
    {link_field, tree_field} = get_fields_by_direction(direction)

    where_field =
      direction
      |> get_where_field_by_direction()
      |> get_where_condition_by_type(type)

    get_link_query(action, schema, unit_id, link_field, tree_field, depth, where_field)
  end

  def get_nodes(project, action, unit_id, depth, direction, type) do
    get_nodes(project, action, [unit_id], depth, direction, type)
  end

  # Returns the project as a string: atoms are converted, anything else is
  # returned unchanged.
  defp get_project(project) when is_atom(project) do
    Atom.to_string(project)
  end

  defp get_project(project) do
    project
  end
  # Returns `{link_field, tree_field}`, the arke_link column names used by the
  # recursive query for the given direction.
  defp get_fields_by_direction(:child) do
    {"parent_id", "child_id"}
  end

  defp get_fields_by_direction(:parent) do
    {"child_id", "parent_id"}
  end

  # Condition matching a unit (`a`) with the link row (`cte`): the unit id must
  # equal the link's child id or parent id depending on the direction.
  defp get_where_field_by_direction(:child),
    do: dynamic([a, cte], a.id == fragment("?", field(cte, ^:child_id)))

  defp get_where_field_by_direction(:parent),
    do: dynamic([a, cte], a.id == fragment("?", field(cte, ^:parent_id)))

  # Optionally restricts the condition to links of the given type; a nil type
  # leaves the condition unchanged.
  defp get_where_condition_by_type(condition, nil), do: condition

  defp get_where_condition_by_type(condition, type),
    do: dynamic([a, cte], ^condition and cte.type == ^type)

  # Builds the query over "arke_unit" left-joined with the recursive link tree
  # defined by @raw_cte_query.
  #
  # The fragment arguments are positional and fill the `?` placeholders of the
  # CTE in order: `literal(...)` arguments are column and schema names
  # (`link_field`, `tree_field`, `project`), while `unit_id_list` and `depth`
  # are passed as query parameters.
  #
  # `:count` clause: counts the distinct `{a.id, cte.starting_unit}` pairs
  # that satisfy `where_field`.
  defp get_link_query(:count, project, unit_id_list, link_field, tree_field, depth, where_field) do
    from(a in "arke_unit",
      left_join:
        cte in fragment(
          @raw_cte_query,
          literal(^link_field),
          literal(^project),
          literal(^link_field),
          ^unit_id_list,
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^link_field),
          literal(^tree_field),
          ^depth
        ),
      where: ^where_field,
      select: count([a.id, cte.starting_unit], :distinct)
    )
  end

  # Any other action: selects the unit record fields together with the link
  # information (`depth`, `link_metadata`, `link_type`, `starting_unit`),
  # distinct on `{a.id, cte.starting_unit}`.
  defp get_link_query(_action, project, unit_id_list, link_field, tree_field, depth, where_field) do
    q = from(r in from(a in "arke_unit",
      left_join:
        cte in fragment(
          @raw_cte_query,
          literal(^link_field),
          literal(^project),
          literal(^link_field),
          ^unit_id_list,
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^project),
          literal(^link_field),
          literal(^tree_field),
          ^depth
        ),
      where: ^where_field,
      distinct: [a.id, cte.starting_unit],
      select: %{
        id: a.id,
        arke_id: a.arke_id,
        data: a.data,
        metadata: a.metadata,
        inserted_at: a.inserted_at,
        updated_at: a.updated_at,
        depth: cte.depth,
        link_metadata: cte.metadata,
        link_type: cte.type,
        starting_unit: cte.starting_unit
      }
    ))
    # The result is wrapped in a subquery, so clauses added later by the
    # caller (filters, orders, offset, limit) apply to the selected map.
    from x in subquery(q), select: x
  end

end