defmodule OpentelemetryProcessPropagator.Task.Supervisor do
@moduledoc """
`OpentelemetryProcessPropagator.Task.Supervisor` provides a set of extensions
to the `Task.Supervisor` module to reduce some boilerplate in propagating OpenTelemetry
contexts across process boundaries. Since these are extensions rather
than a replacement of Elixir's module, this library can be aliased
into a file without concern for creating spans where you do not want them.
Each `Task.Supervisor` function is replicated with two variants: `*_with_span`
and `*_with_linked_span`. Each of these variations has a specific use case.
The original implementation for each function automatically propagates the
current context.
* `*` - propagates the current context
* `*_with_span` - propagates the current context and starts a new child span.
* `*_with_linked_span` - starts a new span linked to the caller's current span; only the caller's span context is passed to the task (the full context is not attached).
> #### Module Redefinition {: .info}
>
> This module does not redefine the `Task.Supervisor` module; it provides a wrapper around it,
> so this functionality will not globally modify the default behavior of the `Task.Supervisor` module.
"""
alias OpentelemetryProcessPropagator.Task.Wrapper
require OpenTelemetry.Tracer
# Child specs are built by `Task.Supervisor` itself; this module adds nothing
# to supervision, so the call is forwarded untouched.
@doc false
def child_spec(opts), do: Task.Supervisor.child_spec(opts)
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.

The context is read in the calling process and attached inside the task
before `fun` is invoked. `options` are forwarded unchanged to
`Task.Supervisor.async/3`.

See `Task.Supervisor.async/3` for more information.
"""
@spec async(Supervisor.supervisor(), (() -> any())) :: Task.t()
@spec async(Supervisor.supervisor(), (() -> any()), keyword()) :: Task.t()
def async(supervisor, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  task_fun = fn ->
    # The captured context has to be attached in the task itself before
    # user code runs.
    OpenTelemetry.Ctx.attach(parent_ctx)
    fun.()
  end

  Task.Supervisor.async(supervisor, task_fun, options)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.

The context is read in the calling process. The task is started through the
internal `Wrapper` module's `with_ctx` function, which is given that context
together with `{module, function_name, args}`. `options` are forwarded
unchanged to `Task.Supervisor.async/5`.

See `Task.Supervisor.async/5` for more information.
"""
@spec async(Supervisor.supervisor(), module(), atom(), [term()]) :: Task.t()
@spec async(Supervisor.supervisor(), module(), atom(), [term()], keyword()) :: Task.t()
def async(supervisor, module, function_name, args, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [parent_ctx, {module, function_name, args}]

  Task.Supervisor.async(supervisor, Wrapper, :with_ctx, wrapper_args, options)
end
@doc """
Starts a task with a new child span that can be awaited on.

The caller's current context is read in the calling process and attached
inside the task; a span named `name` is then started with `start_opts`
around the call to `fun`. `options` are forwarded unchanged to
`Task.Supervisor.async/3`.

See `Task.Supervisor.async/3` for more information.
"""
@spec async_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any())
      ) :: Task.t()
@spec async_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: Task.t()
def async_with_span(name, start_opts, supervisor, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  traced_fun = fn ->
    # Attach first so the span below is started under the caller's context.
    OpenTelemetry.Ctx.attach(parent_ctx)

    OpenTelemetry.Tracer.with_span name, start_opts do
      fun.()
    end
  end

  Task.Supervisor.async(supervisor, traced_fun, options)
end
@doc """
Starts a task with a new child span that can be awaited on.

The caller's current context is read in the calling process. The task is
started through the internal `Wrapper` module's `with_span` function, which
is given `name`, `start_opts`, that context and
`{module, function_name, args}`. `options` are forwarded unchanged to
`Task.Supervisor.async/5`.

See `Task.Supervisor.async/5` for more information.
"""
@spec async_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: Task.t()
@spec async_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Task.t()
def async_with_span(name, start_opts, supervisor, module, function_name, args, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [name, start_opts, parent_ctx, {module, function_name, args}]

  Task.Supervisor.async(supervisor, Wrapper, :with_span, wrapper_args, options)
end
@doc """
Starts a task with a new linked span that can be awaited on.

The caller's current span context is read in the calling process. Inside the
task a span named `name` is started around `fun`, with a link to that span
context. The caller's context is not attached in the task.

Any `:links` entry already present in `start_opts` is replaced by the single
link to the caller's span. `options` are forwarded unchanged to
`Task.Supervisor.async/3`.

See `Task.Supervisor.async/3` for more information.
"""
@spec async_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any())
      ) :: Task.t()
