Skip to main content

lib/http/event_source/options.ex

defmodule HTTP.EventSource.Options do
  @moduledoc false

  # Validated configuration for an event-source connection.
  #
  # `new/2` is the only entry point. It checks the URL and every supported
  # option, fills in defaults, and returns either `{:ok, %__MODULE__{}}` or the
  # first `{:error, reason}` encountered. Options may be given as a keyword
  # list or as a map; map keys may also be the snake_case or camelCase strings
  # listed in `@string_keys`.

  @default_connect_timeout 30_000
  @default_reconnect_time 3_000
  @default_max_reconnect_time 30_000
  @default_max_line_size 64 * 1024

  # The only URL schemes `new/2` accepts.
  @supported_schemes ["http", "https"]

  # Byte sequences that may not appear in a last event id.
  @last_event_id_forbidden [<<0>>, "\n", "\r"]

  # String spellings accepted for option keys when `init` is a map.
  @string_keys %{
    "connect_timeout" => :connect_timeout,
    "connectTimeout" => :connect_timeout,
    "headers" => :headers,
    "idle_timeout" => :idle_timeout,
    "idleTimeout" => :idle_timeout,
    "last_event_id" => :last_event_id,
    "lastEventId" => :last_event_id,
    "max_line_size" => :max_line_size,
    "maxLineSize" => :max_line_size,
    "max_reconnect_time" => :max_reconnect_time,
    "maxReconnectTime" => :max_reconnect_time,
    "owner" => :owner,
    "reconnect_time" => :reconnect_time,
    "reconnectTime" => :reconnect_time,
    "socket_opts" => :socket_opts,
    "socketOpts" => :socket_opts,
    "ssl" => :ssl,
    "unix_socket" => :unix_socket,
    "unixSocket" => :unix_socket,
    "with_credentials" => :with_credentials,
    "withCredentials" => :with_credentials
  }

  defstruct uri: nil,
            url: nil,
            owner: nil,
            with_credentials: false,
            headers: [],
            last_event_id: "",
            reconnect_time: @default_reconnect_time,
            max_reconnect_time: @default_max_reconnect_time,
            connect_timeout: @default_connect_timeout,
            idle_timeout: :infinity,
            ssl: [],
            socket_opts: [],
            unix_socket: nil,
            max_line_size: @default_max_line_size,
            ref: nil

  @type t :: %__MODULE__{
          uri: URI.t(),
          url: String.t(),
          owner: pid(),
          with_credentials: boolean(),
          headers: [{String.t(), String.t()}],
          last_event_id: String.t(),
          reconnect_time: non_neg_integer(),
          max_reconnect_time: non_neg_integer(),
          connect_timeout: timeout(),
          idle_timeout: timeout(),
          ssl: keyword(),
          socket_opts: keyword(),
          unix_socket: String.t() | nil,
          max_line_size: pos_integer(),
          ref: reference()
        }

  @doc """
  Builds a validated options struct for `url`.

  `url` is a string or a `URI` struct. It must use the `http` or `https`
  scheme and have a non-empty host.

  `init` is a keyword list or a map. Map keys may be atoms or any of the
  string spellings in `@string_keys`. Recognised options, with defaults:

    * `:owner` - pid, defaults to the calling process
    * `:with_credentials` - boolean, defaults to `false`
    * `:headers` - list or map of name/value pairs, or an `HTTP.Headers`
      struct; defaults to `[]`
    * `:last_event_id` - binary without NUL, LF or CR; defaults to `""`
    * `:reconnect_time` - non-negative integer, defaults to `3_000`
    * `:max_reconnect_time` - non-negative integer, defaults to `30_000`
    * `:connect_timeout` - positive integer or `:infinity`, defaults to `30_000`
    * `:idle_timeout` - positive integer or `:infinity`, defaults to `:infinity`
    * `:ssl` - list, defaults to `[]`
    * `:socket_opts` - list, defaults to `[]`
    * `:unix_socket` - binary or `nil`, defaults to `nil`
    * `:max_line_size` - positive integer, defaults to `65_536`
    * `:ref` - taken as given (not validated); defaults to a fresh reference

  Returns `{:ok, options}` or `{:error, reason}`, where `reason` is
  `:invalid_url`, `{:unsupported_scheme, scheme}`, `:invalid_options`, or an
  `:invalid_*` atom naming the first option that failed validation.
  """
  @spec new(String.t() | URI.t(), keyword() | map()) :: {:ok, t()} | {:error, term()}
  def new(url, init \\ []) do
    with {:ok, uri} <- normalize_url(url),
         {:ok, init} <- normalize_init(init) do
      # normalize_init/1 always writes every validated key (defaults included),
      # so fetch!/2 cannot fail here. Only :ref is optional at this point.
      {:ok,
       %__MODULE__{
         uri: uri,
         url: URI.to_string(uri),
         owner: Keyword.fetch!(init, :owner),
         with_credentials: Keyword.fetch!(init, :with_credentials),
         headers: Keyword.fetch!(init, :headers),
         last_event_id: Keyword.fetch!(init, :last_event_id),
         reconnect_time: Keyword.fetch!(init, :reconnect_time),
         max_reconnect_time: Keyword.fetch!(init, :max_reconnect_time),
         connect_timeout: Keyword.fetch!(init, :connect_timeout),
         idle_timeout: Keyword.fetch!(init, :idle_timeout),
         ssl: Keyword.fetch!(init, :ssl),
         socket_opts: Keyword.fetch!(init, :socket_opts),
         unix_socket: Keyword.fetch!(init, :unix_socket),
         max_line_size: Keyword.fetch!(init, :max_line_size),
         ref: Keyword.get_lazy(init, :ref, &make_ref/0)
       }}
    end
  end

  # Accepts a URI struct or a binary (parsed with URI.parse/1); anything else
  # is rejected as :invalid_url.
  defp normalize_url(%URI{} = uri), do: normalize_uri(uri)

  defp normalize_url(url) when is_binary(url) do
    url
    |> URI.parse()
    |> normalize_uri()
  end

  defp normalize_url(_url), do: {:error, :invalid_url}

  # A usable URI has a supported scheme and a non-empty host. URI.parse/1 can
  # yield an empty-string host (e.g. for "http:///path"), so a plain
  # is_binary/1 check on the host is not sufficient.
  defp normalize_uri(%URI{scheme: scheme, host: host} = uri) when scheme in @supported_schemes do
    if is_binary(host) and host != "" do
      {:ok, uri}
    else
      # The scheme is fine; it is the host that is missing or empty.
      {:error, :invalid_url}
    end
  end

  defp normalize_uri(%URI{scheme: scheme}), do: {:error, {:unsupported_scheme, scheme}}

  # Structs satisfy is_map/1 but are not enumerable; reject them up front so
  # the caller gets an error tuple instead of a Protocol.UndefinedError.
  defp normalize_init(%_{}), do: {:error, :invalid_options}

  # Maps are converted to a list of pairs, translating known string keys to
  # their atom form, then validated by the list clause below.
  defp normalize_init(init) when is_map(init) do
    init
    |> Enum.map(fn {key, value} -> {normalize_key(key), value} end)
    |> normalize_init()
  end

  # Validates each option in a fixed order, so the error returned is always
  # that of the first invalid option in this order. On success the returned
  # list contains every validated key, with defaults filled in.
  defp normalize_init(init) when is_list(init) do
    with {:ok, headers} <- normalize_headers(Keyword.get(init, :headers, [])),
         {:ok, owner} <- normalize_owner(Keyword.get(init, :owner, self())),
         {:ok, with_credentials} <-
           normalize_boolean(
             Keyword.get(init, :with_credentials, false),
             :invalid_with_credentials
           ),
         {:ok, last_event_id} <- normalize_last_event_id(Keyword.get(init, :last_event_id, "")),
         {:ok, reconnect_time} <-
           normalize_non_neg_integer(
             Keyword.get(init, :reconnect_time, @default_reconnect_time),
             :invalid_reconnect_time
           ),
         {:ok, max_reconnect_time} <-
           normalize_non_neg_integer(
             Keyword.get(init, :max_reconnect_time, @default_max_reconnect_time),
             :invalid_max_reconnect_time
           ),
         {:ok, connect_timeout} <-
           normalize_timeout(
             Keyword.get(init, :connect_timeout, @default_connect_timeout),
             :invalid_connect_timeout
           ),
         {:ok, idle_timeout} <-
           normalize_timeout(Keyword.get(init, :idle_timeout, :infinity), :invalid_idle_timeout),
         {:ok, ssl} <- normalize_keyword(Keyword.get(init, :ssl, []), :invalid_ssl_options),
         {:ok, socket_opts} <-
           normalize_keyword(Keyword.get(init, :socket_opts, []), :invalid_socket_options),
         {:ok, unix_socket} <- normalize_unix_socket(Keyword.get(init, :unix_socket)),
         {:ok, max_line_size} <-
           normalize_pos_integer(
             Keyword.get(init, :max_line_size, @default_max_line_size),
             :invalid_max_line_size
           ) do
      {:ok,
       init
       |> Keyword.put(:headers, headers)
       |> Keyword.put(:owner, owner)
       |> Keyword.put(:with_credentials, with_credentials)
       |> Keyword.put(:last_event_id, last_event_id)
       |> Keyword.put(:reconnect_time, reconnect_time)
       |> Keyword.put(:max_reconnect_time, max_reconnect_time)
       |> Keyword.put(:connect_timeout, connect_timeout)
       |> Keyword.put(:idle_timeout, idle_timeout)
       |> Keyword.put(:ssl, ssl)
       |> Keyword.put(:socket_opts, socket_opts)
       |> Keyword.put(:unix_socket, unix_socket)
       |> Keyword.put(:max_line_size, max_line_size)}
    end
  end

  defp normalize_init(_init), do: {:error, :invalid_options}

  # Known string keys become atoms via a fixed lookup table (no atoms are ever
  # created from input); unknown keys are passed through unchanged.
  defp normalize_key(key) when is_binary(key), do: Map.get(@string_keys, key, key)
  defp normalize_key(key), do: key

  # Headers may arrive as an HTTP.Headers struct, a map, or a list of pairs.
  # All forms end up as a list of {name, value} pairs, with names passed
  # through HTTP.Headers.normalize_name/1 and values stringified.
  defp normalize_headers(%HTTP.Headers{headers: headers}), do: normalize_headers(headers)

  defp normalize_headers(headers) when is_map(headers) do
    headers
    |> Map.to_list()
    |> normalize_headers()
  end

  defp normalize_headers(headers) when is_list(headers) do
    headers =
      Enum.map(headers, fn {name, value} ->
        {HTTP.Headers.normalize_name(to_string(name)), to_string(value)}
      end)

    {:ok, headers}
  rescue
    # Any failure while converting an entry (an element that is not a pair, a
    # term that cannot be stringified, or an error raised by normalize_name/1)
    # is reported as invalid headers rather than crashing the caller.
    _error -> {:error, :invalid_headers}
  end

  defp normalize_headers(_headers), do: {:error, :invalid_headers}

  defp normalize_owner(owner) when is_pid(owner), do: {:ok, owner}
  defp normalize_owner(_owner), do: {:error, :invalid_owner}

  # Shared validators: each takes the value and the error atom to return when
  # the value is rejected.
  defp normalize_boolean(value, _error) when is_boolean(value), do: {:ok, value}
  defp normalize_boolean(_value, error), do: {:error, error}

  # A last event id must be a binary free of NUL, LF and CR bytes.
  defp normalize_last_event_id(value) when is_binary(value) do
    case :binary.match(value, @last_event_id_forbidden) do
      :nomatch -> {:ok, value}
      _found -> {:error, :invalid_last_event_id}
    end
  end

  defp normalize_last_event_id(_value), do: {:error, :invalid_last_event_id}

  # Timeouts are either :infinity or a strictly positive integer.
  defp normalize_timeout(:infinity, _error), do: {:ok, :infinity}
  defp normalize_timeout(value, error), do: normalize_pos_integer(value, error)

  defp normalize_pos_integer(value, _error) when is_integer(value) and value > 0, do: {:ok, value}
  defp normalize_pos_integer(_value, error), do: {:error, error}

  defp normalize_non_neg_integer(value, _error) when is_integer(value) and value >= 0 do
    {:ok, value}
  end

  defp normalize_non_neg_integer(_value, error), do: {:error, error}

  # Only checks that the value is a list; element shape is not inspected, so
  # lists that mix bare atoms with pairs are accepted.
  defp normalize_keyword(value, _error) when is_list(value), do: {:ok, value}
  defp normalize_keyword(_value, error), do: {:error, error}

  defp normalize_unix_socket(nil), do: {:ok, nil}
  defp normalize_unix_socket(value) when is_binary(value), do: {:ok, value}
  defp normalize_unix_socket(_value), do: {:error, :invalid_unix_socket}
end