defmodule Membrane.ICE.Endpoint do
@moduledoc """
Endpoint used for establishing ICE connection, sending and receiving messages.
### Architecture and pad semantic
Both input and output pads are dynamic ones.
One instance of ICE Endpoint is responsible for handling only one ICE stream with only one component.
### Linking using output pad
To receive messages after establishing ICE connection you have to link ICE Endpoint to your element
via `Pad.ref(:output, 1)`. `1` is an id of component from which your element will receive messages - because
there will be always at most one component, id of it will be equal `1`.
**Important**: you can link to ICE Endpoint using its output pad in any moment you want but if you don't
want to miss any messages do it before playing your pipeline.
### Linking using input pad
To send messages after establishing ICE connection you have to link to ICE Endpoint via
`Pad.ref(:input, 1)`. `1` is an id of component which will be used to send
messages via net. To send data from multiple elements via the same component you have to
use [membrane_funnel_plugin](https://github.com/membraneframework/membrane_funnel_plugin).
### Notifications API
ICE Endpoint handles the following notifications:
- `:gather_candidates`
- `{:set_remote_credentials, credentials}` - credentials are string in form of "ufrag passwd"
- `:peer_candidate_gathering_done`
ICE Endpoint sends the following notifications:
- `{:new_candidate_full, candidate}`
Triggered by: `:gather_candidates`
- `{:udp_integrated_turn, udp_integrated_turn}`
- `{:handshake_init_data, component_id, handshake_init_data}`
- `{:connection_ready, stream_id, component_id}`
- `{:component_state_failed, stream_id, component_id}`
### Sending and receiving messages
To send or receive messages just link to ICE Endpoint using relevant pads.
As soon as connection is established your element will receive demands and incoming messages.
### Establishing a connection
#### Gathering ICE candidates
Data about integrated TURN servers set up by `Membrane.ICE.Endpoint`, passed to the parent via notification, should be
forwarded to the second peer, that will try to establish ICE connection with `Membrane.ICE.Endpoint`. The second peer
should have at least one allocation, in any of running integrated TURN servers (Firefox or Chrome will probably
have one allocation per TURN Server)
#### Performing ICE connectivity checks, selecting candidates pair
All ICE candidates from the second peer, that are not relay candidates corresponded to allocations on integrated TURN
servers, will be ignored. Every ICE connectivity check sent via integrated TURN server is captured, parsed, and
forwarded to ICE Endpoint in message `{:connectivity_check, attributes, allocation_pid}`. ICE Endpoint sends to
messages in form of `{:send_connectivity_check, attributes}` on `allocation_pid`, to send his connectivity checks
to the second peer. Role of ICE Endpoint can be ice-controlled, but cannot be ice-controlling. It is suggested, to use
`ice-lite` option in SDP message, but it is not necessary. ICE Endpoint supports both, aggressive and normal nomination.
After starting ICE or after every ICE restart, ICE Endpoint will pass all traffic and connectivity checks via
allocation, which corresponds to the last selected ICE candidates pair.
"""
use Membrane.Endpoint
require Membrane.Logger
require Membrane.OpenTelemetry
require Membrane.TelemetryMetrics
alias __MODULE__.Allocation
alias Membrane.Funnel
alias Membrane.ICE.{CandidatePortAssigner, Utils}
alias Membrane.RemoteStream
alias Membrane.SRTP
@component_id 1
@stream_id 1
@time_between_keepalives 1_000_000_000
@ice_restart_timeout 5_000
@payload_received_event [Membrane.ICE, :ice, :payload, :received]
@payload_sent_event [Membrane.ICE, :ice, :payload, :sent]
@request_received_event [Membrane.ICE, :stun, :request, :received]
@response_sent_event [Membrane.ICE, :stun, :response, :sent]
@indication_sent_event [Membrane.ICE, :stun, :indication, :sent]
@buffers_with_timestamps_sent [Membrane.ICE, :ice, :bufffer, :sent]
@buffers_processing_time [Membrane.ICE, :ice, :buffer, :processing_time]
@emitted_events [
@payload_received_event,
@payload_sent_event,
@request_received_event,
@response_sent_event,
@indication_sent_event,
@buffers_with_timestamps_sent,
@buffers_processing_time
]
@life_span_id "ice_endpoint.life_span"
@dtls_handshake_span_id "ice_endpoint.dtls_handshake"
@alloc_span_name "ice_endpoint.turn_allocation"
@typedoc """
Options defining the behavior of ICE.Endpoint in relation to integrated TURN servers.
- `:ip` - IP, where integrated TURN server will open its sockets
- `:mock_ip` - IP, that will be part of the allocation address contained in Allocation Succes
message. Because of the fact, that in integrated TURNS no data is relayed via allocation address,
there is no need to open socket there. There are some cases, where it is necessary, to tell
the browser, that we have opened allocation on different IP, that we have TURN listening on,
eg. we are using Docker container
- `:ports_range` - range, where integrated TURN server will try to open ports
- `:cert_file` - path to file with certificate and private key, used for estabilishing TLS connection
for TURN using TLS over TCP
"""
@type integrated_turn_options_t() :: [
ip: :inet.ip4_address() | nil,
mock_ip: :inet.ip4_address() | nil,
ports_range: {:inet.port_number(), :inet.port_number()} | nil,
cert_file: binary() | nil
]
def_options dtls?: [
spec: boolean(),
default: true,
description: "`true`, if using DTLS Handshake, `false` otherwise"
],
ice_lite?: [
spec: boolean(),
default: true,
description:
"`true`, when ice-lite option was send in SDP message, `false` otherwise"
],
handshake_opts: [
spec: keyword(),
default: [],
description:
"Options for `ExDTLS` module. They will be passed to `ExDTLS.start_link/1`"
],
integrated_turn_options: [
spec: [integrated_turn_options_t()],
description: "Integrated TURN Options"
],
telemetry_label: [
spec: Membrane.TelemetryMetrics.label(),
default: [],
description: "Label passed to Membrane.TelemetryMetrics functions"
],
trace_context: [
spec: :list | any(),
default: [],
description: "Trace context for otel propagation"
],
parent_span: [
spec: :opentelemetry.span_ctx() | nil,
default: nil,
description: "Parent span of #{@life_span_id}"
]
def_input_pad :input,
availability: :on_request,
accepted_format: _any,
mode: :pull,
demand_unit: :buffers
def_output_pad :output,
availability: :on_request,
accepted_format: %RemoteStream{content_format: nil, type: :packetized},
mode: :push
defmodule Allocation do
@enforce_keys [:pid]
# field `:in_nominated_pair` says, whenether or not, specific allocation
# is a browser ICE candidate, that belongs to nominated ICE candidates pair
defstruct @enforce_keys ++ [magic: nil, in_nominated_pair: false]
end
@impl true
def handle_init(_context, options) do
%__MODULE__{
integrated_turn_options: integrated_turn_options,
dtls?: dtls?,
handshake_opts: hsk_opts,
telemetry_label: telemetry_label,
trace_context: trace_context,
parent_span: parent_span
} = options
if trace_context != [], do: Membrane.OpenTelemetry.attach(trace_context)
start_span_opts = if parent_span, do: [parent_span: parent_span], else: []
Membrane.OpenTelemetry.start_span(@life_span_id, start_span_opts)
for event_name <- @emitted_events do
Membrane.TelemetryMetrics.register(event_name, telemetry_label)
end
state = %{
id: to_string(Enum.map(1..10, fn _i -> Enum.random(?a..?z) end)),
turn_allocs: %{},
integrated_turn_options: integrated_turn_options,
fake_candidate_ip: integrated_turn_options[:mock_ip] || integrated_turn_options[:ip],
selected_alloc: nil,
dtls?: dtls?,
hsk_opts: hsk_opts,
telemetry_label: telemetry_label,
component_connected?: false,
cached_hsk_packets: nil,
component_ready?: false,
pending_connection_ready?: false,
connection_status_sent?: false,
sdp_offer_arrived?: false,
ice_restart_timer: nil,
first_dtls_hsk_packet_arrived: false
}
{[], state}
end
@impl true
def handle_playing(ctx, %{dtls?: true} = state) do
case CandidatePortAssigner.assign_candidate_port() do
{:ok, candidate_port} ->
{:ok, dtls} = ExDTLS.start_link(state.hsk_opts)
{:ok, fingerprint} = ExDTLS.get_cert_fingerprint(dtls)
hsk_state = %{:dtls => dtls, :client_mode => state.hsk_opts[:client_mode]}
ice_ufrag = Utils.generate_ice_ufrag()
ice_pwd = Utils.generate_ice_pwd()
[udp_integrated_turn] =
Utils.start_integrated_turn_servers([:udp], state.integrated_turn_options,
parent: self()
)
Membrane.ResourceGuard.register(ctx.resource_guard, fn ->
Membrane.ICE.Utils.stop_integrated_turn(udp_integrated_turn)
end)
state =
Map.merge(state, %{
candidate_port: candidate_port,
udp_integrated_turn: udp_integrated_turn,
local_ice_pwd: ice_pwd,
handshake: %{state: hsk_state, status: :in_progress, keying_material_event: nil}
})
|> start_ice_restart_timer()
actions = [
stream_format: {Pad.ref(:output, @component_id), %RemoteStream{type: :packetized}},
start_timer: {:keepalive_timer, @time_between_keepalives},
notify_parent: {:udp_integrated_turn, udp_integrated_turn},
notify_parent: {:handshake_init_data, @component_id, fingerprint},
notify_parent: {:local_credentials, "#{ice_ufrag} #{ice_pwd}"}
]
{actions, state}
{:error, :no_free_candidate_port} = err ->
raise "ICE: No free candidate port available. #{inspect(err)}"
end
end
@impl true
def handle_playing(ctx, state) do
case CandidatePortAssigner.assign_candidate_port() do
{:ok, candidate_port} ->
ice_ufrag = Utils.generate_ice_ufrag()
ice_pwd = Utils.generate_ice_pwd()
[udp_integrated_turn] =
Utils.start_integrated_turn_servers([:udp], state.integrated_turn_options,
parent: self()
)
Membrane.ResourceGuard.register(ctx.resource_guard, fn ->
Membrane.ICE.Utils.stop_integrated_turn(udp_integrated_turn)
end)
state =
Map.merge(state, %{
candidate_port: candidate_port,
udp_integrated_turn: udp_integrated_turn,
local_ice_pwd: ice_pwd
})
|> start_ice_restart_timer()
actions = [
notify_parent: {:udp_integrated_turn, udp_integrated_turn},
notify_parent: {:handshake_init_data, @component_id, nil},
notify_parent: {:local_credentials, "#{ice_ufrag} #{ice_pwd}"}
]
{actions, state}
{:error, :no_free_candidate_port} = err ->
raise "ICE: No free candidate port available. #{inspect(err)}"
end
end
@impl true
def handle_pad_added(Pad.ref(:input, @component_id), ctx, state) do
actions = maybe_send_demands_actions(ctx, state)
{actions, state}
end
@impl true
def handle_pad_added(
Pad.ref(:output, @component_id) = pad,
ctx,
%{dtls?: true, handshake: %{status: :finished}} = state
) do
actions =
maybe_send_stream_format(ctx) ++ [event: {pad, state.handshake.keying_material_event}]
{actions, state}
end
@impl true
def handle_pad_added(Pad.ref(:output, @component_id), ctx, state) do
{maybe_send_stream_format(ctx), state}
end
@impl true
def handle_write(
Pad.ref(:input, @component_id) = pad,
%Membrane.Buffer{payload: payload, metadata: metadata},
_ctx,
%{selected_alloc: alloc} = state
)
when is_pid(alloc) do
send_ice_payload(alloc, payload, state.telemetry_label, Map.get(metadata, :timestamp))
{[demand: pad], state}
end
@impl true
def handle_event(
Pad.ref(:input, @component_id) = pad,
%Funnel.NewInputEvent{},
_ctx,
%{dtls?: true, handshake: %{status: :finished}} = state
) do
{[event: {pad, state.handshake.keying_material_event}], state}
end
@impl true
def handle_event(_pad, _event, _ctx, state), do: {[], state}
@impl true
def handle_tick(:keepalive_timer, _ctx, state) do
with %{selected_alloc: alloc_pid} when is_pid(alloc_pid) <- state,
%{^alloc_pid => %{magic: magic}} when magic != nil <- state.turn_allocs do
tr_id = Utils.generate_transaction_id()
Utils.send_binding_indication(alloc_pid, state.remote_ice_pwd, magic, tr_id)
Membrane.TelemetryMetrics.execute(@indication_sent_event, %{}, %{}, state.telemetry_label)
Membrane.Logger.debug(
"Sending Binding Indication with params: #{inspect(magic: magic, transaction_id: tr_id)}"
)
end
{[], state}
end
# TODO Use mocking turn server instead of this
@impl true
def handle_parent_notification(:test_get_pid, _ctx, state) do
msg = {:test_get_pid, self()}
{[notify_parent: msg], state}
end
@impl true
def handle_parent_notification(:gather_candidates, _ctx, state) do
msg = {
:new_candidate_full,
Utils.generate_fake_ice_candidate({state.fake_candidate_ip, state.candidate_port})
}
{[notify_parent: msg], state}
end
@impl true
def handle_parent_notification({:set_remote_credentials, credentials}, _ctx, state)
when state.pending_connection_ready? do
[_ice_ufrag, ice_pwd] = String.split(credentials)
state =
Map.merge(state, %{
remote_ice_pwd: ice_pwd,
sdp_offer_arrived?: true,
connection_status_sent?: true,
pending_connection_ready?: false
})
|> stop_ice_restart_timer()
Membrane.OpenTelemetry.add_event(@life_span_id, :component_ready)
actions = [notify_parent: {:connection_ready, @stream_id, @component_id}]
{actions, state}
end
@impl true
def handle_parent_notification({:set_remote_credentials, credentials}, _ctx, state) do
[_ice_ufrag, ice_pwd] = String.split(credentials)
state =
Map.merge(state, %{
remote_ice_pwd: ice_pwd,
sdp_offer_arrived?: true
})
{[], state}
end
@impl true
def handle_parent_notification(:restart_stream, _ctx, state) do
ice_ufrag = Utils.generate_ice_ufrag()
ice_pwd = Utils.generate_ice_pwd()
state =
Map.merge(state, %{
local_ice_pwd: ice_pwd,
connection_status_sent?: false,
sdp_offer_arrived?: false
})
|> start_ice_restart_timer()
Membrane.OpenTelemetry.add_event(@life_span_id, :restart_stream)
credentials = "#{ice_ufrag} #{ice_pwd}"
{[notify_parent: {:local_credentials, credentials}], state}
end
@impl true
def handle_parent_notification(:peer_candidate_gathering_done, _ctx, state) do
{[], state}
end
@impl true
def handle_info({:alloc_deleting, alloc_pid}, _ctx, state) do
Membrane.Logger.debug("Deleting allocation with pid #{inspect(alloc_pid)}")
{_alloc, state} = pop_in(state, [:turn_allocs, alloc_pid])
{[], state}
end
@impl true
def handle_info(
{:connectivity_check, attrs, alloc_pid},
ctx,
state
) do
state =
if Map.has_key?(state.turn_allocs, alloc_pid) do
state
else
Membrane.Logger.debug(
"First connectivity check arrived from allocation with pid #{inspect(alloc_pid)}"
)
Process.monitor(alloc_pid)
span_id = alloc_span_id(alloc_pid)
Membrane.OpenTelemetry.start_span(span_id,
name: @alloc_span_name,
parent_id: @life_span_id
)
Membrane.OpenTelemetry.set_attribute(span_id, :pid, inspect(alloc_pid))
put_in(state, [:turn_allocs, alloc_pid], %Allocation{pid: alloc_pid})
end
{state, actions} = do_handle_connectivity_check(Map.new(attrs), alloc_pid, ctx, state)
{actions, state}
end
@impl true
def handle_info({:DOWN, _ref, _process, alloc_pid, _reason}, _ctx, state) do
alloc_span_id(alloc_pid)
|> Membrane.OpenTelemetry.end_span()
{[], state}
end
@impl true
def handle_info({:ice_payload, payload, timestamp}, ctx, state) do
Membrane.TelemetryMetrics.execute(
@payload_received_event,
%{bytes: byte_size(payload)},
%{},
state.telemetry_label
)
if state.dtls? and Utils.is_dtls_hsk_packet(payload) do
state =
if state.first_dtls_hsk_packet_arrived do
state
else
Membrane.OpenTelemetry.start_span(@dtls_handshake_span_id, parent_id: @life_span_id)
%{state | first_dtls_hsk_packet_arrived: true}
end
ExDTLS.process(state.handshake.state.dtls, payload)
|> handle_process_result(ctx, state)
else
out_pad = Pad.ref(:output, @component_id)
actions =
cond do
not Map.has_key?(ctx.pads, out_pad) ->
Membrane.Logger.warn(
"No links for component: #{@component_id}. Ignoring incoming message."
)
[]
ctx.playback != :playing ->
Membrane.Logger.debug(
"Received message in playback state: #{ctx.playback}. Ignoring."
)
[]
true ->
[
buffer:
{out_pad, %Membrane.Buffer{payload: payload, metadata: %{timestamp: timestamp}}}
]
end
{actions, state}
end
end
@impl true
def handle_info({:retransmit, _dtls_pid, packets}, ctx, state) do
# Treat retransmitted packets in the same way as regular handshake_packets
handle_process_result({:handshake_packets, packets}, ctx, state)
end
@impl true
def handle_info(:ice_restart_timeout, _ctx, state) do
Membrane.Logger.debug("ICE restart failed due to timeout")
Membrane.OpenTelemetry.add_event(@life_span_id, :ice_restart_timeout)
state = %{state | connection_status_sent?: true, pending_connection_ready?: false}
actions = [notify_parent: {:connection_failed, @stream_id, @component_id}]
{actions, state}
end
@impl true
def handle_info(msg, _ctx, state) do
Membrane.Logger.warn("Received unknown message: #{inspect(msg)}")
{[], state}
end
defp do_handle_connectivity_check(%{class: :request} = attrs, alloc_pid, ctx, state) do
log_debug_connectivity_check(attrs)
Membrane.TelemetryMetrics.execute(@request_received_event, %{}, %{}, state.telemetry_label)
alloc_span_id(alloc_pid)
|> Membrane.OpenTelemetry.add_event(:binding_request_received,
allocation: inspect(alloc_pid),
use_candidate: attrs.use_candidate
)
alloc = state.turn_allocs[alloc_pid]
Utils.send_binding_success(
alloc_pid,
state.local_ice_pwd,
attrs.magic,
attrs.trid,
attrs.username
)
Membrane.TelemetryMetrics.execute(@response_sent_event, %{}, %{}, state.telemetry_label)
[magic: attrs.magic, transaction_id: attrs.trid, username: attrs.username]
|> then(&"Sending Binding Success with params: #{inspect(&1)}")
|> Membrane.Logger.debug()
alloc = %Allocation{alloc | magic: attrs.magic}
alloc =
if attrs.use_candidate,
do: %Allocation{alloc | in_nominated_pair: true},
else: alloc
state = put_in(state, [:turn_allocs, alloc_pid], alloc)
maybe_select_alloc(alloc, ctx, state)
end
defp do_handle_connectivity_check(attrs, _alloc_pid, _ctx, state) do
log_debug_connectivity_check(attrs)
{state, []}
end
defp log_debug_connectivity_check(attrs) do
request_type =
case attrs.class do
:response -> "Success"
:request -> "Request"
:error -> "Error"
end
Map.delete(attrs, :class)
|> Map.to_list()
|> then(&"Received Binding #{request_type} with params: #{inspect(&1)}")
|> Membrane.Logger.debug()
end
defp maybe_select_alloc(alloc, ctx, state) do
if alloc.in_nominated_pair and alloc.pid != state.selected_alloc do
select_alloc(alloc.pid, ctx, state)
else
{state, []}
end
end
defp select_alloc(alloc_pid, ctx, state) do
state = Map.put(state, :selected_alloc, alloc_pid)
Membrane.Logger.debug("Component #{@component_id} READY")
Membrane.OpenTelemetry.add_event(@life_span_id, :new_selected_allocation,
allocation: inspect(alloc_pid)
)
alloc_span_id(alloc_pid)
|> Membrane.OpenTelemetry.add_event(:allocation_selected)
state = %{state | component_connected?: true}
{state, actions} =
if state.dtls? == false or state.handshake.status == :finished do
maybe_send_connection_ready(state)
else
Membrane.Logger.debug("Checking for cached handshake packets")
if state.cached_hsk_packets == nil do
Membrane.Logger.debug("Nothing to be sent for component: #{@component_id}")
else
Membrane.Logger.debug(
"Sending cached handshake packets for component: #{@component_id}"
)
send_ice_payload(
state.selected_alloc,
state.cached_hsk_packets,
state.telemetry_label
)
end
with %{dtls?: true} <- state, %{dtls: dtls, client_mode: true} <- state.handshake.state do
{:ok, packets} = ExDTLS.do_handshake(dtls)
send_ice_payload(state.selected_alloc, packets, state.telemetry_label)
else
_state -> :ok
end
{state, actions} =
if state.handshake.status == :finished do
maybe_send_connection_ready(state)
else
{state, []}
end
{%{state | cached_hsk_packets: nil}, actions}
end
{state, demand_actions} = handle_component_state_ready(ctx, state)
actions = demand_actions ++ actions
{state, actions}
end
defp handle_process_result(:handshake_want_read, _ctx, state) do
{[], state}
end
defp handle_process_result({:ok, _packets}, _ctx, state) do
Membrane.Logger.warn("Got regular handshake packet. Ignoring for now.")
{[], state}
end
defp handle_process_result({:handshake_packets, packets}, _ctx, state) do
if state.component_connected? do
send_ice_payload(state.selected_alloc, packets, state.telemetry_label)
{[], state}
else
# if connection is not ready yet cache data
# TODO maybe try to send?
state = %{state | cached_hsk_packets: packets}
{[], state}
end
end
defp handle_process_result({:handshake_finished, hsk_data}, ctx, state),
do: handle_end_of_hsk(hsk_data, ctx, state)
defp handle_process_result({:handshake_finished, hsk_data, packets}, ctx, state) do
send_ice_payload(state.selected_alloc, packets, state.telemetry_label)
handle_end_of_hsk(hsk_data, ctx, state)
end
defp handle_process_result({:connection_closed, reason}, _ctx, state) do
Membrane.Logger.debug("Connection closed, reason: #{inspect(reason)}. Ignoring for now.")
{[], state}
end
defp handle_end_of_hsk(hsk_data, ctx, state) do
Membrane.OpenTelemetry.end_span(@dtls_handshake_span_id)
hsk_state = state.handshake.state
event = to_srtp_keying_material_event(hsk_data)
state =
Map.put(state, :handshake, %{
state: hsk_state,
status: :finished,
keying_material_event: event
})
{state, connection_ready_actions} = maybe_send_connection_ready(state)
actions =
connection_ready_actions ++
maybe_send_demands_actions(ctx, state) ++
maybe_send_keying_material_to_output(ctx, state)
{actions, state}
end
defp handle_component_state_ready(ctx, state) do
state = %{state | component_ready?: true}
actions = maybe_send_demands_actions(ctx, state)
{state, actions}
end
defp maybe_send_demands_actions(ctx, state) do
pad = Pad.ref(:input, @component_id)
# if something is linked, component is ready and handshake is done then send demands
if Map.has_key?(ctx.pads, pad) and state.component_ready? and
state.handshake.status == :finished do
event = if state.dtls?, do: [event: {pad, state.handshake.keying_material_event}], else: []
event ++ [demand: pad]
else
[]
end
end
defp maybe_send_keying_material_to_output(ctx, state) do
pad = Pad.ref(:output, @component_id)
if Map.has_key?(ctx.pads, pad),
do: [event: {pad, state.handshake.keying_material_event}],
else: []
end
defp maybe_send_stream_format(ctx) do
pad = Pad.ref(:output, @component_id)
if ctx.playback == :playing do
[stream_format: {pad, %RemoteStream{}}]
else
[]
end
end
defp start_ice_restart_timer(state) do
timer_ref = Process.send_after(self(), :ice_restart_timeout, @ice_restart_timeout)
%{state | ice_restart_timer: timer_ref}
end
defp stop_ice_restart_timer(%{ice_restart_timer: timer_ref} = state)
when is_reference(timer_ref) do
Process.cancel_timer(timer_ref)
state
end
defp stop_ice_restart_timer(state), do: state
defp maybe_send_connection_ready(
%{connection_status_sent?: false, sdp_offer_arrived?: true} = state
) do
state =
%{state | connection_status_sent?: true}
|> stop_ice_restart_timer()
Membrane.OpenTelemetry.add_event(@life_span_id, :component_state_ready)
actions = [notify_parent: {:connection_ready, @stream_id, @component_id}]
{state, actions}
end
defp maybe_send_connection_ready(
%{connection_status_sent?: false, sdp_offer_arrived?: false} = state
),
do: {%{state | pending_connection_ready?: true}, []}
defp maybe_send_connection_ready(state), do: {state, []}
defp to_srtp_keying_material_event(handshake_data) do
{local_keying_material, remote_keying_material, protection_profile} = handshake_data
%SRTP.KeyingMaterialEvent{
local_keying_material: local_keying_material,
remote_keying_material: remote_keying_material,
protection_profile: protection_profile
}
end
defp send_ice_payload(alloc_pid, payload, telemetry_label, timestamp \\ nil) do
Membrane.TelemetryMetrics.execute(
@payload_sent_event,
%{bytes: byte_size(payload)},
%{},
telemetry_label
)
if timestamp do
Membrane.TelemetryMetrics.execute(
@buffers_with_timestamps_sent,
%{},
%{},
telemetry_label
)
time =
(:erlang.monotonic_time() - timestamp) |> System.convert_time_unit(:native, :microsecond)
Membrane.TelemetryMetrics.execute(
@buffers_processing_time,
%{time: time},
%{},
telemetry_label
)
end
send(alloc_pid, {:send_ice_payload, payload})
end
# defp alloc_span_id(alloc_pid), do: "alloc_span:#{inspect(alloc_pid)}"
defp alloc_span_id(alloc_pid), do: {:turn_allocation_span, alloc_pid}
end