@spec async_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: Task.t()
def async_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()

  linked_fun = fn ->
    # Map.put/3 overwrites a caller-supplied `:links` value.
    span_opts = Map.put(start_opts, :links, [OpenTelemetry.link(caller_span_ctx)])

    OpenTelemetry.Tracer.with_span name, span_opts do
      fun.()
    end
  end

  Task.Supervisor.async(supervisor, linked_fun, options)
end
@doc """
Starts a task with a new linked span that can be awaited on.

The caller's current span context is read in the calling process. The task is
started through the internal `Wrapper` module's `with_linked_span` function,
which is given `name`, `start_opts`, that span context and
`{module, function_name, args}`. Only the span context, not the full context,
is handed to the task. `options` are forwarded unchanged to
`Task.Supervisor.async/5`.

See `Task.Supervisor.async/5` for more information.
"""
@spec async_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: Task.t()
@spec async_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Task.t()
def async_with_linked_span(
      name,
      start_opts,
      supervisor,
      module,
      function_name,
      args,
      options \\ []
    ) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
  wrapper_args = [name, start_opts, caller_span_ctx, {module, function_name, args}]

  Task.Supervisor.async(supervisor, Wrapper, :with_linked_span, wrapper_args, options)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.

The context is read in the calling process and attached inside the task
before `fun` is invoked. `options` are forwarded unchanged to
`Task.Supervisor.async_nolink/3`.

See `Task.Supervisor.async_nolink/3` for more information.
"""
@spec async_nolink(Supervisor.supervisor(), (() -> any())) :: Task.t()
@spec async_nolink(Supervisor.supervisor(), (() -> any()), keyword()) :: Task.t()
def async_nolink(supervisor, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  task_fun = fn ->
    # The captured context has to be attached in the task itself before
    # user code runs.
    OpenTelemetry.Ctx.attach(parent_ctx)
    fun.()
  end

  Task.Supervisor.async_nolink(supervisor, task_fun, options)
end
@doc """
Starts a task with the current `t:OpenTelemetry.Ctx.t/0` that can be awaited on.

The context is read in the calling process. The task is started through the
internal `Wrapper` module's `with_ctx` function, which is given that context
together with `{module, function_name, args}`. `options` are forwarded
unchanged to `Task.Supervisor.async_nolink/5`.

See `Task.Supervisor.async_nolink/5` for more information.
"""
@spec async_nolink(Supervisor.supervisor(), module(), atom(), [term()]) :: Task.t()
@spec async_nolink(Supervisor.supervisor(), module(), atom(), [term()], keyword()) ::
        Task.t()
def async_nolink(supervisor, module, function_name, args, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [parent_ctx, {module, function_name, args}]

  Task.Supervisor.async_nolink(supervisor, Wrapper, :with_ctx, wrapper_args, options)
end
@doc """
Starts a task with a new child span that can be awaited on.

The caller's current context is read in the calling process and attached
inside the task; a span named `name` is then started with `start_opts`
around the call to `fun`. `options` are forwarded unchanged to
`Task.Supervisor.async_nolink/3`.

See `Task.Supervisor.async_nolink/3` for more information.
"""
@spec async_nolink_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any())
      ) :: Task.t()
@spec async_nolink_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: Task.t()
def async_nolink_with_span(name, start_opts, supervisor, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  traced_fun = fn ->
    # Attach first so the span below is started under the caller's context.
    OpenTelemetry.Ctx.attach(parent_ctx)

    OpenTelemetry.Tracer.with_span name, start_opts do
      fun.()
    end
  end

  Task.Supervisor.async_nolink(supervisor, traced_fun, options)
end
@doc """
Starts a task with a new child span that can be awaited on.

