defmodule PgFlow.Telemetry do
  @moduledoc """
  Telemetry events emitted by PgFlow.

  PgFlow uses `:telemetry` to emit events at key points in the workflow lifecycle.
  These events can be used for monitoring, logging, and metrics collection.

  ## Events

  ### Worker Lifecycle

  - `[:pgflow, :worker, :start]` — Worker process started
  - `[:pgflow, :worker, :stop]` — Worker process stopped

  ### Poll Cycles

  - `[:pgflow, :worker, :poll, :start]` — Poll cycle started
  - `[:pgflow, :worker, :poll, :stop]` — Poll cycle completed

  ### Task Execution

  - `[:pgflow, :worker, :task, :start]` — Task execution started
  - `[:pgflow, :worker, :task, :stop]` — Task execution completed successfully
  - `[:pgflow, :worker, :task, :exception]` — Task execution failed

  ### Run Lifecycle

  - `[:pgflow, :run, :started]` — Flow run created (emitted by `PgFlow.Client`)
  - `[:pgflow, :run, :completed]` — Flow run completed (emitted by worker after task cascades)
  - `[:pgflow, :run, :failed]` — Flow run failed (emitted by worker after task cascades)

  ## Attaching Handlers

      :telemetry.attach_many(
        "my-handler",
        [
          [:pgflow, :worker, :task, :stop],
          [:pgflow, :run, :completed],
          [:pgflow, :run, :failed]
        ],
        &MyModule.handle_event/4,
        nil
      )

  ## Default Logger

  PgFlow includes a default logger handler that can be attached by setting
  `attach_default_logger: true` in the configuration.

  Note: The default logger is disabled by default since `PgFlow.Logger` provides
  structured logging directly in the worker. Enable this if you need telemetry-based
  logging for specific use cases like metrics collection or external log aggregation.

  The default logger reads `:flow_slug` from the metadata of worker lifecycle events,
  `:flow_slug` and `:run_id` (plus an optional `:error`) from the metadata of run
  events, and an optional `:duration` (in native time units) from the measurements.
  Events whose metadata lacks the required keys are ignored.
  """

  require Logger

  alias PgFlow.Logger, as: PgLogger

  # Single source of truth for the handler id, so attach and detach cannot drift apart.
  @default_handler_id "pgflow-default-logger"

  # Every event the default logger subscribes to.
  @default_logger_events [
    [:pgflow, :worker, :start],
    [:pgflow, :worker, :stop],
    [:pgflow, :worker, :poll, :start],
    [:pgflow, :worker, :poll, :stop],
    [:pgflow, :worker, :task, :start],
    [:pgflow, :worker, :task, :stop],
    [:pgflow, :worker, :task, :exception],
    [:pgflow, :run, :started],
    [:pgflow, :run, :completed],
    [:pgflow, :run, :failed]
  ]

  @doc """
  Attaches the default telemetry handlers for logging.

  This is called automatically on application start if `attach_default_logger: true`
  is set in the configuration.

  Returns `:ok`, or `{:error, :already_exists}` if the default logger is already attached.
  """
  @spec attach_default_logger() :: :ok | {:error, :already_exists}
  def attach_default_logger do
    :telemetry.attach_many(
      @default_handler_id,
      @default_logger_events,
      &__MODULE__.handle_event/4,
      %{}
    )
  end

  @doc """
  Detaches the default telemetry handlers.

  Returns `:ok`, or `{:error, :not_found}` if the default logger is not attached.
  """
  @spec detach_default_logger() :: :ok | {:error, :not_found}
  def detach_default_logger do
    :telemetry.detach(@default_handler_id)
  end

  @doc false
  @spec handle_event([atom()], map(), map(), term()) :: term()
  # The metadata keys each clause needs are matched in the function head. An event
  # that lacks them falls through to the catch-all instead of raising, which matters
  # because :telemetry removes a handler whose function raises.

  # Worker lifecycle events - minimal logging since Worker.Server handles startup banner
  def handle_event([:pgflow, :worker, :start], _measurements, %{flow_slug: flow_slug}, _config) do
    Logger.debug("[Telemetry] Worker started for flow #{flow_slug}")
  end

  def handle_event([:pgflow, :worker, :stop], measurements, %{flow_slug: flow_slug}, _config) do
    elapsed_ms = duration_ms(measurements)

    Logger.debug("[Telemetry] Worker stopped for flow #{flow_slug} after #{elapsed_ms}ms")
  end

  # Poll and task events - no-op since Worker.Server handles structured polling and
  # task logging
  def handle_event([:pgflow, :worker, scope, _phase], _measurements, _metadata, _config)
      when scope in [:poll, :task] do
    :ok
  end

  # Run lifecycle events - use PgFlow.Logger for consistency
  def handle_event(
        [:pgflow, :run, :started],
        _measurements,
        %{flow_slug: flow_slug, run_id: run_id},
        _config
      ) do
    PgLogger.run_started(flow_slug, run_id)
  end

  def handle_event(
        [:pgflow, :run, :completed],
        measurements,
        %{flow_slug: flow_slug, run_id: run_id},
        _config
      ) do
    PgLogger.run_completed(flow_slug, run_id, duration_ms(measurements))
  end

  def handle_event(
        [:pgflow, :run, :failed],
        measurements,
        %{flow_slug: flow_slug, run_id: run_id} = metadata,
        _config
      ) do
    # :error is optional in the metadata; fall back to a generic message.
    error = metadata[:error] || "Unknown error"

    PgLogger.run_failed(flow_slug, run_id, duration_ms(measurements), error)
  end

  # Catch-all: events this logger does not handle, and known events whose metadata
  # lacks the keys matched above.
  def handle_event(_event, _measurements, _metadata, _config) do
    :ok
  end

  # Converts the optional :duration measurement from native time units to
  # milliseconds, treating a missing duration as 0.
  defp duration_ms(measurements) do
    System.convert_time_unit(measurements[:duration] || 0, :native, :millisecond)
  end
end