lib/ice_agent.ex

defmodule ExICE.ICEAgent do
  @moduledoc """
  ICE Agent.

  Not to be confused with Elixir Agent.
  """
  use GenServer

  require Logger

  alias ExICE.{
    Candidate,
    CandidatePair,
    Checklist,
    ControlledHandler,
    ControllingHandler,
    Gatherer
  }

  alias ExICE.Attribute.{ICEControlling, ICEControlled, Priority, UseCandidate}

  alias ExSTUN.Message
  alias ExSTUN.Message.Type
  alias ExSTUN.Message.Attribute.{ErrorCode, Username, XORMappedAddress}

  # Ta timeout in ms.
  # Delay used whenever the `:ta_timeout` message is (re)scheduled; each
  # `:ta_timeout` executes at most one conn check or gathering transaction.
  @ta_timeout 50

  # transaction timeout in ms
  # see appendix B.1
  # A conn check or an in-progress gathering transaction that was sent
  # at least this long ago is treated as timed out.
  @hto 500

  # Maps the agent role to the module that handles conn checks for that role.
  @conn_check_handler %{controlling: ControllingHandler, controlled: ControlledHandler}

  @typedoc """
  ICE agent role.

  `:controlling` agent is responsible for nominating a pair.
  """
  @type role() :: :controlling | :controlled

  @typedoc """
  Emitted when the gathering process state has changed.

  For exact meaning refer to the W3C WebRTC standard, sec. 5.6.3.
  """
  @type gathering_state_change() :: {:gathering_state_change, :new | :gathering | :complete}

  @typedoc """
  Emitted when the connection state has changed.

  For exact meaning refer to the W3C WebRTC standard, sec. 5.6.4.
  """
  @type connection_state_change() ::
          {:connection_state_change, :checking | :connected | :completed | :failed}

  @typedoc """
  Messages sent by ExICE.

  Every message is a `{:ex_ice, pid, payload}` tuple, where `payload` is a
  gathering state change, a connection state change, received data or
  a newly gathered candidate.
  """
  @type signal() ::
          {:ex_ice, pid(),
           gathering_state_change()
           | connection_state_change()
           | {:data, binary()}
           | {:new_candidate, binary()}}

  @typedoc """
  ICE Agent configuration options.

  All notifications are by default sent to the process that spawns `ExICE`.
  This behavior can be overridden using the following options.

  * `ip_filter` - filter applied when gathering local candidates
  * `stun_servers` - list of STUN servers
  * `on_gathering_state_change` - where to send gathering state change notifications. Defaults to the process that spawns `ExICE`.
  * `on_connection_state_change` - where to send connection state change notifications. Defaults to the process that spawns `ExICE`.
  * `on_data` - where to send data. Defaults to the process that spawns `ExICE`.
  * `on_new_candidate` - where to send new candidates. Defaults to the process that spawns `ExICE`.

  Currently, there is no support for local relay (TURN) candidates;
  however, remote relay candidates work correctly.
  """
  @type opts() :: [
          ip_filter: (:inet.ip_address() -> boolean),
          stun_servers: [String.t()],
          on_gathering_state_change: pid() | nil,
          on_connection_state_change: pid() | nil,
          on_data: pid() | nil,
          on_new_candidate: pid() | nil
        ]

  # Guard: holds when both pairs share the same local base transport address,
  # the same local transport address and the same remote transport address.
  defguardp are_pairs_equal(p1, p2)
            when {p1.local_cand.base_address, p1.local_cand.base_port} ==
                   {p2.local_cand.base_address, p2.local_cand.base_port} and
                   {p1.local_cand.address, p1.local_cand.port} ==
                     {p2.local_cand.address, p2.local_cand.port} and
                   {p1.remote_cand.address, p1.remote_cand.port} ==
                     {p2.remote_cand.address, p2.remote_cand.port}

  # Guard: holds for the two STUN message classes that denote a response.
  defguardp is_response(class) when class == :success_response or class == :error_response

  @doc """
  Spawns a new ICE agent linked to the caller.

  The calling process becomes the `controlling process`. It has to be ready
  to receive the ExICE messages described by `t:signal/0`.
  """
  @spec start_link(role(), opts()) :: GenServer.on_start()
  def start_link(role, opts \\ []) do
    # user options come first, so they take precedence on lookup
    init_args = opts ++ [role: role, controlling_process: self()]
    GenServer.start_link(__MODULE__, init_args)
  end

  @doc """
  Sets the destination of gathering state change notifications.
  """
  @spec on_gathering_state_change(pid(), pid() | nil) :: :ok
  def on_gathering_state_change(ice_agent, send_to) do
    request = {:on_gathering_state_change, send_to}
    GenServer.call(ice_agent, request)
  end

  @doc """
  Sets the destination of connection state change notifications.
  """
  @spec on_connection_state_change(pid(), pid() | nil) :: :ok
  def on_connection_state_change(ice_agent, send_to) do
    request = {:on_connection_state_change, send_to}
    GenServer.call(ice_agent, request)
  end

  @doc """
  Sets the destination of received data.
  """
  @spec on_data(pid(), pid() | nil) :: :ok
  def on_data(ice_agent, send_to) do
    request = {:on_data, send_to}
    GenServer.call(ice_agent, request)
  end

  @doc """
  Sets the destination of new candidate notifications.
  """
  @spec on_new_candidate(pid(), pid() | nil) :: :ok
  def on_new_candidate(ice_agent, send_to) do
    request = {:on_new_candidate, send_to}
    GenServer.call(ice_agent, request)
  end

  @doc """
  Returns the local ICE credentials (ufrag and password).

  The credentials stay the same until ICE is restarted.
  """
  @spec get_local_credentials(pid()) :: {:ok, ufrag :: binary(), pwd :: binary()}
  def get_local_credentials(ice_agent),
    do: GenServer.call(ice_agent, :get_local_credentials)

  @doc """
  Passes the remote ICE credentials to the agent.

  Connectivity checks cannot start before this function is called.
  """
  @spec set_remote_credentials(pid(), binary(), binary()) :: :ok
  def set_remote_credentials(ice_agent, ufrag, passwd)
      when is_binary(ufrag) and is_binary(passwd) do
    message = {:set_remote_credentials, ufrag, passwd}
    GenServer.cast(ice_agent, message)
  end

  @doc """
  Kicks off the ICE gathering process.

  Every discovered candidate is delivered as a message to the controlling process.
  The message structure is described by `t:signal/0`.
  """
  @spec gather_candidates(pid()) :: :ok
  def gather_candidates(ice_agent), do: GenServer.cast(ice_agent, :gather_candidates)

  @doc """
  Passes a remote candidate to the agent.

  When the agent has already gathered some local candidates and
  has the remote credentials set, adding a remote candidate starts
  connectivity checks.
  """
  @spec add_remote_candidate(pid(), String.t()) :: :ok
  def add_remote_candidate(ice_agent, candidate) when is_binary(candidate) do
    message = {:add_remote_candidate, candidate}
    GenServer.cast(ice_agent, message)
  end

  @doc """
  Tells the agent that the remote side has finished gathering its candidates.

  Without this call the `controlling` agent will not nominate a pair,
  and therefore the agent will not move to the `completed` state.
  """
  @spec end_of_candidates(pid()) :: :ok
  def end_of_candidates(ice_agent), do: GenServer.cast(ice_agent, :end_of_candidates)

  @doc """
  Sends binary data to the remote side.

  May only be called once the agent has moved to the `connected` state.
  """
  @spec send_data(pid(), binary()) :: :ok
  def send_data(ice_agent, data) when is_binary(data) do
    message = {:send_data, data}
    GenServer.cast(ice_agent, message)
  end

  @doc """
  Performs an ICE restart.

  Data can still be sent if the previous ICE session
  had any valid pairs.
  """
  @spec restart(pid()) :: :ok
  def restart(ice_agent), do: GenServer.cast(ice_agent, :restart)

  @doc """
  Stops the ICE agent together with all of its sockets.
  """
  @spec stop(pid()) :: :ok
  def stop(ice_agent), do: GenServer.stop(ice_agent)

  ### Server

  @impl true
  def init(opts) do
    # Parse every configured STUN server URI; entries that fail to parse
    # are logged and left out of the resulting list.
    parsed_uris =
      for uri <- Keyword.get(opts, :stun_servers, []) do
        case ExICE.URI.parse(uri) do
          :error ->
            Logger.warning("""
            Couldn't parse STUN server URI: #{inspect(uri)}. \
            Ignoring.\
            """)

            nil

          {:ok, parsed} ->
            parsed
        end
      end

    stun_servers = Enum.filter(parsed_uris, &(&1 != nil))

    {ufrag, pwd} = generate_credentials()

    owner = Keyword.fetch!(opts, :controlling_process)
    role = Keyword.fetch!(opts, :role)
    tiebreaker = generate_tiebreaker()

    # every notification target falls back to the controlling process
    {:ok,
     %{
       # agent identity and notification targets
       role: role,
       tiebreaker: tiebreaker,
       controlling_process: owner,
       on_connection_state_change: opts[:on_connection_state_change] || owner,
       on_gathering_state_change: opts[:on_gathering_state_change] || owner,
       on_data: opts[:on_data] || owner,
       on_new_candidate: opts[:on_new_candidate] || owner,
       # gathering
       gathering_state: :new,
       gathering_transactions: %{},
       ip_filter: opts[:ip_filter],
       stun_servers: stun_servers,
       turn_servers: [],
       # connectivity checks
       state: :new,
       ta_timer: nil,
       checklist: %{},
       conn_checks: %{},
       selected_pair: nil,
       prev_selected_pair: nil,
       prev_valid_pairs: [],
       eoc: false,
       # {did we nominate pair, pair id}
       nominating?: {false, nil},
       # credentials and candidates
       local_ufrag: ufrag,
       local_pwd: pwd,
       local_cands: [],
       remote_ufrag: nil,
       remote_pwd: nil,
       remote_cands: []
     }}
  end

  @impl true
  # Notification target setters. The request tag is also the name of
  # the state key that gets overwritten with `send_to`.
  def handle_call({target_key, send_to}, _from, state)
      when target_key in [
             :on_gathering_state_change,
             :on_connection_state_change,
             :on_data,
             :on_new_candidate
           ] do
    {:reply, :ok, %{state | target_key => send_to}}
  end

  @impl true
  # Replies with the local ufrag and password kept in the state.
  def handle_call(:get_local_credentials, _from, state) do
    %{local_ufrag: ufrag, local_pwd: pwd} = state
    {:reply, {:ok, ufrag, pwd}, state}
  end

  @impl true
  # Stores remote credentials. Three cases, checked in this order:
  # no credentials set yet, the very same credentials passed again,
  # and different credentials, which trigger an ICE restart.
  def handle_cast({:set_remote_credentials, ufrag, pwd}, state) do
    state =
      case state do
        %{remote_ufrag: nil, remote_pwd: nil} ->
          Logger.debug("Setting remote credentials: #{inspect(ufrag)}:#{inspect(pwd)}")
          %{state | remote_ufrag: ufrag, remote_pwd: pwd}

        %{remote_ufrag: ^ufrag, remote_pwd: ^pwd} ->
          Logger.warning("Passed the same remote credentials to be set. Ignoring.")
          state

        _different_credentials ->
          Logger.debug("New remote credentials different than the current ones. Restarting ICE")
          restarted = do_restart(state)
          %{restarted | remote_ufrag: ufrag, remote_pwd: pwd}
      end

    {:noreply, state}
  end

  @impl true
  # Gathering is already running - nothing to do.
  def handle_cast(:gather_candidates, %{gathering_state: :gathering} = state) do
    Logger.warning("Can't gather candidates. Gathering already in progress. Ignoring.")
    {:noreply, state}
  end

  @impl true
  # Gathering has already finished - a new round requires an ICE restart.
  def handle_cast(:gather_candidates, %{gathering_state: :complete} = state) do
    Logger.warning("Can't gather candidates. ICE restart needed. Ignoring.")
    {:noreply, state}
  end

  @impl true
  # Starts gathering: announces the state change, collects and announces host
  # candidates and prepares one srflx gathering transaction for every
  # {STUN server, host candidate} combination.
  def handle_cast(:gather_candidates, %{gathering_state: :new} = state) do
    Logger.debug("Gathering state change: #{state.gathering_state} -> gathering")
    notify(state.on_gathering_state_change, {:gathering_state_change, :gathering})

    {:ok, host_cands} = Gatherer.gather_host_candidates(ip_filter: state.ip_filter)

    Enum.each(host_cands, fn host_cand ->
      notify(state.on_new_candidate, {:new_candidate, Candidate.marshal(host_cand)})
    end)

    transactions =
      for stun_server <- state.stun_servers, host_cand <- host_cands, into: %{} do
        # random 96-bit transaction id
        t_id = :binary.decode_unsigned(:crypto.strong_rand_bytes(12))

        {t_id,
         %{
           t_id: t_id,
           host_cand: host_cand,
           stun_server: stun_server,
           send_time: nil,
           state: :waiting
         }}
      end

    # TODO should we override?
    state =
      %{
        state
        | gathering_state: :gathering,
          local_cands: state.local_cands ++ host_cands,
          gathering_transactions: transactions
      }
      |> update_gathering_state()
      |> update_ta_timer()

    {:noreply, state}
  end

  @impl true
  # Remote candidates are not accepted once end-of-candidates was signalled.
  def handle_cast({:add_remote_candidate, _remote_cand}, %{eoc: true} = state) do
    Logger.warning("Received remote candidate after end-of-candidates. Ignoring.")
    {:noreply, state}
  end

  @impl true
  # Parses the candidate and, when it is valid, adds it to the state and
  # refreshes the connection state and the Ta timer.
  def handle_cast({:add_remote_candidate, remote_cand}, state) do
    Logger.debug("New remote candidate: #{inspect(remote_cand)}")

    case Candidate.unmarshal(remote_cand) do
      {:error, reason} ->
        Logger.warning("Invalid remote candidate, reason: #{inspect(reason)}. Ignoring.")
        {:noreply, state}

      {:ok, parsed_cand} ->
        state = do_add_remote_candidate(parsed_cand, state)
        Logger.debug("Successfully added remote candidate.")
        {:noreply, state |> update_connection_state() |> update_ta_timer()}
    end
  end

  @impl true
  # Controlled side: after marking end-of-candidates the agent
  # might need to move to the completed state.
  def handle_cast(:end_of_candidates, %{role: :controlled} = state) do
    {:noreply, update_connection_state(%{state | eoc: true})}
  end

  @impl true
  # Controlling side: after marking end-of-candidates check whether
  # it is time to nominate and, if so, try to nominate.
  def handle_cast(:end_of_candidates, %{role: :controlling} = state) do
    {:noreply, maybe_nominate(%{state | eoc: true})}
  end

  @impl true
  # Sends data over the first available pair, looked up in this order:
  # selected pair, a valid pair from the checklist, previously selected pair,
  # first previously valid pair. Crashes when none of them is a candidate pair.
  def handle_cast({:send_data, data}, %{state: ice_state} = state)
      when ice_state in [:connected, :completed] do
    candidate_pair =
      state.selected_pair ||
        Checklist.get_valid_pair(state.checklist) ||
        state.prev_selected_pair ||
        List.first(state.prev_valid_pairs)

    %CandidatePair{local_cand: local_cand, remote_cand: remote_cand} = candidate_pair

    do_send(local_cand.socket, {remote_cand.address, remote_cand.port}, data)
    {:noreply, state}
  end

  @impl true
  # Any other ICE state: data cannot be sent, so the request is dropped.
  def handle_cast({:send_data, _data}, %{state: ice_state} = state) do
    Logger.warning("""
    Cannot send data in ICE state: #{inspect(ice_state)}. \
    Data can only be sent in state :connected or :completed. Ignoring.\
    """)

    {:noreply, state}
  end

  @impl true
  # Explicit ICE restart requested through the public API.
  def handle_cast(:restart, state) do
    Logger.debug("Restarting ICE")
    {:noreply, do_restart(state)}
  end

  @impl true
  # Ta timer fired before remote credentials were set: just schedule the next
  # tick and let `update_ta_timer/1` decide whether it is needed.
  def handle_info(:ta_timeout, %{remote_ufrag: nil, remote_pwd: nil} = state) do
    # TODO we can do this better i.e.
    # allow for executing gathering transactions
    Logger.debug("Ta timer fired but there are no remote credentials. Scheduling next check")
    timer_ref = Process.send_after(self(), :ta_timeout, @ta_timeout)
    {:noreply, update_ta_timer(%{state | ta_timer: timer_ref})}
  end

  @impl true
  # Ta timer is not expected to fire in the completed or failed state.
  # Only the gathering state, connection state and the timer are refreshed.
  def handle_info(:ta_timeout, %{state: ice_state} = state)
      when ice_state in [:completed, :failed] do
    Logger.warning("""
    Ta timer fired in unexpected state: #{ice_state}.
    Trying to update gathering and connection states.
    """)

    refreshed = update_gathering_state(state)
    refreshed = update_connection_state(refreshed)
    {:noreply, update_ta_timer(refreshed)}
  end

  @impl true
  # Regular Ta tick: times out stale transactions, refreshes states and then
  # executes at most one transaction - a conn check on the next pair or,
  # when there is no such pair, the next waiting gathering transaction.
  def handle_info(:ta_timeout, state) do
    state =
      state
      |> timeout_pending_transactions()
      |> update_gathering_state()
      |> update_connection_state()
      |> maybe_nominate()

    if state.state in [:completed, :failed] do
      {:noreply, update_ta_timer(state)}
    else
      next_pair = Checklist.get_next_pair(state.checklist)

      {executed?, state} =
        case next_pair do
          %CandidatePair{id: pair_id} = pair ->
            Logger.debug("Sending conn check on pair: #{inspect(pair_id)}")
            {sent_pair, state} = send_conn_check(pair, state)
            {true, put_in(state, [:checklist, sent_pair.id], sent_pair)}

          nil ->
            case get_next_gathering_transaction(state.gathering_transactions) do
              nil ->
                {false, state}

              {_t_id, transaction} ->
                {status, state} = handle_gathering_transaction(transaction, state)
                {status == :ok, state}
            end
        end

      if not executed? do
        Logger.debug("Couldn't find transaction to execute. Did Ta timer fired without the need?")
      end

      # schedule next check and call update_ta_timer
      # if the next check is not needed, update_ta_timer will
      # cancel it
      timer_ref = Process.send_after(self(), :ta_timeout, @ta_timeout)
      {:noreply, update_ta_timer(%{state | ta_timer: timer_ref})}
    end
  end

  @impl true
  # Keepalive for the selected pair.
  def handle_info({:keepalive, id}, %{selected_pair: s_pair} = state)
      when not is_nil(s_pair) and s_pair.id == id do
    # if pair was selected, send keepalives only on that pair
    rescheduled_pair = CandidatePair.schedule_keepalive(s_pair)
    send_keepalive(state.checklist[id])
    {:noreply, put_in(state, [:checklist, id], rescheduled_pair)}
  end

  @impl true
  # Keepalive for a pair other than the selected one: dropped.
  def handle_info({:keepalive, _id}, %{selected_pair: s_pair} = state) when not is_nil(s_pair) do
    # note: current implementation assumes that, if selected pair exists, none of the already existing
    # valid pairs will ever become selected (only new appearing valid pairs)
    # that's why there's no call to `CandidatePair.schedule_keepalive/1`
    {:noreply, state}
  end

  @impl true
  # Keepalive while no pair is selected yet.
  def handle_info({:keepalive, id}, state) do
    # TODO: keepalives should be send only if no data has been send for @tr_timeout
    # atm, we send keepalives anyways, also it might be better to pace them with ta_timer
    # TODO: candidates not in a valid pair also should be kept alive (RFC 8445, sect 5.1.1.4)
    case state.checklist do
      %{^id => pair} ->
        rescheduled_pair = CandidatePair.schedule_keepalive(pair)
        send_keepalive(rescheduled_pair)
        {:noreply, put_in(state, [:checklist, id], rescheduled_pair)}

      _no_such_pair ->
        Logger.warning("Received keepalive request for non-existant candidate pair")
        {:noreply, state}
    end
  end

  @impl true
  # Incoming UDP packet: STUN packets are decoded and handled by the agent,
  # everything else is forwarded as data to the `on_data` target.
  def handle_info({:udp, socket, src_ip, src_port, packet}, state) do
    state =
      if ExSTUN.is_stun(packet) do
        case Message.decode(packet) do
          {:error, reason} ->
            Logger.warning("Couldn't decode stun message: #{inspect(reason)}")
            state

          {:ok, msg} ->
            handle_stun_msg(socket, src_ip, src_port, msg, state)
        end
      else
        notify(state.on_data, {:data, packet})
        state
      end

    {:noreply, state}
  end

  @impl true
  # Catch-all: any message not handled above is logged and dropped.
  def handle_info(unexpected, state) do
    Logger.warning("Got unexpected msg: #{inspect(unexpected)}")
    {:noreply, state}
  end

  @impl true
  # No manual socket cleanup here - closing sockets is done automatically by Erlang.
  def terminate(reason, _state),
    do: Logger.debug("Stopping ICE agent with reason: #{inspect(reason)}")

  # Pairs `remote_cand` with every matching local candidate, merges the new
  # pairs into the checklist, prunes it and remembers the remote candidate.
  defp do_add_remote_candidate(remote_cand, state) do
    matching_local_cands = get_matching_candidates(state.local_cands, remote_cand)
    known_foundations = Checklist.get_foundations(state.checklist)

    candidate_pairs =
      Map.new(matching_local_cands, fn local_cand ->
        # a srflx candidate is paired using its base address and port
        local_cand =
          case local_cand do
            %Candidate{type: :srflx, base_address: base_address, base_port: base_port} ->
              %Candidate{local_cand | address: base_address, port: base_port}

            _other_type ->
              local_cand
          end

        pair_state = get_pair_state(local_cand, remote_cand, known_foundations)
        pair = CandidatePair.new(local_cand, remote_cand, state.role, pair_state)
        {pair.id, pair}
      end)

    checklist =
      state.checklist
      |> Map.merge(candidate_pairs)
      |> Checklist.prune()

    # pairs that survived pruning and were not in the checklist before
    added_pairs = Map.drop(checklist, Map.keys(state.checklist))

    if map_size(added_pairs) == 0 do
      Logger.debug("Not adding any new pairs as they were redundant")
    else
      Logger.debug("New candidate pairs: #{inspect(added_pairs)}")
    end

    %{state | checklist: checklist, remote_cands: [remote_cand | state.remote_cands]}
  end

  # Returns the first `{t_id, transaction}` entry whose transaction is in
  # the `:waiting` state, or `nil` when there is no such entry.
  defp get_next_gathering_transaction(gathering_transactions) do
    Enum.find(gathering_transactions, &match?({_t_id, %{state: :waiting}}, &1))
  end

  # Executes a gathering transaction by sending a binding request to the STUN server.
  # Returns `{:ok, state}` with the transaction marked `:in_progress` (and its
  # send time recorded), or `{:error, state}` with the transaction marked `:failed`.
  defp handle_gathering_transaction(
         %{t_id: t_id, host_cand: host_cand, stun_server: stun_server} = t,
         state
       ) do
    Logger.debug("""
    Sending binding request to gather srflx candidate for:
    host_cand: #{inspect(host_cand)},
    stun_server: #{inspect(stun_server)}
    """)

    case Gatherer.gather_srflx_candidate(t_id, host_cand, stun_server) do
      {:error, reason} ->
        Logger.debug("Couldn't send binding request, reason: #{reason}")
        failed = put_in(state, [:gathering_transactions, t_id, :state], :failed)
        {:error, update_gathering_state(failed)}

      :ok ->
        sent_at = System.monotonic_time(:millisecond)
        in_progress = %{t | state: :in_progress, send_time: sent_at}
        {:ok, put_in(state, [:gathering_transactions, t_id], in_progress)}
    end
  end

  # Times out gathering transactions first and conn checks second,
  # both against the same monotonic timestamp.
  defp timeout_pending_transactions(state) do
    timestamp = System.monotonic_time(:millisecond)
    timeout_conn_checks(timestamp, timeout_gathering_transactions(timestamp, state))
  end

  # Removes conn checks that were sent at least `@hto` ms before `now`
  # and times out the pairs they were performed on.
  defp timeout_conn_checks(now, state) do
    {timed_out, pending} =
      Enum.split_with(state.conn_checks, fn {_id, conn_check} ->
        %{send_time: send_time} = conn_check
        now - send_time >= @hto
      end)

    timed_out = Map.new(timed_out)
    pending = Map.new(pending)

    checklist =
      if map_size(timed_out) == 0 do
        state.checklist
      else
        Logger.debug("Connectivity checks timed out: #{inspect(Map.keys(timed_out))}")
        timed_out_pair_ids = for {_id, %{pair_id: pair_id}} <- timed_out, do: pair_id
        Logger.debug("Pairs failed. Reason: timeout. Pairs: #{inspect(timed_out_pair_ids)}")
        Checklist.timeout_pairs(state.checklist, timed_out_pair_ids)
      end

    %{state | checklist: checklist, conn_checks: pending}
  end

  # Drops gathering transactions that have been `:in_progress` for at least
  # `@hto` ms, measured against `now` (a monotonic timestamp in ms).
  # Transactions in any other state are kept as they are.
  # Returns the state with the remaining gathering transactions.
  defp timeout_gathering_transactions(now, state) do
    {stale_gath_trans, gath_trans} =
      Enum.split_with(state.gathering_transactions, fn {_id,
                                                        %{state: t_state, send_time: send_time}} ->
        # `send_time` is nil until the transaction is sent, hence the state check first
        t_state == :in_progress and now - send_time >= @hto
      end)

    gath_trans = Map.new(gath_trans)

    if stale_gath_trans != [] do
      # Transaction ids are integers, so this is not a keyword list and
      # `Keyword.keys/1` must not be used to extract them.
      stale_ids = Enum.map(stale_gath_trans, fn {t_id, _t} -> t_id end)
      Logger.debug("Gathering transactions timed out: #{inspect(stale_ids)}")
    end

    %{state | gathering_transactions: gath_trans}
  end

  # Dispatches a decoded STUN message received on `socket` from `{src_ip, src_port}`.
  #
  # Routing depends on the message type and its transaction id:
  # * binding request -> `handle_binding_request/5`
  # * binding response whose transaction id is a key of `state.conn_checks`
  #   -> `handle_conn_check_response/5`
  # * binding response whose transaction id is a key of `state.gathering_transactions`
  #   -> `handle_gathering_transaction_response/5`
  # * any other binding response, or any other message type, is logged and ignored
  #
  # Clause order matters: conn checks are looked up before gathering transactions.
  # Whichever branch runs, the resulting state is then piped through the gathering
  # state, connection state, nomination and Ta timer updates.
  defp handle_stun_msg(socket, src_ip, src_port, %Message{} = msg, state) do
    # TODO revisit 7.3.1.4

    # used for logging only; a failure to read the socket name crashes the agent
    {:ok, socket_addr} = :inet.sockname(socket)

    case msg.type do
      %Type{class: :request, method: :binding} ->
        Logger.debug("""
        Received binding request from: #{inspect({src_ip, src_port})}, on: #{inspect(socket_addr)} \
        """)

        handle_binding_request(socket, src_ip, src_port, msg, state)

      %Type{class: class, method: :binding}
      when is_response(class) and is_map_key(state.conn_checks, msg.transaction_id) ->
        Logger.debug("""
        Received conn check response from: #{inspect({src_ip, src_port})}, on: #{inspect(socket_addr)} \
        """)

        handle_conn_check_response(socket, src_ip, src_port, msg, state)

      %Type{class: class, method: :binding}
      when is_response(class) and is_map_key(state.gathering_transactions, msg.transaction_id) ->
        Logger.debug("""
        Received gathering transaction response from: #{inspect({src_ip, src_port})}, on: #{inspect(socket_addr)} \
        """)

        handle_gathering_transaction_response(socket, src_ip, src_port, msg, state)

      # response that matches neither a conn check nor a gathering transaction
      %Type{class: class, method: :binding} when is_response(class) ->
        Logger.warning("""
        Ignoring binding response with unknown t_id: #{msg.transaction_id}.
        Is it retransmission or we called ICE restart?
        """)

        state

      other ->
        Logger.warning("""
        Unknown msg from: #{inspect({src_ip, src_port})}, on: #{inspect(socket_addr)}, msg: #{inspect(other)} \
        """)

        state
    end
    |> update_gathering_state()
    |> update_connection_state()
    |> maybe_nominate()
    |> update_ta_timer()
  end

  ## BINDING REQUEST HANDLING ##

  # Handles a binding request received on `socket` from `{src_ip, src_port}`.
  # Returns the (possibly updated) state.
  #
  # The request is authenticated against the local password and has to carry
  # a priority attribute and a role attribute; the use-candidate attribute is optional.
  # A role conflict check may switch our role before the request is processed.
  #
  # Outcomes:
  # * all checks pass and a local candidate matches `socket` -> the request is
  #   handed over to the conn check handler of our current role
  # * all checks pass but no local candidate matches `socket` -> a success
  #   response is sent and the state is left untouched
  # * missing/invalid priority or invalid use-candidate -> bad request error response
  # * role conflict in which we retain our role -> role conflict error response
  # * any other error -> the request is ignored
  defp handle_binding_request(socket, src_ip, src_port, msg, state) do
    # username = state.local_ufrag <> ":" <> state.remote_ufrag
    # TODO check username
    with {:ok, key} <- authenticate_msg(msg, state.local_pwd),
         {:ok, prio_attr} <- get_prio_attribute(msg),
         {:ok, role_attr} <- get_role_attribute(msg),
         # get_use_cand_attribute/1 returns the bare attribute or nil on success,
         # an error tuple falls through to `else`
         use_cand_attr when use_cand_attr in [nil, %UseCandidate{}] <-
           get_use_cand_attribute(msg),
         # the result is wrapped together with `key` so that the `else` branch
         # can authenticate the role conflict error response
         {{:ok, state}, _} <- {check_req_role_conflict(role_attr, state), key} do
      case find_host_cand(state.local_cands, socket) do
        nil ->
          # keepalive on pair selected before ice restart
          # TODO can we reach this? Won't we use incorrect local_pwd for auth?
          Logger.debug("Keepalive on pair from previous ICE session")
          send_binding_success_response(socket, src_ip, src_port, msg, key)
          state

        %Candidate{} = local_cand ->
          {remote_cand, state} = get_or_create_remote_cand(src_ip, src_port, prio_attr, state)
          pair = CandidatePair.new(local_cand, remote_cand, state.role, :waiting)

          # dispatch to ControllingHandler or ControlledHandler, depending on our role
          @conn_check_handler[state.role].handle_conn_check_request(
            state,
            pair,
            msg,
            use_cand_attr,
            key
          )
      end
    else
      {:error, reason}
      when reason in [
             :invalid_priority_attribute,
             :no_priority_attribute,
             :invalid_use_candidate_attribute
           ] ->
        # TODO should we reply with 400 bad request when
        # attributes are invalid (they are present but invalid)
        # TODO should we authenticate?
        # chrome does not authenticate but section 6.3.1.1 suggests
        # we should add message-integrity
        Logger.debug("""
        Invalid binding request, reason: #{reason}. \
        Sending bad request error response"\
        """)

        send_bad_request_error_response(socket, src_ip, src_port, msg)
        state

      # every remaining error, including role attribute errors
      {:error, reason} ->
        Logger.debug("Ignoring binding request, reason: #{reason}")
        state

      {{:error, :role_conflict, tiebreaker}, key} ->
        Logger.debug("""
        Role conflict. We retain our role which is: #{state.role}. Sending error response.
        Our tiebreaker: #{state.tiebreaker}
        Peer's tiebreaker: #{tiebreaker}\
        """)

        send_role_conflict_error_response(socket, src_ip, src_port, msg, key)
        state
    end
  end

  # Fetches the priority attribute of `msg`.
  # Returns `{:ok, attr}`, or an error tuple telling whether the attribute
  # was missing or invalid.
  defp get_prio_attribute(msg) do
    case Message.get_attribute(msg, Priority) do
      nil -> {:error, :no_priority_attribute}
      {:error, _reason} -> {:error, :invalid_priority_attribute}
      {:ok, _attr} = found -> found
    end
  end

  # Fetches the role attribute of `msg`: ICE-CONTROLLING is looked up first
  # and ICE-CONTROLLED is consulted only when the former is absent.
  # Returns `{:ok, attr}`, or an error tuple telling whether the attribute
  # was missing or invalid.
  defp get_role_attribute(msg) do
    lookup_result =
      case Message.get_attribute(msg, ICEControlling) do
        absent when absent in [nil, false] -> Message.get_attribute(msg, ICEControlled)
        present -> present
      end

    case lookup_result do
      nil -> {:error, :no_role_attribute}
      {:error, _reason} -> {:error, :invalid_role_attribute}
      {:ok, _attr} -> lookup_result
    end
  end

  # Fetches the use-candidate attribute of `msg`.
  # Unlike the sibling getters, success is NOT wrapped in an `:ok` tuple:
  # the bare attribute is returned, or `nil` when the attribute is absent.
  # An invalid attribute yields an error tuple.
  defp get_use_cand_attribute(msg) do
    case Message.get_attribute(msg, UseCandidate) do
      nil -> nil
      {:error, _reason} -> {:error, :invalid_use_candidate_attribute}
      {:ok, use_candidate} -> use_candidate
    end
  end

  # Detects a role conflict between the role attribute of a binding request
  # and our own role, and resolves it by comparing tiebreakers.
  #
  # Returns `{:ok, state}` when there is no conflict or when we switched our role
  # (the checklist priorities are recomputed in that case), and
  # `{:error, :role_conflict, peer_tiebreaker}` when we retain our role.

  # Both sides claim to be controlling: we keep the role unless
  # our tiebreaker is lower than the peer's.
  defp check_req_role_conflict(
         %ICEControlling{tiebreaker: tiebreaker},
         %{role: :controlling} = state
       ) do
    if state.tiebreaker >= tiebreaker do
      {:error, :role_conflict, tiebreaker}
    else
      Logger.debug("""
      Role conflict, switching our role to controlled. Recomputing pairs priority.
      Our tiebreaker: #{state.tiebreaker}
      Peer's tiebreaker: #{tiebreaker}\
      """)

      checklist = Checklist.recompute_pair_prios(state.checklist, :controlled)
      {:ok, %{state | role: :controlled, checklist: checklist}}
    end
  end

  # Both sides claim to be controlled: we take over the controlling role
  # unless our tiebreaker is lower than the peer's.
  defp check_req_role_conflict(
         %ICEControlled{tiebreaker: tiebreaker},
         %{role: :controlled} = state
       ) do
    if state.tiebreaker >= tiebreaker do
      Logger.debug("""
      Role conflict, switching our role to controlling. Recomputing pairs priority.
      Our tiebreaker: #{state.tiebreaker}
      Peer's tiebreaker: #{tiebreaker}\
      """)

      checklist = Checklist.recompute_pair_prios(state.checklist, :controlling)
      {:ok, %{state | role: :controlling, checklist: checklist}}
    else
      {:error, :role_conflict, tiebreaker}
    end
  end

  # Roles differ - no conflict.
  defp check_req_role_conflict(_role_attr, state), do: {:ok, state}

  ## BINDING RESPONSE HANDLING ##

  # Handles a response to one of our conn checks. The conn check is removed
  # from `state.conn_checks`. When the response did not arrive on the
  # path the request was sent on, the pair is marked as failed.
  defp handle_conn_check_response(socket, src_ip, src_port, msg, state) do
    {conn_check, state} = pop_in(state, [:conn_checks, msg.transaction_id])
    %{pair_id: pair_id} = conn_check
    pair = Map.fetch!(state.checklist, pair_id)
    src = {src_ip, src_port}

    # check that the source and destination transport
    # addresses are symmetric - see sec. 7.2.5.2.1
    if is_symmetric(socket, src, pair) do
      case msg.type.class do
        :error_response -> handle_conn_check_error_response(pair, msg, state)
        :success_response -> handle_conn_check_success_response(pair, msg, state)
      end
    else
      {:ok, {_ip, _port} = socket_addr} = :inet.sockname(socket)
      %CandidatePair{local_cand: local_cand, remote_cand: remote_cand} = pair

      Logger.warning("""
      Ignoring conn check response, non-symmetric src and dst addresses.
      Sent from: #{inspect({local_cand.base_address, local_cand.base_port})}, \
      to: #{inspect({remote_cand.address, remote_cand.port})}
      Recv from: #{inspect(src)}, on: #{inspect(socket_addr)}
      Pair failed: #{pair.id}
      """)

      put_in(state, [:checklist, pair.id], %CandidatePair{pair | state: :failed})
    end
  end

  # Handles a success response to a conn check performed on `conn_check_pair`.
  # Returns the updated state.
  #
  # The response is authenticated with the remote password and must carry
  # an XORMappedAddress attribute. On success a valid pair is built from the
  # (possibly newly created) local candidate and the remote candidate of
  # the conn check pair, added to the checklist and scheduled for keepalives.
  # Finally the handler for our role updates the nominated flag.
  defp handle_conn_check_success_response(conn_check_pair, msg, state) do
    with {:ok, _key} <- authenticate_msg(msg, state.remote_pwd),
         {:ok, xor_addr} <- Message.get_attribute(msg, XORMappedAddress) do
      {local_cand, state} = get_or_create_local_cand(xor_addr, conn_check_pair, state)
      remote_cand = conn_check_pair.remote_cand

      valid_pair =
        CandidatePair.new(local_cand, remote_cand, state.role, :succeeded, valid?: true)

      # the valid pair might already be present in the checklist
      checklist_pair = Checklist.find_pair(state.checklist, valid_pair)

      {pair_id, state} = add_valid_pair(valid_pair, conn_check_pair, checklist_pair, state)

      pair = CandidatePair.schedule_keepalive(state.checklist[pair_id])
      state = put_in(state.checklist[pair_id], pair)

      # get new conn check pair as it will have updated
      # discovered and succeeded pair fields
      conn_check_pair = Map.fetch!(state.checklist, conn_check_pair.id)
      # remember the flag before clearing it on the stored conn check pair
      nominate? = conn_check_pair.nominate?
      conn_check_pair = %CandidatePair{conn_check_pair | nominate?: false}
      state = put_in(state, [:checklist, conn_check_pair.id], conn_check_pair)
      @conn_check_handler[state.role].update_nominated_flag(state, pair_id, nominate?)
    else
      # only this particular error marks the pair as failed
      {:error, reason} when reason == :invalid_auth_attributes ->
        Logger.debug("Ignoring conn check response, reason: #{reason}")

        conn_check_pair = %CandidatePair{conn_check_pair | state: :failed}

        put_in(state, [:checklist, conn_check_pair.id], conn_check_pair)

      # any other non-matching value from either `with` step ends up here
      # and leaves the state untouched
      _other ->
        Logger.debug("""
        Invalid or no XORMappedAddress. Ignoring conn check response.
        Conn check tid: #{inspect(msg.transaction_id)},
        Conn check pair: #{inspect(conn_check_pair.id)}.
        """)

        state
    end
  end

  # Handles an error response to a conn check performed on `conn_check_pair`.
  # Returns the updated state.
  #
  # * 487 (role conflict) - our role is switched, the pair goes back to `:waiting`
  #   so that the conn check is rescheduled, pair priorities are recomputed for
  #   the new role and a new tiebreaker is generated
  # * anything else (including a missing or invalid error code) - the pair is
  #   marked as failed
  defp handle_conn_check_error_response(conn_check_pair, msg, state) do
    # TODO should we authenticate?
    # chrome seems not to add message integrity for 400 bad request errors
    # libnice seems to add message integrity for role conflict
    # RFC says we SHOULD add message integrity when possible
    case Message.get_attribute(msg, ErrorCode) do
      {:ok, %ErrorCode{code: 487}} ->
        new_role = if state.role == :controlling, do: :controlled, else: :controlling

        Logger.debug("""
        Conn check failed due to role conflict. Changing our role to: #{new_role}, \
        recomputing pair priorities, regenerating tiebreaker and rescheduling conn check \
        """)

        conn_check_pair = %CandidatePair{conn_check_pair | state: :waiting}

        # Pair priorities depend on the role, so they have to be recomputed
        # whenever the role changes - same as when a role conflict is
        # resolved on an incoming binding request.
        checklist =
          state.checklist
          |> Map.replace!(conn_check_pair.id, conn_check_pair)
          |> Checklist.recompute_pair_prios(new_role)

        tiebreaker = generate_tiebreaker()
        %{state | role: new_role, checklist: checklist, tiebreaker: tiebreaker}

      other ->
        Logger.debug(
          "Conn check failed due to error resposne from the peer, error: #{inspect(other)}"
        )

        conn_check_pair = %CandidatePair{conn_check_pair | state: :failed}
        put_in(state, [:checklist, conn_check_pair.id], conn_check_pair)
    end
  end

  # Dispatches a response to a gathering transaction to the handler
  # matching the class of the message (success or error response).
  # Any other message class raises.
  defp handle_gathering_transaction_response(socket, src_ip, src_port, msg, state) do
    handler =
      case msg.type.class do
        :success_response -> &handle_gathering_transaction_success_response/5
        :error_response -> &handle_gathering_transaction_error_response/5
      end

    handler.(socket, src_ip, src_port, msg, state)
  end

  # Handles a success response to a gathering (STUN binding) transaction.
  #
  # The transaction is looked up by the transaction id of `msg` (raises when
  # it is unknown). When the response carries an XORMappedAddress we don't have
  # a local candidate for yet, a new srflx candidate based on the transaction's
  # host candidate is created, announced via `on_new_candidate` and added to
  # the state. In both cases the transaction is then marked as `:complete`.
  #
  # A response without a usable XORMappedAddress marks the transaction
  # as `:failed` instead of crashing the agent.
  #
  # Returns the updated state.
  defp handle_gathering_transaction_success_response(_socket, _src_ip, _src_port, msg, state) do
    t = Map.fetch!(state.gathering_transactions, msg.transaction_id)

    case Message.get_attribute(msg, XORMappedAddress) do
      {:ok, %XORMappedAddress{address: xor_addr, port: xor_port}} ->
        state =
          case find_cand(state.local_cands, xor_addr, xor_port) do
            nil ->
              c =
                Candidate.new(
                  :srflx,
                  xor_addr,
                  xor_port,
                  t.host_cand.address,
                  t.host_cand.port,
                  t.host_cand.socket
                )

              Logger.debug("New srflx candidate: #{inspect(c)}")
              notify(state.on_new_candidate, {:new_candidate, Candidate.marshal(c)})
              add_srflx_cand(c, state)

            cand ->
              Logger.debug("""
              Not adding srflx candidate as we already have a candidate with the same address.
              Candidate: #{inspect(cand)}
              """)

              # Logger.debug/1 returns :ok, so the unchanged state
              # has to be returned explicitly
              state
          end

        update_in(state, [:gathering_transactions, t.t_id], fn t -> %{t | state: :complete} end)

      other ->
        Logger.debug("""
        Invalid or no XORMappedAddress in gathering transaction response. \
        t_id: #{inspect(msg.transaction_id)}, attribute: #{inspect(other)}
        """)

        update_in(state, [:gathering_transactions, t.t_id], fn t -> %{t | state: :failed} end)
    end
  end

  # Handles an error response to a gathering transaction: logs the error code
  # (or `nil` when the response carries none) and marks the transaction
  # as `:failed`. Raises when the transaction id of `msg` is unknown.
  #
  # Returns the updated state.
  defp handle_gathering_transaction_error_response(_socket, _src_ip, _src_port, msg, state) do
    transaction = Map.fetch!(state.gathering_transactions, msg.transaction_id)

    reason =
      msg
      |> Message.get_attribute(ErrorCode)
      |> case do
        {:ok, code} -> code
        _other -> nil
      end

    Logger.debug(
      "Gathering transaction failed, t_id: #{msg.transaction_id}, reason: #{inspect(reason)}"
    )

    update_in(state, [:gathering_transactions, transaction.t_id], &%{&1 | state: :failed})
  end

  # Adds the srflx candidate `c` to the local candidates and pairs it with
  # every remote candidate of the same address family.
  #
  # For pairing, the candidate's address and port are replaced with its base
  # and the merged checklist is pruned - see sec. 6.1.2.4.
  #
  # Returns the updated state.
  defp add_srflx_cand(c, state) do
    base_cand = %Candidate{c | address: c.base_address, port: c.base_port}

    matching_remote_cands = get_matching_candidates(state.remote_cands, base_cand)
    foundations = Checklist.get_foundations(state.checklist)

    candidate_pairs =
      Map.new(matching_remote_cands, fn remote_cand ->
        pair_state = get_pair_state(base_cand, remote_cand, foundations)
        pair = CandidatePair.new(base_cand, remote_cand, state.role, pair_state)
        {pair.id, pair}
      end)

    pruned_checklist =
      state.checklist
      |> Map.merge(candidate_pairs)
      |> Checklist.prune()

    # whatever survived pruning and was not in the checklist before is new
    added_pairs = Map.drop(pruned_checklist, Map.keys(state.checklist))

    case map_size(added_pairs) do
      0 -> Logger.debug("Not adding any new pairs as they were redundant")
      _ -> Logger.debug("New candidate pairs: #{inspect(added_pairs)}")
    end

    %{state | local_cands: [c | state.local_cands], checklist: pruned_checklist}
  end

  # Records a valid pair in the checklist according to sec 7.2.5.3.2.
  # Returns `{id_of_the_valid_pair, updated_state}`.
  #
  # TODO sec. 7.2.5.3.3
  # The agent MUST set the states for all other Frozen candidate pairs in
  # all checklists with the same foundation to Waiting.
  #
  # Clause order matters: valid_pair == conn_check_pair is tested before
  # valid_pair == checklist_pair because the latter always holds
  # when the former does.

  # The valid pair is the very pair the check was sent on.
  defp add_valid_pair(valid_pair, conn_check_pair, _, state)
       when are_pairs_equal(valid_pair, conn_check_pair) do
    Logger.debug("""
    New valid pair: #{conn_check_pair.id} \
    resulted from conn check on pair: #{conn_check_pair.id}\
    """)

    id = conn_check_pair.id

    succeeded_pair = %CandidatePair{
      conn_check_pair
      | state: :succeeded,
        valid?: true,
        discovered_pair_id: id,
        succeeded_pair_id: id
    }

    {id, %{state | checklist: Map.replace!(state.checklist, id, succeeded_pair)}}
  end

  # The valid pair is another checklist pair that is already marked as valid.
  defp add_valid_pair(
         valid_pair,
         conn_check_pair,
         %CandidatePair{valid?: true} = checklist_pair,
         state
       )
       when are_pairs_equal(valid_pair, checklist_pair) do
    Logger.debug("""
    New valid pair: #{checklist_pair.id} \
    resulted from conn check on pair: #{conn_check_pair.id} \
    but there is already such a pair in the checklist marked as valid.
    Should this ever happen after we don't add redundant srflx candidates?
    Checklist pair: #{checklist_pair.id}.
    """)

    checked_pair = %CandidatePair{
      conn_check_pair
      | discovered_pair_id: checklist_pair.id,
        succeeded_pair_id: conn_check_pair.id,
        state: :succeeded
    }

    # discovered_pair_id and succeeded_pair_id of the checklist pair
    # are already set, so only its state is touched
    known_pair = %CandidatePair{checklist_pair | state: :succeeded}

    new_checklist =
      state.checklist
      |> Map.replace!(known_pair.id, known_pair)
      |> Map.replace!(checked_pair.id, checked_pair)

    {known_pair.id, %{state | checklist: new_checklist}}
  end

  # The valid pair is another checklist pair that was not valid so far.
  defp add_valid_pair(valid_pair, conn_check_pair, checklist_pair, state)
       when are_pairs_equal(valid_pair, checklist_pair) do
    Logger.debug("""
    New valid pair: #{checklist_pair.id} \
    resulted from conn check on pair: #{conn_check_pair.id}\
    """)

    checked_pair = %CandidatePair{
      conn_check_pair
      | state: :succeeded,
        succeeded_pair_id: conn_check_pair.id,
        discovered_pair_id: checklist_pair.id
    }

    discovered_pair = %CandidatePair{
      checklist_pair
      | state: :succeeded,
        valid?: true,
        succeeded_pair_id: conn_check_pair.id,
        discovered_pair_id: checklist_pair.id
    }

    new_checklist =
      state.checklist
      |> Map.replace!(checked_pair.id, checked_pair)
      |> Map.replace!(discovered_pair.id, discovered_pair)

    {discovered_pair.id, %{state | checklist: new_checklist}}
  end

  # The valid pair is not in the checklist yet, so it is added to it.
  defp add_valid_pair(valid_pair, conn_check_pair, _, state) do
    # TODO compute priority according to sec 7.2.5.3.2
    Logger.debug("""
    Adding new candidate pair resulted from conn check \
    on pair: #{conn_check_pair.id}. Pair: #{inspect(valid_pair)}\
    """)

    Logger.debug("New valid pair: #{valid_pair.id}")

    checked_pair = %CandidatePair{
      conn_check_pair
      | state: :succeeded,
        succeeded_pair_id: conn_check_pair.id,
        discovered_pair_id: valid_pair.id
    }

    new_pair = %CandidatePair{
      valid_pair
      | succeeded_pair_id: conn_check_pair.id,
        discovered_pair_id: valid_pair.id
    }

    new_checklist =
      state.checklist
      |> Map.replace!(checked_pair.id, checked_pair)
      |> Map.put(new_pair.id, new_pair)

    {new_pair.id, %{state | checklist: new_checklist}}
  end

  # Sends a binding success response for `msg` to the remote candidate
  # of `pair`, using the socket of the pair's local candidate.
  @doc false
  @spec send_binding_success_response(CandidatePair.t(), Message.t(), binary()) :: :ok
  def send_binding_success_response(pair, msg, key) do
    remote = pair.remote_cand
    send_binding_success_response(pair.local_cand.socket, remote.address, remote.port, msg, key)
  end

  # Sends a 400 (bad request) error response for `msg` to the remote candidate
  # of `pair`, using the socket of the pair's local candidate.
  @doc false
  @spec send_bad_request_error_response(CandidatePair.t(), Message.t()) :: :ok
  def send_bad_request_error_response(pair, msg) do
    remote = pair.remote_cand
    send_bad_request_error_response(pair.local_cand.socket, remote.address, remote.port, msg)
  end

  # Builds a binding success response to `req` (same transaction id) carrying
  # the requester's address as XORMappedAddress, protects it with message
  # integrity (using `key`) and fingerprint, and sends it to the requester.
  defp send_binding_success_response(socket, src_ip, src_port, req, key) do
    mapped_address = %XORMappedAddress{address: src_ip, port: src_port}

    encoded =
      req.transaction_id
      |> Message.new(%Type{class: :success_response, method: :binding}, [mapped_address])
      |> Message.with_integrity(key)
      |> Message.with_fingerprint()
      |> Message.encode()

    do_send(socket, {src_ip, src_port}, encoded)
  end

  # Builds a binding error response to `req` (same transaction id) with
  # error code 400 and sends it to the requester. No message integrity
  # or fingerprint is attached.
  defp send_bad_request_error_response(socket, src_ip, src_port, req) do
    encoded =
      req.transaction_id
      |> Message.new(%Type{class: :error_response, method: :binding}, [%ErrorCode{code: 400}])
      |> Message.encode()

    do_send(socket, {src_ip, src_port}, encoded)
  end

  # Builds a binding error response to `req` (same transaction id) with
  # error code 487 (role conflict), protects it with message integrity
  # (using `key`) and fingerprint, and sends it to the requester.
  defp send_role_conflict_error_response(socket, src_ip, src_port, req, key) do
    encoded =
      req.transaction_id
      |> Message.new(%Type{class: :error_response, method: :binding}, [%ErrorCode{code: 487}])
      |> Message.with_integrity(key)
      |> Message.with_fingerprint()
      |> Message.encode()

    do_send(socket, {src_ip, src_port}, encoded)
  end

  # Returns those of `candidates` whose address family
  # is the same as the family of `cand`.
  defp get_matching_candidates(candidates, cand) do
    for c <- candidates, Candidate.family(c) == Candidate.family(cand), do: c
  end

  # A response is symmetric when it came from the address the request was
  # sent to (the pair's remote candidate) and arrived on the socket
  # the request was sent from (the pair's local candidate).
  defp is_symmetric(socket, response_src, conn_check_pair) do
    remote = conn_check_pair.remote_cand
    from_request_dst? = response_src == {remote.address, remote.port}
    on_request_socket? = socket == conn_check_pair.local_cand.socket

    from_request_dst? and on_request_socket?
  end

  # Picks the initial state of a new pair: `:frozen` when the checklist
  # already has a pair with the same {local, remote} foundations,
  # `:waiting` otherwise.
  defp get_pair_state(local_cand, remote_cand, checklist_foundations) do
    case {local_cand.foundation, remote_cand.foundation} in checklist_foundations do
      true -> :frozen
      false -> :waiting
    end
  end

  # Returns `{candidate, state}` for the local candidate matching the mapped
  # address `xor_addr`. When we don't know such a candidate, a new prflx
  # candidate (based on the local candidate of `conn_check_pair`) is created
  # and prepended to the local candidates.
  defp get_or_create_local_cand(xor_addr, conn_check_pair, state) do
    case find_cand(state.local_cands, xor_addr.address, xor_addr.port) do
      nil ->
        # prflx candidate sec 7.2.5.3.1
        # TODO calculate correct prio and foundation
        base = conn_check_pair.local_cand

        prflx_cand =
          Candidate.new(
            :prflx,
            xor_addr.address,
            xor_addr.port,
            base.base_address,
            base.base_port,
            base.socket
          )

        Logger.debug("Adding new local prflx candidate: #{inspect(prflx_cand)}")
        {prflx_cand, %{state | local_cands: [prflx_cand | state.local_cands]}}

      known_cand ->
        {known_cand, state}
    end
  end

  # Returns `{candidate, state}` for the remote candidate with the given
  # address. When we don't know such a candidate, a new prflx candidate
  # is created and prepended to the remote candidates.
  defp get_or_create_remote_cand(src_ip, src_port, _prio_attr, state) do
    known_cand = find_cand(state.remote_cands, src_ip, src_port)

    case known_cand do
      %Candidate{} ->
        {known_cand, state}

      nil ->
        # TODO calculate correct prio using prio_attr
        prflx_cand = Candidate.new(:prflx, src_ip, src_port, nil, nil, nil)
        Logger.debug("Adding new remote prflx candidate: #{inspect(prflx_cand)}")
        {prflx_cand, %{state | remote_cands: [prflx_cand | state.remote_cands]}}
    end
  end

  # Starts nomination when the time is right for it,
  # otherwise returns the state untouched.
  defp maybe_nominate(state) do
    cond do
      time_to_nominate?(state) ->
        Logger.debug("Time to nominate a pair! Looking for a best valid pair...")
        try_nominate(state)

      true ->
        state
    end
  end

  # Tells whether the agent should start nominating a pair now.

  # nothing to nominate once the connection is completed
  defp time_to_nominate?(%{state: :completed}), do: false

  defp time_to_nominate?(state) do
    {in_nomination?, _pair_id} = state.nominating?

    # We can nominate when:
    # * no nomination is ongoing,
    # * there won't be further candidates (local gathering has completed
    #   and the remote side has signalled end-of-candidates),
    # * no checks are waiting or in-progress,
    # * we are the controlling agent.
    in_nomination? == false and
      state.gathering_state == :complete and
      state.eoc and
      Checklist.finished?(state.checklist) and
      state.role == :controlling
  end

  # Tries to nominate the pair picked by the checklist. The pair is flagged
  # for nomination and a new conn check is sent on the pair referenced
  # by its `succeeded_pair_id`. When there is no pair to nominate,
  # the connection state is changed to `:failed`.
  @doc false
  @spec try_nominate(map()) :: map()
  def try_nominate(state) do
    case Checklist.get_pair_for_nomination(state.checklist) do
      nil ->
        # TODO revisit this
        # should we check if state.state == :in_progress?
        Logger.debug("""
        No pairs for nomination. ICE failed. #{inspect(state.checklist, pretty: true)}
        """)

        change_connection_state(:failed, state)

      %CandidatePair{} = pair ->
        Logger.debug("Trying to nominate pair: #{inspect(pair.id)}")

        state =
          state
          |> put_in([:checklist, pair.id], %CandidatePair{pair | nominate?: true})
          |> Map.replace!(:nominating?, {true, pair.id})

        # the check is repeated on the pair the original check was sent on
        succeeded_pair = Map.fetch!(state.checklist, pair.succeeded_pair_id)
        check_pair = %CandidatePair{succeeded_pair | state: :waiting, nominate?: true}

        {sent_pair, state} = send_conn_check(check_pair, state)
        put_in(state, [:checklist, sent_pair.id], sent_pair)
    end
  end

  # Moves the gathering state forward, based on the gathering transactions:
  # * new -> gathering, when some transaction is waiting or in progress,
  # * gathering -> complete, when no transaction is waiting or in progress.
  # Every change is logged and announced via `on_gathering_state_change`.

  # complete is the final state
  defp update_gathering_state(%{gathering_state: :complete} = state), do: state

  defp update_gathering_state(state) do
    pending? =
      Enum.any?(state.gathering_transactions, fn {_id, %{state: t_state}} ->
        t_state in [:waiting, :in_progress]
      end)

    case {state.gathering_state, pending?} do
      {:new, true} ->
        Logger.debug("Gathering state change: new -> gathering")
        notify(state.on_gathering_state_change, {:gathering_state_change, :gathering})
        %{state | gathering_state: :gathering}

      {:gathering, false} ->
        Logger.debug("Gathering state change: gathering -> complete")
        notify(state.on_gathering_state_change, {:gathering_state_change, :complete})
        %{state | gathering_state: :complete}

      _no_change ->
        state
    end
  end

  # Performs an ICE restart.
  #
  # * remembers the currently valid pairs and the selected pair as the
  #   "previous" ones (unless there are no valid pairs, in which case
  #   the already remembered ones are kept),
  # * closes sockets of local candidates that don't belong to any valid pair,
  # * generates new local credentials,
  # * moves the connection state back (disconnected/failed -> checking,
  #   completed -> connected) and the gathering state to new,
  # * clears candidates, checklist, conn checks, gathering transactions
  #   and remote credentials.
  #
  # Returns the updated state.
  defp do_restart(state) do
    valid_pairs = for {_id, pair} <- state.checklist, pair.valid?, do: pair
    valid_sockets = Enum.map(valid_pairs, & &1.local_cand.socket)

    {prev_selected_pair, prev_valid_pairs} =
      case valid_pairs do
        [] ->
          {state.prev_selected_pair, state.prev_valid_pairs}

        _ ->
          # TODO cleanup prev pairs
          {state.selected_pair, valid_pairs}
      end

    # every socket is closed once, sockets of valid pairs are kept open
    state.local_cands
    |> Enum.uniq_by(& &1.socket)
    |> Enum.reject(&(&1.socket in valid_sockets))
    |> Enum.each(fn c ->
      Logger.debug("Closing local candidate's socket: #{inspect(c.base_address)}:#{c.base_port}")

      :ok = :gen_udp.close(c.socket)
    end)

    {ufrag, pwd} = generate_credentials()

    new_ice_state =
      case state.state do
        s when s in [:disconnected, :failed] -> :checking
        :completed -> :connected
        s -> s
      end

    state =
      if new_ice_state == state.state,
        do: state,
        else: change_connection_state(new_ice_state, state)

    Logger.debug("Gathering state change: #{state.gathering_state} -> new")
    notify(state.on_gathering_state_change, {:gathering_state_change, :new})

    restarted = %{
      state
      | state: new_ice_state,
        gathering_state: :new,
        gathering_transactions: %{},
        conn_checks: %{},
        checklist: %{},
        selected_pair: nil,
        prev_selected_pair: prev_selected_pair,
        prev_valid_pairs: prev_valid_pairs,
        local_cands: [],
        remote_cands: [],
        local_ufrag: ufrag,
        local_pwd: pwd,
        remote_ufrag: nil,
        remote_pwd: nil,
        eoc: false,
        nominating?: {false, nil}
    }

    update_ta_timer(restarted)
  end

  # Returns the first candidate with the given address and port
  # or `nil` when there is no such candidate.
  defp find_cand(cands, ip, port) do
    Enum.find(cands, &(&1.address == ip and &1.port == port))
  end

  # Returns the first host candidate using the given socket or `nil`
  # when there is none. Candidates of other types are never returned.
  defp find_host_cand(cands, socket) do
    Enum.find(cands, &(&1.socket == socket and &1.type == :host))
  end

  # Generates a random tiebreaker: 8 cryptographically strong random bytes
  # read as an unsigned 64-bit integer.
  defp generate_tiebreaker() do
    :binary.decode_unsigned(:crypto.strong_rand_bytes(8))
  end

  # Generates random local ICE credentials as `{ufrag, pwd}`.
  #
  # Both values are Base64 encoded without padding: the padding character "="
  # is not a valid ice-char (only ALPHA / DIGIT / "+" / "/" are allowed).
  # 3 random bytes give a 4-character ufrag and 16 random bytes (128 bits)
  # give a 22-character password - the minimum lengths allowed for both.
  defp generate_credentials() do
    ufrag = Base.encode64(:crypto.strong_rand_bytes(3), padding: false)
    pwd = Base.encode64(:crypto.strong_rand_bytes(16), padding: false)
    {ufrag, pwd}
  end

  # Authenticates `msg` with the given password (short-term credentials)
  # and checks its fingerprint.
  #
  # Returns `{:ok, key}` when both checks pass. Any `{:error, reason}`
  # from either check is reported as `{:error, :invalid_auth_attributes}`.
  defp authenticate_msg(msg, local_pwd) do
    result =
      with {:ok, _key} = authenticated <- Message.authenticate_st(msg, local_pwd),
           :ok <- Message.check_fingerprint(msg) do
        authenticated
      end

    case result do
      {:ok, _key} -> result
      {:error, _reason} -> {:error, :invalid_auth_attributes}
    end
  end

  # Logs the connection state transition, announces the new state via
  # `on_connection_state_change` and stores it in the agent's state.
  @doc false
  @spec change_connection_state(atom(), map()) :: map()
  def change_connection_state(new_conn_state, state) do
    Logger.debug("Connection state change: #{state.state} -> #{new_conn_state}")

    state.on_connection_state_change
    |> notify({:connection_state_change, new_conn_state})

    Map.replace!(state, :state, new_conn_state)
  end

  # Re-evaluates the connection state after the checklist or candidates
  # have changed. Each clause handles one current connection state and
  # returns the (possibly updated) agent state.

  # new -> checking, as soon as some pair is waiting or in progress
  defp update_connection_state(%{state: :new} = state) do
    if Checklist.waiting?(state.checklist) or Checklist.in_progress?(state.checklist) do
      change_connection_state(:checking, state)
    else
      state
    end
  end

  # checking -> connected, when there is a valid pair
  # checking -> failed, when there is no valid pair, all checks have finished
  # and no further local or remote candidates will arrive
  defp update_connection_state(%{state: :checking} = state) do
    cond do
      Checklist.get_valid_pair(state.checklist) != nil ->
        Logger.debug("Found a valid pair. Changing connection state to connected")
        change_connection_state(:connected, state)

      state.eoc == true and state.gathering_state == :complete and
          Checklist.finished?(state.checklist) ->
        Logger.debug("""
        Finished all conn checks, there won't be any further local or remote candidates
        and we don't have any valid or selected pair. Changing connection state to failed.
        """)

        change_connection_state(:failed, state)

      true ->
        state
    end
  end

  # connected -> failed or completed; the conditions are checked in order
  # and the first one that holds wins
  # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
  defp update_connection_state(%{state: :connected} = state) do
    cond do
      # no valid pair is left and nothing more can happen
      state.eoc == true and state.gathering_state == :complete and
        Checklist.get_valid_pair(state.checklist) == nil and
          Checklist.finished?(state.checklist) ->
        change_connection_state(:failed, state)

      # Assuming the controlling side uses regular nomination,
      # the controlled side could move to the completed
      # state as soon as it receives nomination request (or after
      # successful triggered check caused by nomination request).
      # However, to be compatible with the older RFC's aggressive
      # nomination, we wait for the end-of-candidates indication
      # and checklist to be finished.
      # This also means, that if the other side never sets eoc,
      # we will never move to the completed state.
      # This seems to be compliant with libwebrtc.
      state.role == :controlled and state.eoc == true and state.gathering_state == :complete and
        state.selected_pair != nil and Checklist.finished?(state.checklist) ->
        Logger.debug("""
        Finished all conn checks, there won't be any further local or remote candidates
        and we have selected pair. Changing connection state to completed.
        """)

        change_connection_state(:completed, state)

      # the controlling side completes as soon as it has a selected pair
      state.role == :controlling and state.selected_pair != nil ->
        change_connection_state(:completed, state)

      # the controlling side fails when the pair under nomination has failed
      state.role == :controlling and match?({true, _pair_id}, state.nominating?) and
          Map.fetch!(state.checklist, elem(state.nominating?, 1)).state == :failed ->
        {_, pair_id} = state.nominating?

        Logger.debug("""
        Pair we tried to nominate failed. Changing connection state to failed. \
        Pair id: #{pair_id}
        """)

        change_connection_state(:failed, state)

      true ->
        state
    end
  end

  # any other connection state is left untouched
  # TODO handle more states
  defp update_connection_state(state) do
    state
  end

  # Keeps the Ta timer in sync with the amount of work: starts it when
  # there is work to do and it is not running, stops it when there is
  # no work left and it is running. Otherwise the state is left untouched.
  defp update_ta_timer(state) do
    case {is_work_to_do(state), state.ta_timer} do
      {true, nil} ->
        Logger.debug("Starting Ta timer")
        enable_timer(state)

      {false, nil} ->
        # do nothing, timer already stopped
        state

      {true, _timer} ->
        # do nothing, timer already works
        state

      {false, _timer} ->
        Logger.debug("Stopping Ta timer")
        disable_timer(state)
    end
  end

  # Tells whether the Ta timer has anything to drive.

  # nothing is done in the completed and failed connection states
  defp is_work_to_do(%{state: conn_state}) when conn_state in [:completed, :failed], do: false

  # otherwise there is work as long as the checklist is not finished
  # or some gathering transaction is waiting or in progress
  defp is_work_to_do(state) do
    checks_pending? = not Checklist.finished?(state.checklist)

    checks_pending? or
      Enum.any?(state.gathering_transactions, fn {_id, %{state: t_state}} ->
        t_state in [:waiting, :in_progress]
      end)
  end

  # Starts the Ta timer: schedules an immediate `:ta_timeout` message
  # to the current process and stores the timer reference in the state.
  defp enable_timer(state) do
    %{state | ta_timer: Process.send_after(self(), :ta_timeout, 0)}
  end

  # Stops the Ta timer and clears it in the state.
  defp disable_timer(state) do
    _ = Process.cancel_timer(state.ta_timer)

    # the timer could have fired before it was cancelled -
    # drop a `:ta_timeout` that is already waiting in the mailbox
    receive do
      :ta_timeout -> :ok
    after
      0 -> :ok
    end

    %{state | ta_timer: nil}
  end

  # Sends a keepalive on `pair`: a binding indication protected with
  # fingerprint only, sent from the pair's local socket to its
  # remote candidate.
  defp send_keepalive(pair) do
    encoded =
      %Type{class: :indication, method: :binding}
      |> Message.new()
      |> Message.with_fingerprint()
      |> Message.encode()

    remote = pair.remote_cand
    do_send(pair.local_cand.socket, {remote.address, remote.port}, encoded)
  end

  # Sends a connectivity check (binding request) on `pair`.
  #
  # Returns `{pair, state}` where the pair is moved to `:in_progress` and
  # the state remembers the check (pair id and send time) under the
  # transaction id of the request.
  @doc false
  @spec send_conn_check(CandidatePair.t(), map()) :: {CandidatePair.t(), map()}
  def send_conn_check(pair, state) do
    role_attr =
      case state.role do
        :controlling -> %ICEControlling{tiebreaker: state.tiebreaker}
        _controlled -> %ICEControlled{tiebreaker: state.tiebreaker}
      end

    base_attrs = [
      %Username{value: "#{state.remote_ufrag}:#{state.local_ufrag}"},
      # priority sent to the other side has to be
      # computed with the candidate type preference of
      # peer-reflexive; refer to sec 7.1.1
      %Priority{priority: Candidate.priority(:prflx)},
      role_attr
    ]

    # we can nominate only when being the controlling agent
    # the controlled agent uses nominate? flag according to 7.3.1.5
    attrs =
      if pair.nominate? and state.role == :controlling,
        do: base_attrs ++ [%UseCandidate{}],
        else: base_attrs

    req =
      %Type{class: :request, method: :binding}
      |> Message.new(attrs)
      |> Message.with_integrity(state.remote_pwd)
      |> Message.with_fingerprint()

    remote = pair.remote_cand
    do_send(pair.local_cand.socket, {remote.address, remote.port}, Message.encode(req))

    in_progress_pair = %CandidatePair{pair | state: :in_progress}
    conn_check = %{pair_id: pair.id, send_time: System.monotonic_time(:millisecond)}

    {in_progress_pair, put_in(state, [:conn_checks, req.transaction_id], conn_check)}
  end

  # Sends `data` to `dst` over the UDP `socket`.
  #
  # A failed send is retried once. This is best-effort: errors are only
  # logged and `:ok` is returned regardless of the outcome.
  defp do_send(socket, dst, data) do
    # FIXME that's a workaround for EPERM
    # retrying after getting EPERM seems to help
    case :gen_udp.send(socket, dst, data) do
      :ok ->
        :ok

      err ->
        Logger.error("UDP send error: #{inspect(err)}. Retrying...")

        case :gen_udp.send(socket, dst, data) do
          :ok ->
            Logger.debug("Successful retry")

          retry_err ->
            Logger.error("Unsuccessful retry: #{inspect(retry_err)}. Giving up.")
        end

        :ok
    end
  end

  # Sends `{:ex_ice, self(), msg}` to `dst`.
  # A `nil` destination means nobody is to be notified.
  defp notify(dst, _msg) when is_nil(dst), do: :ok

  defp notify(dst, msg) do
    send(dst, {:ex_ice, self(), msg})
  end
end