The caller's current context is read in the calling process. The task is
started through the internal `Wrapper` module's `with_span` function, which
is given `name`, `start_opts`, that context and
`{module, function_name, args}`. `options` are forwarded unchanged to
`Task.Supervisor.async_nolink/5`.

See `Task.Supervisor.async_nolink/5` for more information.
"""
@spec async_nolink_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: Task.t()
@spec async_nolink_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Task.t()
def async_nolink_with_span(
      name,
      start_opts,
      supervisor,
      module,
      function_name,
      args,
      options \\ []
    ) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [name, start_opts, parent_ctx, {module, function_name, args}]

  Task.Supervisor.async_nolink(supervisor, Wrapper, :with_span, wrapper_args, options)
end
@doc """
Starts a task with a new linked span that can be awaited on.

The caller's current span context is read in the calling process. Inside the
task a span named `name` is started around `fun`, with a link to that span
context. The caller's context is not attached in the task.

Any `:links` entry already present in `start_opts` is replaced by the single
link to the caller's span. `options` are forwarded unchanged to
`Task.Supervisor.async_nolink/3`.

See `Task.Supervisor.async_nolink/3` for more information.
"""
@spec async_nolink_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any())
      ) :: Task.t()
@spec async_nolink_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: Task.t()
def async_nolink_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()

  linked_fun = fn ->
    # Map.put/3 overwrites a caller-supplied `:links` value.
    span_opts = Map.put(start_opts, :links, [OpenTelemetry.link(caller_span_ctx)])

    OpenTelemetry.Tracer.with_span name, span_opts do
      fun.()
    end
  end

  Task.Supervisor.async_nolink(supervisor, linked_fun, options)
end
@doc """
Starts a task with a new linked span that can be awaited on.

The caller's current span context is read in the calling process. The task is
started through the internal `Wrapper` module's `with_linked_span` function,
which is given `name`, `start_opts`, that span context and
`{module, function_name, args}`. Only the span context, not the full context,
is handed to the task. `options` are forwarded unchanged to
`Task.Supervisor.async_nolink/5`.

See `Task.Supervisor.async_nolink/5` for more information.
"""
@spec async_nolink_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: Task.t()
@spec async_nolink_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Task.t()
def async_nolink_with_linked_span(
      name,
      start_opts,
      supervisor,
      module,
      function_name,
      args,
      options \\ []
    ) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
  wrapper_args = [name, start_opts, caller_span_ctx, {module, function_name, args}]

  Task.Supervisor.async_nolink(supervisor, Wrapper, :with_linked_span, wrapper_args, options)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0`
attached.

The context is captured once, when this function is called, rather than when
the returned stream is enumerated. `options` are forwarded unchanged to
`Task.Supervisor.async_stream/4`.

See `Task.Supervisor.async_stream/4` for more information.
"""
@spec async_stream(
        Supervisor.supervisor(),
        Enumerable.t(),
        (term() -> term())
      ) :: Enumerable.t()
@spec async_stream(
        Supervisor.supervisor(),
        Enumerable.t(),
        (term() -> term()),
        keyword()
      ) :: Enumerable.t()
def async_stream(supervisor, enumerable, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  element_fun = fn element ->
    # Attach the captured context before handing the element to `fun`.
    OpenTelemetry.Ctx.attach(parent_ctx)
    fun.(element)
  end

  Task.Supervisor.async_stream(supervisor, enumerable, element_fun, options)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with the
current `t:OpenTelemetry.Ctx.t/0` attached.

The context is captured once, when this function is called. Tasks are started
through the internal `Wrapper` module's `with_ctx` function, configured with
that context and `{module, function_name, args}`. `options` are forwarded
unchanged to `Task.Supervisor.async_stream/6`.

See `Task.Supervisor.async_stream/6` for more information.
"""
@spec async_stream(
        Supervisor.supervisor(),
        Enumerable.t(),
        module(),
        atom(),
        [term()]
      ) :: Enumerable.t()
@spec async_stream(
        Supervisor.supervisor(),
        Enumerable.t(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Enumerable.t()
def async_stream(supervisor, enumerable, module, function_name, args, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [parent_ctx, {module, function_name, args}]

  Task.Supervisor.async_stream(supervisor, enumerable, Wrapper, :with_ctx, wrapper_args, options)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new child span.

The caller's context is captured once, when this function is called. For
every element it is attached and a span named `name` is started with
`start_opts` around the call to `fun`. `options` are forwarded unchanged to
`Task.Supervisor.async_stream/4`.

See `Task.Supervisor.async_stream/4` for more information.
"""
@spec async_stream_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term())
      ) :: Enumerable.t()
