# lib/elasticlunr/core/index.ex

defmodule Elasticlunr.Index.IdPipeline do
  @moduledoc false

  alias Elasticlunr.{Pipeline, Token}

  @behaviour Pipeline

  # Identity pipeline stage: returns the token unchanged. Used for the
  # document-ref field so ids are indexed verbatim (no stemming/stop-words).
  @impl true
  def call(%Token{} = token) do
    token
  end
end

defmodule Elasticlunr.Index do
  alias Elasticlunr.{Field, Pipeline, Token}
  alias Elasticlunr.Index.IdPipeline
  alias Elasticlunr.Dsl.{Query, QueryRepository}

  @fields ~w[fields name ref pipeline documents_size store_positions store_documents on_conflict]a
  @enforce_keys @fields
  defstruct @fields

  @type document_field :: atom() | binary()

  @type t :: %__MODULE__{
          fields: map(),
          on_conflict: atom(),
          documents_size: integer(),
          ref: Field.document_ref(),
          pipeline: Pipeline.t(),
          name: atom() | binary(),
          store_positions: boolean(),
          store_documents: boolean()
        }

  @type search_query :: binary() | map()
  @type search_result :: any()

  @spec new(keyword()) :: t()
  # Builds a fresh index.
  #
  # Options: :ref (document id key, default "id"), :pipeline, :name,
  # :on_conflict (default :index), :store_documents and :store_positions
  # (both default true). A field for the ref key is always created.
  def new(opts \\ []) do
    ref = Keyword.get(opts, :ref, "id")

    attrs = %{
      documents_size: 0,
      ref: ref,
      fields: %{},
      pipeline: Keyword.get_lazy(opts, :pipeline, &Pipeline.new/0),
      name: Keyword.get_lazy(opts, :name, &UUID.uuid4/0),
      on_conflict: Keyword.get(opts, :on_conflict, :index),
      store_documents: Keyword.get(opts, :store_documents, true),
      store_positions: Keyword.get(opts, :store_positions, true)
    }

    index = struct!(__MODULE__, attrs)

    # The ref field stores nothing and runs the identity pipeline so that
    # document ids are indexed exactly as given.
    ref_field_opts = [
      store_documents: false,
      store_positions: false,
      pipeline: Pipeline.new([IdPipeline])
    ]

    add_field(index, to_string(ref), ref_field_opts)
  end

  @spec add_field(t(), document_field(), keyword()) :: t()
  # Registers a new field on the index. Index-level settings (pipeline,
  # store_documents, store_positions) act as defaults; explicit `opts` win.
  def add_field(
        %__MODULE__{
          fields: fields,
          pipeline: pipeline,
          store_positions: store_positions,
          store_documents: store_documents
        } = index,
        field,
        opts \\ []
      )
      when is_binary(field) do
    defaults = [
      pipeline: pipeline,
      store_documents: store_documents,
      store_positions: store_positions
    ]

    # put_new keeps any value the caller supplied and only fills in gaps.
    field_opts =
      Enum.reduce(defaults, opts, fn {key, value}, acc ->
        Keyword.put_new(acc, key, value)
      end)

    %{index | fields: Map.put(fields, field, Field.new(field_opts))}
  end

  @spec update_field(t(), document_field(), Field.t()) :: t()
  # Replaces an existing field wholesale and refreshes the cached document
  # count. Raises when the field was never registered — that is a caller bug.
  def update_field(%__MODULE__{fields: fields} = index, name, %Field{} = field) do
    case fields do
      %{^name => _existing} ->
        update_documents_size(%{index | fields: Map.put(fields, name, field)})

      _ ->
        raise "Unknown field #{name} in index"
    end
  end

  @spec get_fields(t()) :: list(Field.document_ref() | document_field())
  # Lists every registered field name (including the document ref field).
  def get_fields(%__MODULE__{fields: fields}) do
    Map.keys(fields)
  end

  @spec get_field(t(), document_field()) :: Field.t()
  # Fetches a field by name; nil when the field does not exist.
  def get_field(%__MODULE__{fields: fields}, field), do: Map.get(fields, field)

  @spec save_document(t(), boolean()) :: t()
  # Toggles the `store` flag on every field, controlling whether document
  # content is kept alongside the inverted index.
  def save_document(%__MODULE__{fields: fields} = index, save) do
    # Map.new/2 builds the updated map in one pass instead of map + into.
    fields = Map.new(fields, fn {key, field} -> {key, %{field | store: save}} end)

    %{index | fields: fields}
  end

  @spec add_documents(t(), list(map()), keyword()) :: t()
  # Indexes `documents`, honoring the configured :on_conflict strategy unless
  # overridden via `opts`. Returns the index with an updated document count.
  #
  # Spec fixed to arity 3 to match the definition (opts has a default).
  def add_documents(%__MODULE__{on_conflict: on_conflict} = index, documents, opts \\ []) do
    opts = Keyword.put_new(opts, :on_conflict, on_conflict)

    # transform_documents/3 emits the index state after each document
    # (Stream.scan); only the final state matters. Reducing to the last
    # element also survives an empty document list — the previous
    # `[index] = ... |> Enum.to_list()` raised MatchError on `[]`.
    index =
      index
      |> transform_documents(documents, opts)
      |> Enum.reduce(index, fn updated, _previous -> updated end)

    update_documents_size(index)
  end

  @spec update_documents(t(), list(map())) :: t()
  # Re-indexes the given attributes of already-indexed documents. Attributes
  # with no matching field are silently ignored.
  def update_documents(%__MODULE__{ref: ref, fields: fields} = index, documents) do
    updated_fields =
      Enum.reduce(documents, fields, fn document, acc ->
        # The document id is the same for every attribute, so look it up once.
        id = Map.get(document, ref)

        Enum.reduce(document, acc, fn {key, content}, acc ->
          case Map.get(acc, key) do
            nil ->
              acc

            %Field{} = field ->
              Map.put(acc, key, Field.update(field, [%{id: id, content: content}]))
          end
        end)
      end)

    update_documents_size(%{index | fields: updated_fields})
  end

  @spec remove_documents(t(), list(Field.document_ref())) :: t()
  # Deletes the given document ids from every field, then refreshes the
  # cached document count.
  def remove_documents(%__MODULE__{fields: fields} = index, document_ids) do
    # Every entry is rewritten, so build the new map directly with Map.new/2
    # instead of reducing the old map into itself.
    fields = Map.new(fields, fn {key, field} -> {key, Field.remove(field, document_ids)} end)

    update_documents_size(%{index | fields: fields})
  end

  @spec analyze(t(), document_field(), any(), keyword()) :: Token.t() | list(Token.t())
  # Runs `content` through the named field's analysis pipeline.
  # NOTE(review): an unknown field name passes nil to Field.analyze/3 — verify
  # callers only use registered fields.
  def analyze(%__MODULE__{fields: fields}, field, content, options) do
    target = Map.get(fields, field)
    Field.analyze(target, content, options)
  end

  @spec terms(t(), keyword()) :: any()
  # Queries a single field's terms; the target field comes from `query[:field]`.
  def terms(%__MODULE__{fields: fields}, query) do
    field_name = Keyword.get(query, :field)
    field = Map.get(fields, field_name)

    Field.terms(field, query)
  end

  @spec all(t()) :: list(Field.document_ref())
  # Returns every indexed document id, read from the ref field.
  def all(%__MODULE__{ref: ref, fields: fields}) do
    ref_field = Map.get(fields, ref)
    Field.all(ref_field)
  end

  @spec search(t(), search_query(), map() | nil) :: list(search_result())
  # Bodiless head declares the default for `opts` once for all clauses.
  def search(index, query, opts \\ nil)

  # A nil query matches nothing.
  def search(%__MODULE__{}, nil, _opts) do
    []
  end

  # Free-text search with no options: match the query string against every
  # field except the document ref.
  def search(%__MODULE__{ref: ref} = index, query, nil) when is_binary(query) do
    matches =
      for field <- get_fields(index), field != ref do
        %{"match" => %{field => query}}
      end

    elasticsearch(index, %{
      "query" => %{
        "bool" => %{
          "should" => matches
        }
      }
    })
  end

  # Free-text search restricted to specific fields with per-field boosts.
  # `fields` is assumed to map field names to options, e.g.
  # %{"title" => %{"boost" => 2}} — TODO confirm against callers.
  #
  # Bug fix: the previous code enumerated the options map itself, so each
  # element was a {name, opts} tuple; `Map.has_key?(fields, tuple)` was always
  # false and `matches` always came out empty. We now enumerate the keys.
  # The `else` also normalizes a missing/invalid "boost" entry to exclusion
  # instead of leaking a truthy non-boolean out of `with` (which previously
  # kept the field and crashed in the map step).
  def search(%__MODULE__{ref: ref} = index, query, %{"fields" => fields}) when is_binary(query) do
    matches =
      fields
      |> Map.keys()
      |> Enum.filter(fn field ->
        with true <- field != ref,
             %{"boost" => boost} <- Map.get(fields, field) do
          boost > 0
        else
          _ -> false
        end
      end)
      |> Enum.map(fn field ->
        %{"boost" => boost} = Map.get(fields, field)
        match = %{field => query}

        %{"match" => match, "boost" => boost}
      end)

    elasticsearch(index, %{
      "query" => %{
        "bool" => %{
          "should" => matches
        }
      }
    })
  end

  # A full query object is passed straight to the query engine.
  def search(%__MODULE__{} = index, %{"query" => _} = query, _opts),
    do: elasticsearch(index, query)

  # Default options for a bare field=>content map. Consistency fix: the
  # options-handling clause reads the "bool" key, not "operator", so the old
  # %{"operator" => "OR"} default was dead — Map.get(options, "bool", "or")
  # yields "or" either way, so behavior is unchanged.
  def search(%__MODULE__{} = index, query, nil) when is_map(query),
    do: search(index, query, %{"bool" => "or"})

  # Field=>content map with options: one should-clause per field, sharing the
  # "expand" and operator settings.
  def search(%__MODULE__{} = index, %{} = query, options) do
    # These settings are the same for every field, so compute them once
    # instead of once per match clause (previously re-read inside the loop).
    expand = Map.get(options, "expand", false)

    operator =
      options
      |> Map.get("bool", "or")
      |> String.downcase()

    matches =
      Enum.map(query, fn {field, content} ->
        %{
          "expand" => expand,
          "match" => %{"operator" => operator, field => content}
        }
      end)

    elasticsearch(index, %{
      "query" => %{
        "bool" => %{
          "should" => matches
        }
      }
    })
  end

  # Parses and scores an elasticsearch-style query object, returning results
  # sorted by descending score.
  defp elasticsearch(index, %{"query" => root}) do
    {key, value} = Query.split_root(root)

    key
    |> QueryRepository.parse(value, root)
    |> QueryRepository.score(index)
    |> Enum.sort(fn left, right -> left.score > right.score end)
  end

  defp elasticsearch(_index, _query) do
    raise "Root object must have a query element"
  end

  # Recomputes documents_size as the largest per-field id count (fields may
  # hold different subsets of documents).
  defp update_documents_size(%__MODULE__{fields: fields} = index) do
    # Idiom: map + reduce-with-case replaced by a plain max fold; the 0 seed
    # preserves the previous floor for an index with no fields.
    size =
      fields
      |> Map.values()
      |> Enum.map(&Enum.count(&1.ids))
      |> Enum.reduce(0, &max/2)

    %{index | documents_size: size}
  end

  # Lazily threads `documents` through the index with Stream.scan, emitting the
  # index state after each document is absorbed. Nothing runs until the caller
  # enumerates the returned stream (see add_documents/3).
  defp transform_documents(%{ref: ref} = index, documents, opts) do
    # Creates a field on first sight of a key. Note: recognition below only
    # checks the first dot-segment, so a nested key like "a.b" gets its own
    # Field entry here even though only "a" was declared.
    add_or_ignore_field = fn index, key, fields ->
      case Map.get(fields, key) do
        nil ->
          add_field(index, key)

        %Field{} ->
          index
      end
    end

    documents
    |> Stream.map(&flatten_document/1)
    |> Stream.scan(index, fn document, index ->
      %{fields: fields} = index

      # A document attribute is recognized when its first dot-segment names a
      # declared field.
      recognized_keys =
        Map.keys(document)
        |> Stream.filter(fn attribute ->
          [field | _tail] = String.split(attribute, ".")
          Map.has_key?(fields, field)
        end)

      # NOTE(review): `fields` is a snapshot captured before this reduce, so
      # add_or_ignore_field does not see fields added for earlier keys of the
      # same document — presumably harmless since add_field is idempotent per
      # key within one pass, but worth confirming.
      Enum.reduce(recognized_keys, index, fn key, index ->
        index = add_or_ignore_field.(index, key, fields)
        field = get_field(index, key)

        field =
          Field.add(field, [%{id: Map.get(document, ref), content: Map.get(document, key)}], opts)

        patch_field(index, key, field)
      end)
    end)
  end

  # Stores an updated Field back into the index under `key`.
  defp patch_field(%{fields: fields} = index, key, %Field{} = field) do
    updated_fields = Map.put(fields, key, field)
    %{index | fields: updated_fields}
  end

  # Flattens a nested document into a single-level map with dot-joined keys,
  # e.g. %{"a" => %{"b" => 1}} becomes %{"a.b" => 1}.
  defp flatten_document(document, prefix \\ "") do
    Enum.reduce(document, %{}, fn {key, value}, acc ->
      if is_map(value) do
        # Recurse with the extended prefix and fold the nested keys in.
        Map.merge(acc, flatten_document(value, "#{prefix}#{key}."))
      else
        Map.put(acc, "#{prefix}#{key}", value)
      end
    end)
  end
end