lib/elasticsearch_api/client.ex

defmodule ElasticsearchApi.Client do
  @moduledoc """
  Provides a HTTP client for Elasticsearch. It leverages the `any_http` library in order to defer
  to the user the choice of the Elixir HTTP client.

  # connect_options: [transport_opts: [verify: :verify_none]]
  """

  require Logger

  ## Typespecs

  @type opts :: keyword()

  @type path :: binary() | URI.t()

  @type status :: pos_integer()

  @type headers :: map() | [{binary(), binary()}]

  @type body :: any()

  @type response :: %{required(:status) => status(), required(:headers) => headers(), required(:body) => body()}

  @type cluster :: %{required(:endpoint) => path(), optional(:headers) => headers()}

  ## Module attributes

  @content_type_keys ~w[Content-Type content-type]

  @default_uri URI.parse("https://elastic:elastic@localhost:9200")

  @default_cluster %{
    endpoint: %{@default_uri | userinfo: nil},
    headers: %{"authorization" => "Basic #{Base.encode64(@default_uri.userinfo)}"}
  }

  @clusters Application.compile_env(:elasticsearch_api, :clusters, @default_cluster)

  ## Public functions

  @spec post(path(), headers(), body(), opts()) :: {:ok, response()}
  def post(path, headers \\ [], body, opts \\ []) do
    {url, headers, body} = prepare_request!(path, headers, body, opts)
    response = AnyHttp.post(url, headers, body, decode_body: false)

    parse_response(response)
  end

  ## Private functions

  @spec prepare_request!(path(), headers(), body(), opts()) :: {path(), headers(), body()}
  defp prepare_request!(path, headers, body, opts) do
    cluster = get_cluster!(opts)
    default_headers = cluster |> Map.get(:headers, []) |> Enum.to_list()
    request_headers = Enum.to_list(headers)
    headers = default_headers |> Enum.concat(request_headers)
    {headers, body} = format_body(headers, body)
    url = cluster |> Map.fetch!(:endpoint) |> URI.merge(path)

    # IO.inspect(URI.to_string(url), label: "URL")

    {url, headers, body}
  end

  @spec parse_response({:ok, AnyHttp.Response.t()}) :: {:ok, response()}
  defp parse_response({:ok, %AnyHttp.Response{} = response}) do
    body = parse_body(response.headers, response.body)

    {:ok, %{status: response.status, headers: response.headers, body: body}}
  end

  @spec get_cluster!(opts()) :: cluster()
  defp get_cluster!(opts) do
    case Keyword.get(opts, :cluster) do
      nil ->
        Logger.warning("No cluster specified, using default cluster")

        Map.get(@clusters, :default, @default_cluster)

      cluster when is_map(cluster) and is_map_key(cluster, :endpoint) ->
        cluster

      cluster_key when is_atom(cluster_key) ->
        if Map.has_key?(@clusters, cluster_key) do
          Map.fetch!(@clusters, cluster_key)
        else
          Logger.warning("Invalid cluster: #{inspect(cluster_key)}, using default cluster")

          Map.get(@clusters, :default, @default_cluster)
        end
    end
  end

  @spec format_body(headers(), body()) :: {headers(), body()}
  defp format_body(headers, body) do
    body =
      if json?(headers) do
        AnyJson.encode!(body)
      else
        body
      end

    {headers, body}
  end

  @spec parse_body(headers(), body()) :: any()
  defp parse_body(headers, body) do
    if json?(headers) do
      AnyJson.decode!(body)
    else
      body
    end
  end

  @spec json?(headers()) :: boolean()
  defp json?(headers) do
    Enum.any?(headers, fn {key, value} ->
      key in @content_type_keys and value == "application/json"
    end)
  end
end