@spec async_stream_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term()),
        keyword()
      ) :: Enumerable.t()
def async_stream_with_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  traced_fun = fn element ->
    # Attach first so the span below is started under the caller's context.
    OpenTelemetry.Ctx.attach(parent_ctx)

    OpenTelemetry.Tracer.with_span name, start_opts do
      fun.(element)
    end
  end

  Task.Supervisor.async_stream(supervisor, enumerable, traced_fun, options)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new child span.

The caller's context is captured once, when this function is called. Tasks
are started through the internal `Wrapper` module's `with_span` function,
configured with `name`, `start_opts`, that context and
`{module, function_name, args}`. `options` are forwarded unchanged to
`Task.Supervisor.async_stream/6`.

See `Task.Supervisor.async_stream/6` for more information.
"""
@spec async_stream_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()]
      ) :: Enumerable.t()
@spec async_stream_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Enumerable.t()
def async_stream_with_span(
      supervisor,
      enumerable,
      name,
      start_opts,
      module,
      function_name,
      args,
      options \\ []
    ) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [name, start_opts, parent_ctx, {module, function_name, args}]

  Task.Supervisor.async_stream(supervisor, enumerable, Wrapper, :with_span, wrapper_args, options)
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new linked span.

The caller's current span context is captured once, when this function is
called. For every element a span named `name` is started around `fun`, with
a link to that span context. The caller's context is not attached.

Any `:links` entry already present in `start_opts` is replaced by the single
link to the caller's span. `options` are forwarded unchanged to
`Task.Supervisor.async_stream/4`.

See `Task.Supervisor.async_stream/4` for more information.
"""
@spec async_stream_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term())
      ) :: Enumerable.t()
@spec async_stream_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term()),
        keyword()
      ) :: Enumerable.t()
def async_stream_with_linked_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()

  linked_fun = fn element ->
    # Map.put/3 overwrites a caller-supplied `:links` value.
    span_opts = Map.put(start_opts, :links, [OpenTelemetry.link(caller_span_ctx)])

    OpenTelemetry.Tracer.with_span name, span_opts do
      fun.(element)
    end
  end

  Task.Supervisor.async_stream(supervisor, enumerable, linked_fun, options)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new linked span.

The caller's current span context is captured once, when this function is
called. Tasks are started through the internal `Wrapper` module's
`with_linked_span` function, configured with `name`, `start_opts`, that span
context and `{module, function_name, args}`. Only the span context, not the
full context, is handed to the tasks. `options` are forwarded unchanged to
`Task.Supervisor.async_stream/6`.

See `Task.Supervisor.async_stream/6` for more information.
"""
@spec async_stream_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()]
      ) :: Enumerable.t()
@spec async_stream_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Enumerable.t()
def async_stream_with_linked_span(
      supervisor,
      enumerable,
      name,
      start_opts,
      module,
      function_name,
      args,
      options \\ []
    ) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
  wrapper_args = [name, start_opts, caller_span_ctx, {module, function_name, args}]

  Task.Supervisor.async_stream(
    supervisor,
    enumerable,
    Wrapper,
    :with_linked_span,
    wrapper_args,
    options
  )
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with the current `t:OpenTelemetry.Ctx.t/0`
attached.

The context is captured once, when this function is called, rather than when
the returned stream is enumerated. `options` are forwarded unchanged to
`Task.Supervisor.async_stream_nolink/4`.

See `Task.Supervisor.async_stream_nolink/4` for more information.
"""
@spec async_stream_nolink(
        Supervisor.supervisor(),
        Enumerable.t(),
        (term() -> term())
      ) :: Enumerable.t()
