lib/snap/indexes.ex

defmodule Snap.Indexes do
  @moduledoc """
  Helper functions around index management.
  """
  alias Snap
  alias Snap.Bulk
  alias Snap.Cluster.Namespace

  @doc """
  Creates an index.
  """
  @spec create(module(), String.t(), map(), Keyword.t()) :: Snap.Cluster.result()
  def create(cluster, index, mapping, opts \\ []) do
    namespaced_index = Namespace.add_namespace_to_index(index, cluster)
    Snap.put(cluster, "/#{namespaced_index}", mapping, [], [], opts)
  end

  @doc """
  Deletes an index.
  """
  @spec delete(module(), String.t(), Keyword.t()) :: Snap.Cluster.result()
  def delete(cluster, index, opts \\ []) do
    namespaced_index = Namespace.add_namespace_to_index(index, cluster)
    Snap.delete(cluster, "/#{namespaced_index}", [], [], opts)
  end

  @doc """
  Creates and loads a new index, switching the alias to it with zero-downtime.

  Takes an `Enumerable` of `Snap.Bulk` actions, and builds a new index from
  it. Refreshes it, updates the alias to it, and cleans up the old indexes,
  leaving the previous one behind.

  May return `t:Snap.Cluster.error/0` or a `Snap.BulkError` containing a list
  of failed bulk actions.
  """
  @spec hotswap(Enumerable.t(), module(), String.t(), map(), Keyword.t()) ::
          :ok | Snap.Cluster.error() | {:error, Snap.BulkError.t()}
  def hotswap(stream, cluster, alias, mapping, opts \\ []) do
    index = generate_index_name(alias)

    with {:ok, _} <- create(cluster, index, mapping),
         :ok <- Bulk.perform(stream, cluster, index, opts),
         :ok <- refresh(cluster, index),
         :ok <- alias(cluster, index, alias),
         :ok <- cleanup(cluster, alias, 2, opts) do
      :ok
    end
  end

  @doc """
  Refreshes an index.
  """
  @spec refresh(cluster :: module(), index :: String.t(), opts :: Keyword.t()) ::
          :ok | Snap.Cluster.error()
  def refresh(cluster, index, opts \\ []) do
    namespaced_index = Namespace.add_namespace_to_index(index, cluster)

    with {:ok, _} <- Snap.post(cluster, "/#{namespaced_index}/_refresh", nil, [], [], opts) do
      :ok
    end
  end

  @doc """
  Creates an alias for a versioned index, removing any existing aliases.
  """
  @spec alias(module(), String.t(), String.t(), Keyword.t()) :: :ok | Snap.Cluster.error()
  def alias(cluster, index, alias, opts \\ []) do
    with {:ok, indexes} <- list_starting_with(cluster, alias, opts) do
      indexes = Enum.reject(indexes, &(&1 == index))

      namespaced_index = Namespace.add_namespace_to_index(index, cluster)
      namespaced_alias = Namespace.add_namespace_to_index(alias, cluster)

      remove_actions =
        Enum.map(indexes, fn i ->
          %{
            "remove" => %{
              "index" => Namespace.add_namespace_to_index(i, cluster),
              "alias" => namespaced_alias
            }
          }
        end)

      actions = %{
        "actions" =>
          remove_actions ++
            [%{"add" => %{"index" => namespaced_index, "alias" => namespaced_alias}}]
      }

      with {:ok, _response} <- Snap.post(cluster, "/_aliases", actions), do: :ok
    end
  end

  @doc """
  Lists all the indexes in the cluster.
  """
  @spec list(module(), Keyword.t()) :: {:ok, list(String.t())} | Snap.Cluster.error()
  def list(cluster, opts \\ []) do
    namespace = Namespace.index_namespace(cluster)

    with {:ok, indexes} <- Snap.get(cluster, "/_cat/indices", [format: "json"], [], opts) do
      indexes =
        indexes
        |> Enum.map(& &1["index"])
        # Only return indexes inside this namespace
        |> Enum.filter(&String.starts_with?(&1, "#{namespace}-"))
        # Present them without the namespace prefix
        |> Enum.map(&String.trim_leading(&1, "#{namespace}-"))
        |> Enum.sort()

      {:ok, indexes}
    end
  end

  @doc """
  Lists all the timestamp versioned indexes starting with the prefix.
  """
  @spec list_starting_with(module(), String.t(), Keyword.t()) ::
          {:ok, list(String.t())} | Snap.Cluster.error()
  def list_starting_with(cluster, prefix, opts \\ []) do
    namespace = Namespace.index_namespace(cluster)

    with {:ok, indexes} <- Snap.get(cluster, "/_cat/indices", [format: "json"], [], opts) do
      prefix = prefix |> to_string() |> Regex.escape()
      {:ok, regex} = Regex.compile("^#{prefix}-[0-9]+$")

      indexes =
        indexes
        |> Enum.map(& &1["index"])
        # Only return indexes inside this namespace
        |> Enum.filter(&String.starts_with?(&1, "#{namespace}-"))
        # Present them without the namespace prefix
        |> Enum.map(&String.trim_leading(&1, "#{namespace}-"))
        |> Enum.filter(&Regex.match?(regex, &1))
        |> Enum.sort_by(&sort_index_by_timestamp/1)

      {:ok, indexes}
    end
  end

  @doc """
  Deletes older timestamped indexes.
  """
  @spec cleanup(module(), String.t(), non_neg_integer(), Keyword.t()) ::
          :ok | Snap.Cluster.error()
  def cleanup(cluster, alias, preserve \\ 2, opts \\ []) do
    with {:ok, indexes} <- list_starting_with(cluster, alias, opts) do
      indexes
      |> Enum.sort_by(&sort_index_by_timestamp/1, &>=/2)
      |> Enum.drop(preserve)
      |> Enum.reduce_while(:ok, fn index, ok ->
        case delete(cluster, index, opts) do
          {:ok, _} -> {:cont, ok}
          {:error, _} = err -> {:halt, err}
        end
      end)
    end
  end

  defp generate_index_name(alias) do
    ts = generate_alias_timestamp()
    "#{alias}-#{ts}"
  end

  defp generate_alias_timestamp do
    DateTime.to_unix(DateTime.utc_now(), :microsecond)
  end

  defp sort_index_by_timestamp(index) do
    index
    |> String.split("-")
    |> Enum.at(-1)
    |> String.to_integer()
  end
end