lib/influx_ex/http/request.ex

defmodule InfluxEx.HTTP.Request do
  @moduledoc """
  A request to InfluxDB
  """

  alias InfluxEx.{ConflictError, GenericError, InvalidPayloadError, NotFoundError}
  alias InfluxEx.HTTP.Response

  @type url() :: binary()

  @type headers() :: [{binary(), binary()}]

  @type request_handler() :: (Response.t() -> term() | {:error, GenericError.t()})

  @type payload() :: binary() | map()

  @type method() :: :get | :post | :delete

  @type t() :: %__MODULE__{
          method: method(),
          endpoint: binary(),
          payload: payload() | nil,
          query_params: map(),
          handler: request_handler() | nil
        }

  @type opt() ::
          {:payload, term()}
          | {:method, method()}
          | {:query_params, map()}
          | {:handler, request_handler()}

  defstruct endpoint: nil, payload: nil, method: :get, query_params: %{}, handler: nil

  @doc """
  Make a new `Request.t()`
  """
  @spec new(binary(), [opt()]) :: t()
  def new(endpoint, opts \\ []) do
    payload = opts[:payload]
    method = opts[:method] || :get
    query_params = opts[:query_params] || %{}

    %__MODULE__{
      endpoint: endpoint,
      payload: payload,
      method: method,
      query_params: query_params,
      handler: opts[:handler]
    }
  end

  def run(request, client) do
    with {:ok, response} <- send_request(request, client),
         {:ok, response} <- parse_body(request, response, client) do
      if request.handler do
        case request.handler.(response) do
          :ok -> :ok
          {:ok, _} = ok -> ok
          {:error, _reason} = error -> error
        end
      else
        :ok
      end
    end
  end

  defp parse_body(_, %{status_code: 204} = response, _client) do
    {:ok, response}
  end

  defp parse_body(%{endpoint: endpoint}, response, _client)
       when endpoint in ["/health", "/write"] do
    {:ok, response}
  end

  defp parse_body(%{endpoint: "/query"}, %{body: body} = response, _client) when is_list(body) do
    {:ok, response}
  end

  defp parse_body(%{endpoint: "/query"}, %{body: body} = response, client) when is_binary(body) do
    parsed = client.csv_library.parse_string(body)
    {:ok, %{response | body: parsed}}
  end

  defp parse_body(_request, %{body: body} = response, _client) when is_map(body) do
    {:ok, response}
  end

  defp parse_body(_request, response, client) do
    case client.json_library.decode(response.body) do
      {:ok, parsed} ->
        {:ok, %{response | body: parsed}}

      {:error, error} ->
        {:error,
         GenericError.exception(
           "Error parsing response json: #{inspect(response.body)}, reason: #{get_json_parse_error(error)}"
         )}
    end
  end

  defp get_json_parse_error(error) when is_atom(error) do
    "#{inspect(error)}"
  end

  defp get_json_parse_error(error) when is_struct(error) do
    map = Map.from_struct(error)

    case map[:message] do
      nil ->
        "unsupported reason"

      message ->
        message
    end
  end

  defp make_url(host, port, %{endpoint: endpoint}) when endpoint in ["/health"] do
    "#{host}:#{port}/#{endpoint}"
  end

  defp make_url(host, port, request) do
    params = URI.encode_query(request.query_params)

    "#{host}:#{port}/api/v2#{request.endpoint}?#{params}"
  end

  defp send_request(request, client) do
    url = make_url(client.host, client.port, request)

    case do_send_request(client, request, url, headers(client.token, request.endpoint)) do
      {:ok, %{status_code: status_code} = resp} when status_code < 400 ->
        {:ok, resp}

      {:ok, %{status_code: status_code} = resp} when status_code >= 400 ->
        handle_http_error(client, resp)

      error ->
        error
    end
  end

  defp do_send_request(client, %__MODULE__{method: :post, endpoint: endpoint} = req, url, headers)
       when endpoint in ["/write", "/query"] do
    client.http_client.send_request(:post, url, headers, req.payload)
  end

  defp do_send_request(client, %__MODULE__{method: :post} = req, url, headers) do
    payload = client.json_library.encode!(req.payload)
    client.http_client.send_request(:post, url, headers, payload)
  end

  defp do_send_request(client, %__MODULE__{method: method}, url, headers) do
    client.http_client.send_request(method, url, headers, nil)
  end

  defp headers(token, endpoint) do
    [
      {"Authorization", "Token #{token}"}
    ]
    |> headers_for_endpoint(endpoint)
  end

  defp headers_for_endpoint(headers, "/write") do
    headers ++ [{"Content-type", "text/plain; charset=utf-8"}, {"Accept", "application/json"}]
  end

  defp headers_for_endpoint(headers, "/query") do
    headers ++ [{"Content-type", "application/vnd.flux"}, {"Accept", "application/csv"}]
  end

  defp headers_for_endpoint(headers, _other) do
    headers ++ [{"Content-type", "application/json"}]
  end

  defp handle_http_error(_client, %{body: body}) when is_map(body) do
    {:error, make_exception(body)}
  end

  defp handle_http_error(client, %{body: body}) when is_binary(body) do
    {:ok, decoded} = client.json_library.decode(body)

    {:error, make_exception(decoded)}
  end

  defp make_exception(%{"code" => "conflict", "message" => message}) do
    ConflictError.exception(message)
  end

  defp make_exception(%{"code" => "invalid", "message" => message}) do
    InvalidPayloadError.exception(message)
  end

  defp make_exception(%{"code" => "not found", "message" => message}) do
    [resource | _] = String.split(message, " ")
    NotFoundError.exception(resource)
  end

  defp make_exception(%{"code" => code, "message" => message}) do
    GenericError.exception("error code: #{inspect(code)}, message: #{message}")
  end
end