@spec async_stream_nolink(
        Supervisor.supervisor(),
        Enumerable.t(),
        (term() -> term()),
        keyword()
      ) :: Enumerable.t()
def async_stream_nolink(supervisor, enumerable, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  element_fun = fn element ->
    # Attach the captured context before handing the element to `fun`.
    OpenTelemetry.Ctx.attach(parent_ctx)
    fun.(element)
  end

  Task.Supervisor.async_stream_nolink(supervisor, enumerable, element_fun, options)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with the
current `t:OpenTelemetry.Ctx.t/0` attached.

The context is captured once, when this function is called. Tasks are started
through the internal `Wrapper` module's `with_ctx` function, configured with
that context and `{module, function_name, args}`. `options` are forwarded
unchanged to `Task.Supervisor.async_stream_nolink/6`.

See `Task.Supervisor.async_stream_nolink/6` for more information.
"""
@spec async_stream_nolink(
        Supervisor.supervisor(),
        Enumerable.t(),
        module(),
        atom(),
        [term()]
      ) :: Enumerable.t()
@spec async_stream_nolink(
        Supervisor.supervisor(),
        Enumerable.t(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Enumerable.t()
def async_stream_nolink(supervisor, enumerable, module, function_name, args, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [parent_ctx, {module, function_name, args}]

  Task.Supervisor.async_stream_nolink(
    supervisor,
    enumerable,
    Wrapper,
    :with_ctx,
    wrapper_args,
    options
  )
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new child span.

The caller's context is captured once, when this function is called. For
every element it is attached and a span named `name` is started with
`start_opts` around the call to `fun`. `options` are forwarded unchanged to
`Task.Supervisor.async_stream_nolink/4`.

See `Task.Supervisor.async_stream_nolink/4` for more information.
"""
@spec async_stream_nolink_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term())
      ) :: Enumerable.t()
@spec async_stream_nolink_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term()),
        keyword()
      ) :: Enumerable.t()
