Skip to main content

lib/http/event_source/connection.ex

defmodule HTTP.EventSource.Connection do
  @moduledoc false

  use GenServer

  alias HTTP.EventSource
  alias HTTP.EventSource.Event.Error
  alias HTTP.EventSource.Event.Message
  alias HTTP.EventSource.Event.Open
  alias HTTP.EventSource.Options
  alias HTTP.EventSource.Parser
  alias HTTP.EventSource.Telemetry

  @connecting EventSource.connecting()
  @open EventSource.open()
  @closed EventSource.closed()

  defstruct owner: nil,
            target: nil,
            uri: nil,
            url: nil,
            headers: [],
            with_credentials: false,
            connect_timeout: 30_000,
            idle_timeout: :infinity,
            ssl: [],
            socket_opts: [],
            unix_socket: nil,
            max_line_size: 64 * 1024,
            transport: nil,
            socket: nil,
            http1: nil,
            parser: nil,
            ready_state: @connecting,
            last_event_id: "",
            reconnect_time: 3_000,
            max_reconnect_time: 30_000,
            reconnect_timer: nil,
            idle_timer: nil,
            attempt: 0,
            connect_started_at: nil

  @spec start_link(Options.t()) :: GenServer.on_start()
  def start_link(%Options{} = options) do
    GenServer.start_link(__MODULE__, options)
  end

  def child_spec(%Options{ref: ref} = options) do
    %{
      id: {__MODULE__, ref},
      start: {__MODULE__, :start_link, [options]},
      restart: :temporary,
      type: :worker
    }
  end

  @impl true
  def init(%Options{} = options) do
    target = %EventSource{
      pid: self(),
      ref: options.ref,
      url: options.url,
      with_credentials: options.with_credentials
    }

    state = %__MODULE__{
      owner: options.owner,
      target: target,
      uri: options.uri,
      url: options.url,
      headers: options.headers,
      with_credentials: options.with_credentials,
      connect_timeout: options.connect_timeout,
      idle_timeout: options.idle_timeout,
      ssl: options.ssl,
      socket_opts: options.socket_opts,
      unix_socket: options.unix_socket,
      max_line_size: options.max_line_size,
      last_event_id: options.last_event_id,
      reconnect_time: options.reconnect_time,
      max_reconnect_time: options.max_reconnect_time
    }

    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state), do: {:noreply, connect(state)}

  @impl true
  def handle_call(:ready_state, _from, state), do: {:reply, state.ready_state, state}
  def handle_call(:last_event_id, _from, state), do: {:reply, state.last_event_id, state}
  def handle_call(:reconnect_time, _from, state), do: {:reply, state.reconnect_time, state}

  def handle_call(:close, _from, state) do
    state = close_state(state, :closed)
    {:stop, :normal, :ok, state}
  end

  @impl true
  def handle_info(:reconnect, state), do: {:noreply, connect(%{state | reconnect_timer: nil})}

  def handle_info(:idle_timeout, state) do
    {:noreply, reconnect(state, :idle_timeout)}
  end

  def handle_info(message, %{transport: transport, socket: socket} = state)
      when not is_nil(transport) and not is_nil(socket) do
    case transport.normalize_message(message, socket) do
      {:data, data} -> handle_socket_data(state, data)
      :closed -> handle_transport_closed(state)
      {:error, reason} -> {:noreply, reconnect(state, reason)}
      :unknown -> {:noreply, state}
    end
  end

  def handle_info(_message, state), do: {:noreply, state}

  defp connect(state) do
    state = close_transport(cancel_idle_timer(state))
    started_at = System.monotonic_time(:microsecond)
    Telemetry.connect_start(state.uri)

    state =
      %{
        state
        | ready_state: @connecting,
          http1: HTTP.HTTP1.new(:get),
          parser:
            Parser.new(max_line_size: state.max_line_size, last_event_id: state.last_event_id),
          attempt: state.attempt + 1,
          connect_started_at: started_at
      }

    case open_transport(state) do
      {:ok, transport, socket} ->
        case send_request(transport, socket, state) do
          :ok ->
            case transport.setopts(socket, active: :once) do
              :ok ->
                %{state | transport: transport, socket: socket}

              {:error, reason} ->
                connect_exception(state, reason, started_at)
                reconnect(%{state | transport: transport, socket: socket}, reason)
            end

          {:error, reason} ->
            connect_exception(state, reason, started_at)
            reconnect(%{state | transport: transport, socket: socket}, reason)
        end

      {:error, reason} ->
        connect_exception(state, reason, started_at)
        reconnect(state, reason)
    end
  end

  defp open_transport(state) do
    with {:ok, transport, host, port} <- select_transport(state),
         {:ok, socket} <-
           transport.connect(
             host,
             port,
             [ssl: state.ssl, socket_opts: state.socket_opts],
             state.connect_timeout
           ) do
      {:ok, transport, socket}
    end
  end

  defp select_transport(%{unix_socket: unix_socket}) when is_binary(unix_socket) do
    {:ok, HTTP.Transport.Unix, unix_socket, 0}
  end

  defp select_transport(%{uri: %URI{scheme: "http", host: host, port: port}}) do
    {:ok, HTTP.Transport.TCP, host, port || 80}
  end

  defp select_transport(%{uri: %URI{scheme: "https", host: host, port: port}}) do
    {:ok, HTTP.Transport.SSL, host, port || 443}
  end

  defp select_transport(%{uri: %URI{scheme: scheme}}), do: {:error, {:unsupported_scheme, scheme}}

  defp send_request(transport, socket, state) do
    transport.send(socket, request_iodata(state))
  end

  defp request_iodata(state) do
    headers =
      state.headers
      |> HTTP.Headers.new()
      |> HTTP.Headers.set_default("User-Agent", HTTP.Headers.user_agent(:http_event_source))
      |> HTTP.Headers.set_default("Accept", "text/event-stream")
      |> HTTP.Headers.set_default("Cache-Control", "no-cache")
      |> maybe_set_last_event_id(state.last_event_id)

    %HTTP.Request{method: :get, url: state.uri, headers: headers}
    |> HTTP.Request.to_iodata()
  end

  defp maybe_set_last_event_id(headers, ""), do: headers

  defp maybe_set_last_event_id(headers, id) do
    headers = HTTP.Headers.delete(headers, "Last-Event-ID")
    %{headers | headers: headers.headers ++ [{"Last-Event-ID", id}]}
  end

  defp handle_socket_data(state, data) do
    state = reset_idle_timer(state)

    case HTTP.HTTP1.stream(state.http1, data) do
      {:ok, http1, events} ->
        state = %{state | http1: http1}
        handle_http_events(state, events)

      {:error, reason} ->
        {:stop, :normal, fatal(state, reason)}
    end
  end

  defp handle_transport_closed(state) do
    case HTTP.HTTP1.close(state.http1) do
      {:ok, http1, events} ->
        state = %{state | http1: http1}

        case handle_http_events(state, events) do
          {:noreply, %{ready_state: @open} = state} -> {:noreply, reconnect(state, :eof)}
          {:noreply, state} -> {:noreply, state}
          {:stop, _reason, state} -> {:stop, :normal, state}
        end

      {:error, reason} ->
        {:noreply, reconnect(state, reason)}
    end
  end

  defp handle_http_events(state, events) do
    Enum.reduce_while(events, {:cont, state}, fn event, {:cont, acc} ->
      case handle_http_event(acc, event) do
        {:cont, next} -> {:cont, {:cont, next}}
        {:halt, next} -> {:halt, {:halt, next}}
      end
    end)
    |> case do
      {:cont, state} -> rearm(state)
      {:halt, %{ready_state: @closed} = state} -> {:stop, :normal, state}
      {:halt, state} -> {:noreply, state}
    end
  end

  defp handle_http_event(state, {:headers, status, headers}) do
    case validate_response(status, headers) do
      :ok ->
        duration = System.monotonic_time(:microsecond) - state.connect_started_at
        Telemetry.connect_stop(state.uri, status, duration)
        emit(state, %Open{target: state.target})

        {:cont,
         %{
           state
           | ready_state: @open,
             attempt: 0,
             idle_timer: schedule_idle_timer(state)
         }}

      {:stop, reason} ->
        {:halt, fatal(state, reason)}

      {:error, reason} ->
        {:halt, fatal(state, reason)}
    end
  end

  defp handle_http_event(%{ready_state: @open} = state, {:body, chunk}) do
    case Parser.parse(state.parser, chunk) do
      {:ok, parser, events} ->
        state = %{state | parser: parser}
        {:cont, handle_parser_events(state, events)}

      {:error, reason} ->
        {:halt, fatal(state, reason)}
    end
  end

  defp handle_http_event(state, {:body, _chunk}), do: {:cont, state}

  defp handle_http_event(%{ready_state: @open} = state, :done) do
    case Parser.close(state.parser) do
      {:ok, parser, events} ->
        state = %{state | parser: parser}
        {:halt, reconnect(handle_parser_events(state, events), :eof)}

      {:error, reason} ->
        {:halt, fatal(state, reason)}
    end
  end

  defp handle_http_event(state, :done), do: {:halt, reconnect(state, :eof)}

  defp validate_response(204, _headers), do: {:stop, {:http_status, 204}}

  defp validate_response(200, headers) do
    case HTTP.Headers.get(headers, "content-type") do
      nil ->
        {:error, :invalid_content_type}

      content_type ->
        {media_type, _params} = HTTP.Headers.parse_content_type(content_type)

        if String.downcase(media_type) == "text/event-stream" do
          :ok
        else
          {:error, :invalid_content_type}
        end
    end
  end

  defp validate_response(status, _headers), do: {:error, {:http_status, status}}

  defp handle_parser_events(state, events) do
    Enum.reduce(events, state, &handle_parser_event(&2, &1))
  end

  defp handle_parser_event(state, {:event, type, data, last_event_id}) do
    Telemetry.message_received(state.uri, type, last_event_id, byte_size(data))

    emit(state, %Message{
      target: state.target,
      type: type,
      data: data,
      origin: origin(state),
      last_event_id: last_event_id
    })

    %{state | last_event_id: last_event_id}
  end

  defp handle_parser_event(state, {:retry, reconnect_time}) do
    %{state | reconnect_time: min(reconnect_time, state.max_reconnect_time)}
  end

  defp handle_parser_event(state, {:last_event_id, last_event_id}) do
    %{state | last_event_id: last_event_id}
  end

  defp rearm(%{ready_state: @closed} = state), do: {:noreply, state}

  defp rearm(%{transport: transport, socket: socket} = state) do
    case transport.setopts(socket, active: :once) do
      :ok -> {:noreply, state}
      {:error, reason} -> {:noreply, reconnect(state, reason)}
    end
  end

  defp reconnect(%{ready_state: @closed} = state, _reason), do: state

  defp reconnect(state, reason) do
    emit(state, %Error{target: state.target, reason: reason})
    Telemetry.reconnect_start(state.uri, reason, state.reconnect_time, state.attempt)

    state =
      state
      |> cancel_idle_timer()
      |> close_transport()
      |> cancel_reconnect_timer()

    timer = Process.send_after(self(), :reconnect, state.reconnect_time)
    %{state | ready_state: @connecting, reconnect_timer: timer}
  end

  defp fatal(state, reason) do
    if state.ready_state == @connecting do
      connect_started_at = state.connect_started_at || System.monotonic_time(:microsecond)
      connect_exception(state, reason, connect_started_at)
    end

    emit(state, %Error{target: state.target, reason: reason})
    close_state(state, reason)
  end

  defp close_state(state, reason) do
    state =
      state
      |> cancel_idle_timer()
      |> cancel_reconnect_timer()
      |> close_transport()

    Telemetry.close_stop(state.uri, reason)
    %{state | ready_state: @closed}
  end

  defp close_transport(%{transport: nil} = state), do: %{state | socket: nil}
  defp close_transport(%{socket: nil} = state), do: state

  defp close_transport(state) do
    _ = state.transport.close(state.socket)
    %{state | socket: nil, transport: nil}
  end

  defp cancel_reconnect_timer(%{reconnect_timer: nil} = state), do: state

  defp cancel_reconnect_timer(%{reconnect_timer: timer} = state) do
    _ = Process.cancel_timer(timer)
    %{state | reconnect_timer: nil}
  end

  defp schedule_idle_timer(%{idle_timeout: :infinity}), do: nil

  defp schedule_idle_timer(%{idle_timeout: timeout}),
    do: Process.send_after(self(), :idle_timeout, timeout)

  defp reset_idle_timer(%{ready_state: @open} = state) do
    state
    |> cancel_idle_timer()
    |> then(fn state -> %{state | idle_timer: schedule_idle_timer(state)} end)
  end

  defp reset_idle_timer(state), do: state

  defp cancel_idle_timer(%{idle_timer: nil} = state), do: state

  defp cancel_idle_timer(%{idle_timer: timer} = state) do
    _ = Process.cancel_timer(timer)
    %{state | idle_timer: nil}
  end

  defp connect_exception(state, reason, started_at) do
    duration = System.monotonic_time(:microsecond) - started_at
    Telemetry.connect_exception(state.uri, reason, duration)
  end

  defp origin(state) do
    port =
      case state.uri.port do
        nil -> ""
        port -> ":" <> Integer.to_string(port)
      end

    state.uri.scheme <> "://" <> state.uri.host <> port
  end

  defp emit(state, event) do
    send(state.owner, {EventSource, state.target, event})
  end
end