def async_stream_nolink_with_span(supervisor, enumerable, name, start_opts, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  traced_fun = fn element ->
    # Attach first so the span below is started under the caller's context.
    OpenTelemetry.Ctx.attach(parent_ctx)

    OpenTelemetry.Tracer.with_span name, start_opts do
      fun.(element)
    end
  end

  Task.Supervisor.async_stream_nolink(supervisor, enumerable, traced_fun, options)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new child span.

The caller's context is captured once, when this function is called. Tasks
are started through the internal `Wrapper` module's `with_span` function,
configured with `name`, `start_opts`, that context and
`{module, function_name, args}`. `options` are forwarded unchanged to
`Task.Supervisor.async_stream_nolink/6`.

See `Task.Supervisor.async_stream_nolink/6` for more information.
"""
@spec async_stream_nolink_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()]
      ) :: Enumerable.t()
@spec async_stream_nolink_with_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Enumerable.t()
def async_stream_nolink_with_span(
      supervisor,
      enumerable,
      name,
      start_opts,
      module,
      function_name,
      args,
      options \\ []
    ) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [name, start_opts, parent_ctx, {module, function_name, args}]

  Task.Supervisor.async_stream_nolink(
    supervisor,
    enumerable,
    Wrapper,
    :with_span,
    wrapper_args,
    options
  )
end
@doc """
Returns a stream that runs the given function `fun` concurrently
on each element in `enumerable` with a new linked span.

The caller's current span context is captured once, when this function is
called. For every element a span named `name` is started around `fun`, with
a link to that span context. The caller's context is not attached.

Any `:links` entry already present in `start_opts` is replaced by the single
link to the caller's span. `options` are forwarded unchanged to
`Task.Supervisor.async_stream_nolink/4`.

See `Task.Supervisor.async_stream_nolink/4` for more information.
"""
@spec async_stream_nolink_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term())
      ) :: Enumerable.t()
@spec async_stream_nolink_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        (term() -> term()),
        keyword()
      ) :: Enumerable.t()
def async_stream_nolink_with_linked_span(
      supervisor,
      enumerable,
      name,
      start_opts,
      fun,
      options \\ []
    ) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()

  linked_fun = fn element ->
    # Map.put/3 overwrites a caller-supplied `:links` value.
    span_opts = Map.put(start_opts, :links, [OpenTelemetry.link(caller_span_ctx)])

    OpenTelemetry.Tracer.with_span name, span_opts do
      fun.(element)
    end
  end

  Task.Supervisor.async_stream_nolink(supervisor, enumerable, linked_fun, options)
end
@doc """
Returns a stream where the given function (`module` and `function`)
is mapped concurrently on each element in `enumerable` with a new linked span.

The caller's current span context is captured once, when this function is
called. Tasks are started through the internal `Wrapper` module's
`with_linked_span` function, configured with `name`, `start_opts`, that span
context and `{module, function_name, args}`. Only the span context, not the
full context, is handed to the tasks. `options` are forwarded unchanged to
`Task.Supervisor.async_stream_nolink/6`.

See `Task.Supervisor.async_stream_nolink/6` for more information.
"""
@spec async_stream_nolink_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()]
      ) :: Enumerable.t()
@spec async_stream_nolink_with_linked_span(
        Supervisor.supervisor(),
        Enumerable.t(),
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: Enumerable.t()
def async_stream_nolink_with_linked_span(
      supervisor,
      enumerable,
      name,
      start_opts,
      module,
      function_name,
      args,
      options \\ []
    ) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
  wrapper_args = [name, start_opts, caller_span_ctx, {module, function_name, args}]

  Task.Supervisor.async_stream_nolink(
    supervisor,
    enumerable,
    Wrapper,
    :with_linked_span,
    wrapper_args,
    options
  )
end
@doc """
Starts a task as a child of the given `supervisor` with the
current `t:OpenTelemetry.Ctx.t/0`.

The context is read in the calling process and attached inside the task
before `fun` is invoked. `options` are forwarded unchanged to
`Task.Supervisor.start_child/3`.

See `Task.Supervisor.start_child/3` for more information.
"""
@spec start_child(
        Supervisor.supervisor(),
        (() -> any())
      ) :: DynamicSupervisor.on_start_child()
@spec start_child(
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: DynamicSupervisor.on_start_child()
def start_child(supervisor, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  task_fun = fn ->
    # The captured context has to be attached in the task itself before
    # user code runs.
    OpenTelemetry.Ctx.attach(parent_ctx)
    fun.()
  end

  Task.Supervisor.start_child(supervisor, task_fun, options)
end
@doc """
Starts a task as a child of the given `supervisor` with the
current `t:OpenTelemetry.Ctx.t/0`.

The context is read in the calling process. The task is started through the
internal `Wrapper` module's `with_ctx` function, which is given that context
together with `{module, function_name, args}`. `options` are forwarded
unchanged to `Task.Supervisor.start_child/5`.

See `Task.Supervisor.start_child/5` for more information.
"""
@spec start_child(
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: DynamicSupervisor.on_start_child()
@spec start_child(
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: DynamicSupervisor.on_start_child()
def start_child(supervisor, module, function_name, args, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [parent_ctx, {module, function_name, args}]

  Task.Supervisor.start_child(supervisor, Wrapper, :with_ctx, wrapper_args, options)
end
@doc """
Starts a task as a child of the given `supervisor` in a new child span.

The caller's current context is read in the calling process and attached
inside the task; a span named `name` is then started with `start_opts`
around the call to `fun`. `options` are forwarded unchanged to
`Task.Supervisor.start_child/3`.

See `Task.Supervisor.start_child/3` for more information.
"""
@spec start_child_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any())
      ) :: DynamicSupervisor.on_start_child()
@spec start_child_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: DynamicSupervisor.on_start_child()
def start_child_with_span(name, start_opts, supervisor, fun, options \\ []) do
  parent_ctx = OpenTelemetry.Ctx.get_current()

  traced_fun = fn ->
    # Attach first so the span below is started under the caller's context.
    OpenTelemetry.Ctx.attach(parent_ctx)

    OpenTelemetry.Tracer.with_span name, start_opts do
      fun.()
    end
  end

  Task.Supervisor.start_child(supervisor, traced_fun, options)
end
@doc """
Starts a task as a child of the given `supervisor` in a new child span.

The caller's current context is read in the calling process. The task is
started through the internal `Wrapper` module's `with_span` function, which
is given `name`, `start_opts`, that context and
`{module, function_name, args}`. `options` are forwarded unchanged to
`Task.Supervisor.start_child/5`.

See `Task.Supervisor.start_child/5` for more information.
"""
@spec start_child_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: DynamicSupervisor.on_start_child()
@spec start_child_with_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: DynamicSupervisor.on_start_child()
def start_child_with_span(
      name,
      start_opts,
      supervisor,
      module,
      function_name,
      args,
      options \\ []
    ) do
  parent_ctx = OpenTelemetry.Ctx.get_current()
  wrapper_args = [name, start_opts, parent_ctx, {module, function_name, args}]

  Task.Supervisor.start_child(supervisor, Wrapper, :with_span, wrapper_args, options)
end
@doc """
Starts a task as a child of the given `supervisor` in a new linked span.

The caller's current span context is read in the calling process. Inside the
task a span named `name` is started around `fun`, with a link to that span
context. The caller's context is not attached in the task.

Any `:links` entry already present in `start_opts` is replaced by the single
link to the caller's span. `options` are forwarded unchanged to
`Task.Supervisor.start_child/3`.

See `Task.Supervisor.start_child/3` for more information.
"""
@spec start_child_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any())
      ) :: DynamicSupervisor.on_start_child()
@spec start_child_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        (() -> any()),
        keyword()
      ) :: DynamicSupervisor.on_start_child()
def start_child_with_linked_span(name, start_opts, supervisor, fun, options \\ []) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()

  linked_fun = fn ->
    # Map.put/3 overwrites a caller-supplied `:links` value.
    span_opts = Map.put(start_opts, :links, [OpenTelemetry.link(caller_span_ctx)])

    OpenTelemetry.Tracer.with_span name, span_opts do
      fun.()
    end
  end

  Task.Supervisor.start_child(supervisor, linked_fun, options)
end
@doc """
Starts a task as a child of the given `supervisor` in a new linked span.

The caller's current span context is read in the calling process. The task is
started through the internal `Wrapper` module's `with_linked_span` function,
which is given `name`, `start_opts`, that span context and
`{module, function_name, args}`. Only the span context, not the full context,
is handed to the task. `options` are forwarded unchanged to
`Task.Supervisor.start_child/5`.

See `Task.Supervisor.start_child/5` for more information.
"""
@spec start_child_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()]
      ) :: DynamicSupervisor.on_start_child()
@spec start_child_with_linked_span(
        OpenTelemetry.span_name(),
        OpenTelemetry.Span.start_opts(),
        Supervisor.supervisor(),
        module(),
        atom(),
        [term()],
        keyword()
      ) :: DynamicSupervisor.on_start_child()
def start_child_with_linked_span(
      name,
      start_opts,
      supervisor,
      module,
      function_name,
      args,
      options \\ []
    ) do
  caller_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
  wrapper_args = [name, start_opts, caller_span_ctx, {module, function_name, args}]

  Task.Supervisor.start_child(supervisor, Wrapper, :with_linked_span, wrapper_args, options)
end
@doc """
Delegates to `Task.Supervisor.children/1`.
"""
@spec children(Supervisor.supervisor()) :: [pid()]
defdelegate children(supervisor), to: Task.Supervisor

@doc """
Starts a task supervisor with default options.

Delegates to `Task.Supervisor.start_link/0`.
"""
@spec start_link() :: Supervisor.on_start()
defdelegate start_link(), to: Task.Supervisor

@doc """
Starts a task supervisor with the given `options`.

`options` are handed unchanged to `Task.Supervisor.start_link/1`, so
supervisor options such as `:name` can be supplied through this module.
"""
@spec start_link(keyword()) :: Supervisor.on_start()
defdelegate start_link(options), to: Task.Supervisor

@doc """
Delegates to `Task.Supervisor.terminate_child/2`.
"""
@spec terminate_child(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}
defdelegate terminate_child(supervisor, pid), to: Task.Supervisor
end