# credo:disable-for-this-file
defmodule Rulestead do
@moduledoc """
Root public module for the `rulestead` package.
Phase 3 keeps the store-facing APIs from Phase 2 and adds the pure evaluator
over an explicit in-memory authored flag payload:
- store-facing calls return `{:ok, value} | {:error, %Rulestead.Error{}}`
- bang variants raise the same `%Rulestead.Error{}`
- evaluation helpers consume an authored flag payload first and explicit
context second
"""
alias Rulestead.{
Admin.Authorizer,
Admin.Redaction,
ConfigError,
Context,
Error,
Evaluator,
Explainer,
Governance.ApprovalRequirement,
Governance.BlastRadiusThreshold,
Manifest,
Promotion.Apply,
Promotion.Compare,
Result,
Runtime,
Store,
StoreError,
Targeting.ImpactPreview,
Telemetry
}
alias Rulestead.Manifest.Plan
alias Rulestead.Manifest.Result, as: ManifestResult
alias Rulestead.Store.Command
@version Mix.Project.config()[:version] || "0.1.0"
@doc """
Returns the package version.
"""
@spec version() :: String.t()
def version, do: @version
@doc """
Tracks a custom analytics event.
"""
@spec track(Context.t() | map() | String.t(), String.t(), map()) :: :ok
defdelegate track(context_or_actor_id, event_name, metadata \\ %{}), to: Rulestead.Analytics
@doc """
Fetches the authored flag state for a `flag_key` and `environment_key`.
"""
@spec fetch_flag(String.t() | atom(), String.t() | atom(), keyword()) :: Store.result(map())
def fetch_flag(flag_key, environment_key, opts \\ []) do
flag_key
|> Command.FetchFlag.new(environment_key, opts)
|> fetch_flag()
end
@doc """
Fetches the authored flag state for a pre-built store command.
"""
@spec fetch_flag(Command.FetchFlag.t()) :: Store.result(map())
def fetch_flag(%Command.FetchFlag{} = command) do
run_store(:fetch_flag, [command], command)
end
@doc """
Compares authored source and target environment state for promotion preview flows.
"""
@spec compare_environments(String.t() | atom(), String.t() | atom(), keyword()) ::
Store.result(map())
def compare_environments(source_environment_key, target_environment_key, opts \\ []) do
source_environment_key
|> Command.CompareEnvironments.new(target_environment_key, opts)
|> compare_environments()
end
@doc """
Compares authored source and target environment state for a pre-built compare command.
"""
@spec compare_environments(Command.CompareEnvironments.t()) :: Store.result(map())
def compare_environments(%Command.CompareEnvironments{} = command) do
admin_read(:compare_environments, command)
end
@doc """
Applies a bounded direct promotion bundle through compare revalidation and the configured store.
"""
@spec apply_promotion(Command.ApplyPromotion.t()) :: Store.result(map())
def apply_promotion(%Command.ApplyPromotion{} = command) do
with :ok <- Apply.validate(command) do
admin_write(:apply_promotion, command)
end
end
@doc """
Builds and applies a direct promotion bundle from root-level attributes.
"""
@spec apply_promotion(map() | keyword(), keyword()) :: Store.result(map())
def apply_promotion(attrs, opts \\ []) when is_map(attrs) or is_list(attrs) do
attrs
|> Command.ApplyPromotion.new(opts)
|> apply_promotion()
end
@doc """
Exports a deterministic authored-state manifest for one environment.
"""
@spec export_manifest(String.t() | atom(), keyword()) :: {:ok, map()} | {:error, Error.t()}
def export_manifest(environment_key, opts \\ []) do
Manifest.export(environment_key, opts)
end
@doc """
Previews a manifest import as a saved apply plan artifact.
"""
@spec import_manifest(binary() | map(), keyword()) :: {:ok, map()} | {:error, Error.t()}
def import_manifest(content, opts \\ []) do
Manifest.Import.plan(content, opts)
end
@doc """
Applies a previously generated manifest import plan artifact.
"""
@spec apply_manifest_plan(binary() | map(), keyword()) :: {:ok, map()} | {:error, Error.t()}
def apply_manifest_plan(content, opts \\ []) do
Manifest.Import.apply(content, opts)
end
@doc """
Builds a saved promote plan artifact from a live compare preview.
"""
@spec plan_promotion(String.t() | atom(), String.t() | atom(), keyword()) ::
{:ok, map()} | {:error, Error.t()}
def plan_promotion(source_environment_key, target_environment_key, opts \\ []) do
compare_opts =
opts
|> Keyword.take([:flag_keys, :tenant_key])
|> Enum.reject(fn {_key, value} -> is_nil(value) end)
with {:ok, compare} <-
compare_environments(source_environment_key, target_environment_key, compare_opts) do
plan =
Plan.build_promote(%{
source_environment_key: compare.source_environment.key,
target_environment_key: compare.target_environment.key,
status: promotion_plan_status(compare),
tenant_key: compare.tenant_key,
compare_token: compare.compare_token,
source_fingerprint: compare.source_fingerprint,
target_fingerprint: compare.target_fingerprint,
dependency_closure_keys: compare.dependency_closure_keys,
proposed_target_bundle:
Map.new(compare.flags, fn flag ->
{flag.flag_key, flag.proposed_target_state}
end)
})
{:ok,
ManifestResult.new(%{
status: plan["status"],
command: "rulestead.promote.plan",
summary: %{
"source_environment_key" => plan["source_environment_key"],
"target_environment_key" => plan["target_environment_key"],
"flag_count" => length(plan["flag_keys"]),
"plan_token" => plan["plan_token"]
},
findings: promotion_findings(compare),
details: %{"plan" => plan}
})}
end
end
@doc """
Applies a previously generated promote plan artifact.
"""
@spec apply_promotion_plan(binary() | map(), keyword()) :: {:ok, map()} | {:error, Error.t()}
def apply_promotion_plan(content, opts \\ []) do
with {:ok, plan} <- Plan.load(content),
:ok <- validate_promotion_plan_mode(plan),
{:ok, reason} <- require_promotion_reason(opts),
:ok <- validate_target_tenant(plan, opts),
command <- promotion_apply_command(plan, reason, opts),
{:ok, envelope} <- dispatch_promotion_plan(plan, command, reason, opts) do
{:ok, envelope}
else
{:error, %Error{message: "promotion compare preview is stale"}} ->
stale_promotion_result(content)
{:error, %Error{message: "promotion target tenant drifted"}} ->
stale_promotion_result(content, "target tenant drifted")
{:error, %Error{} = error} ->
map_promotion_apply_error(content, error)
end
end
@doc """
Creates a flag through the configured store adapter.
"""
@spec create_flag(Command.CreateFlag.t()) :: Store.result(map())
def create_flag(%Command.CreateFlag{} = command) do
admin_write(:create_flag, command)
end
@doc """
Creates a flag from root-level attributes.
"""
@spec create_flag(map() | keyword(), keyword()) :: Store.result(map())
def create_flag(attrs, opts \\ []) when is_map(attrs) or is_list(attrs) do
attrs
|> Command.CreateFlag.new(opts)
|> create_flag()
end
@doc """
Updates flag metadata through the configured store adapter.
"""
@spec update_flag(Command.UpdateFlag.t()) :: Store.result(map())
def update_flag(%Command.UpdateFlag{} = command) do
admin_write(:update_flag, command)
end
@doc """
Updates a flag from root-level attributes.
"""
@spec update_flag(String.t() | atom(), map() | keyword(), keyword()) :: Store.result(map())
def update_flag(flag_key, attrs, opts \\ []) when is_map(attrs) or is_list(attrs) do
flag_key
|> Command.UpdateFlag.new(attrs, opts)
|> update_flag()
end
@doc """
Bang variant of `fetch_flag/3`.
"""
@spec fetch_flag!(String.t() | atom(), String.t() | atom(), keyword()) :: map()
def fetch_flag!(flag_key, environment_key, opts \\ []) do
flag_key
|> fetch_flag(environment_key, opts)
|> unwrap!()
end
@doc """
Saves a draft ruleset through the configured store adapter.
"""
@spec save_draft_ruleset(Command.SaveDraftRuleset.t()) :: Store.result(map())
def save_draft_ruleset(%Command.SaveDraftRuleset{} = command) do
admin_write(:save_draft_ruleset, command)
end
@doc """
Bang variant of `save_draft_ruleset/1`.
"""
@spec save_draft_ruleset!(Command.SaveDraftRuleset.t()) :: map()
def save_draft_ruleset!(%Command.SaveDraftRuleset{} = command) do
command
|> save_draft_ruleset()
|> unwrap!()
end
@doc """
Publishes a ruleset version through the configured store adapter.
"""
@spec publish_ruleset(Command.PublishRuleset.t()) :: Store.result(map())
def publish_ruleset(%Command.PublishRuleset{} = command) do
admin_write(:publish_ruleset, command)
end
@doc """
Bang variant of `publish_ruleset/1`.
"""
@spec publish_ruleset!(Command.PublishRuleset.t()) :: map()
def publish_ruleset!(%Command.PublishRuleset{} = command) do
command
|> publish_ruleset()
|> unwrap!()
end
@doc """
Archives a flag through the configured store adapter.
"""
@spec archive_flag(Command.ArchiveFlag.t()) :: Store.result(map())
def archive_flag(%Command.ArchiveFlag{} = command) do
admin_write(:archive_flag, command)
end
@doc """
Bang variant of `archive_flag/1`.
"""
@spec archive_flag!(Command.ArchiveFlag.t()) :: map()
def archive_flag!(%Command.ArchiveFlag{} = command) do
command
|> archive_flag()
|> unwrap!()
end
@doc """
Engages a per-flag per-environment kill switch.
"""
@spec engage_kill_switch(Command.EngageKillSwitch.t()) :: Store.result(map())
def engage_kill_switch(%Command.EngageKillSwitch{} = command) do
admin_write(:engage_kill_switch, command)
end
@spec engage_kill_switch(String.t() | atom(), String.t() | atom(), map(), keyword()) ::
Store.result(map())
def engage_kill_switch(flag_key, environment_key, actor, opts \\ []) do
flag_key
|> Command.EngageKillSwitch.new(environment_key,
actor: actor,
reason: Keyword.get(opts, :reason),
metadata: Keyword.get(opts, :metadata, %{})
)
|> engage_kill_switch()
end
@doc """
Releases a per-flag per-environment kill switch.
"""
@spec release_kill_switch(Command.ReleaseKillSwitch.t()) :: Store.result(map())
def release_kill_switch(%Command.ReleaseKillSwitch{} = command) do
admin_write(:release_kill_switch, command)
end
@spec release_kill_switch(String.t() | atom(), String.t() | atom(), map(), keyword()) ::
Store.result(map())
def release_kill_switch(flag_key, environment_key, actor, opts \\ []) do
flag_key
|> Command.ReleaseKillSwitch.new(environment_key,
actor: actor,
reason: Keyword.get(opts, :reason),
metadata: Keyword.get(opts, :metadata, %{})
)
|> release_kill_switch()
end
@doc """
Lists redacted audit events for one flag or all flags.
"""
@spec list_audit_events(Command.ListAuditEvents.t() | keyword()) ::
Store.result(Command.Page.t(map()))
def list_audit_events(command_or_opts \\ Command.ListAuditEvents.new())
def list_audit_events(%Command.ListAuditEvents{} = command) do
admin_read(:list_audit_events, command)
end
def list_audit_events(opts) when is_list(opts) do
opts
|> Command.ListAuditEvents.new()
|> list_audit_events()
end
@doc """
Writes a linked inverse action for a prior audit event.
"""
@spec rollback_audit_event(Command.RollbackAuditEvent.t()) :: Store.result(map())
def rollback_audit_event(%Command.RollbackAuditEvent{} = command) do
admin_write(:rollback_audit_event, command)
end
@spec rollback_audit_event(String.t(), keyword()) :: Store.result(map())
def rollback_audit_event(audit_event_id, opts \\ []) when is_binary(audit_event_id) do
audit_event_id
|> Command.RollbackAuditEvent.new(
actor: Keyword.get(opts, :actor),
reason: Keyword.get(opts, :reason),
metadata: Keyword.get(opts, :metadata, %{})
)
|> rollback_audit_event()
end
@doc """
Submits a governed change request through the configured store adapter.
"""
@spec submit_change_request(Command.SubmitChangeRequest.t()) :: Store.result(map())
def submit_change_request(%Command.SubmitChangeRequest{} = command) do
admin_write(:submit_change_request, command)
end
@doc """
Approves a governed change request through the configured store adapter.
"""
@spec approve_change_request(Command.ApproveChangeRequest.t()) :: Store.result(map())
def approve_change_request(%Command.ApproveChangeRequest{} = command) do
admin_write(:approve_change_request, command)
end
@doc """
Rejects a governed change request through the configured store adapter.
"""
@spec reject_change_request(Command.RejectChangeRequest.t()) :: Store.result(map())
def reject_change_request(%Command.RejectChangeRequest{} = command) do
admin_write(:reject_change_request, command)
end
@doc """
Cancels a governed change request through the configured store adapter.
"""
@spec cancel_change_request(Command.CancelChangeRequest.t()) :: Store.result(map())
def cancel_change_request(%Command.CancelChangeRequest{} = command) do
admin_write(:cancel_change_request, command)
end
@doc """
Executes an approved governed change request through the configured store adapter.
"""
@spec execute_change_request(Command.ExecuteChangeRequest.t()) :: Store.result(map())
def execute_change_request(%Command.ExecuteChangeRequest{} = command) do
admin_write(:execute_change_request, command)
end
@doc """
Schedules an approved governed change request through the configured store adapter.
"""
@spec schedule_change_request(Command.ScheduleChangeRequest.t()) :: Store.result(map())
def schedule_change_request(%Command.ScheduleChangeRequest{} = command) do
admin_write(:schedule_change_request, command)
end
@doc """
Fetches one change request through the configured store adapter.
"""
@spec fetch_change_request(Command.FetchChangeRequest.t()) :: Store.result(map())
def fetch_change_request(%Command.FetchChangeRequest{} = command) do
run_store(:fetch_change_request, [command], command)
end
@doc """
Lists change requests through the configured store adapter.
"""
@spec list_change_requests(Command.ListChangeRequests.t() | keyword()) ::
Store.result(Command.Page.t(map()))
def list_change_requests(command_or_opts \\ Command.ListChangeRequests.new())
def list_change_requests(%Command.ListChangeRequests{} = command) do
run_store(:list_change_requests, [command], command)
end
def list_change_requests(opts) when is_list(opts) do
opts
|> Command.ListChangeRequests.new()
|> list_change_requests()
end
@doc """
Schedules a narrowly allowed direct governed action through the configured store adapter.
"""
@spec schedule_governed_action(Command.ScheduleGovernedAction.t()) :: Store.result(map())
def schedule_governed_action(%Command.ScheduleGovernedAction{} = command) do
admin_write(:schedule_governed_action, command)
end
@doc """
Cancels a scheduled execution through the configured store adapter.
"""
@spec cancel_scheduled_execution(Command.CancelScheduledExecution.t()) :: Store.result(map())
def cancel_scheduled_execution(%Command.CancelScheduledExecution{} = command) do
admin_write(:cancel_scheduled_execution, command)
end
@doc """
Requeues a quarantined scheduled execution through the configured store adapter.
"""
@spec requeue_scheduled_execution(Command.RequeueScheduledExecution.t()) :: Store.result(map())
def requeue_scheduled_execution(%Command.RequeueScheduledExecution{} = command) do
admin_write(:requeue_scheduled_execution, command)
end
@doc """
Fetches one scheduled execution through the configured store adapter.
"""
@spec fetch_scheduled_execution(Command.FetchScheduledExecution.t()) :: Store.result(map())
def fetch_scheduled_execution(%Command.FetchScheduledExecution{} = command) do
run_store(:fetch_scheduled_execution, [command], command)
end
@doc """
Lists scheduled executions through the configured store adapter.
"""
@spec list_scheduled_executions(Command.ListScheduledExecutions.t() | keyword()) ::
Store.result(Command.Page.t(map()))
def list_scheduled_executions(command_or_opts \\ Command.ListScheduledExecutions.new())
def list_scheduled_executions(%Command.ListScheduledExecutions{} = command) do
run_store(:list_scheduled_executions, [command], command)
end
def list_scheduled_executions(opts) when is_list(opts) do
opts
|> Command.ListScheduledExecutions.new()
|> list_scheduled_executions()
end
@doc """
Records an inbound webhook receipt through the configured store adapter.
"""
@spec receive_inbound_webhook(Command.ReceiveInboundWebhook.t()) :: Store.result(map())
def receive_inbound_webhook(%Command.ReceiveInboundWebhook{} = command) do
result = run_store(:receive_inbound_webhook, [command], command)
case result do
{:ok, receipt} ->
event = if receipt.verified_state == :accepted, do: :received, else: :rejected
Telemetry.execute(
[:rulestead, :ops, :webhook, event],
%{count: 1},
Telemetry.webhook_metadata(receipt, %{reason: event})
)
{:error, _error} ->
:ok
end
result
end
@doc """
Fetches one webhook receipt through the configured store adapter.
"""
@spec fetch_webhook_record(String.t() | Command.FetchWebhookRecord.t(), keyword()) ::
Store.result(map())
def fetch_webhook_record(id_or_command, opts \\ [])
def fetch_webhook_record(%Command.FetchWebhookRecord{} = command, _opts) do
admin_read(:fetch_webhook_record, command)
end
def fetch_webhook_record(receipt_id, opts) when is_binary(receipt_id) do
receipt_id
|> Command.FetchWebhookRecord.new(opts)
|> fetch_webhook_record([])
end
@doc """
Lists webhook receipts through the configured store adapter.
"""
@spec list_webhook_records(Command.ListWebhookRecords.t() | keyword()) ::
Store.result(Command.Page.t(map()))
def list_webhook_records(command_or_opts \\ [])
def list_webhook_records(%Command.ListWebhookRecords{} = command) do
admin_read(:list_webhook_records, command)
end
def list_webhook_records(opts) when is_list(opts) do
opts
|> Command.ListWebhookRecords.new()
|> list_webhook_records()
end
@doc """
Creates a new webhook destination.
"""
@spec create_webhook_destination(Command.CreateWebhookDestination.t() | map() | keyword()) ::
Store.result(map())
def create_webhook_destination(%Command.CreateWebhookDestination{} = command) do
admin_write(:create_webhook_destination, command)
end
def create_webhook_destination(attrs) when is_map(attrs) or is_list(attrs) do
attrs
|> Command.CreateWebhookDestination.new()
|> create_webhook_destination()
end
@doc """
Updates an existing webhook destination.
"""
@spec update_webhook_destination(
Command.UpdateWebhookDestination.t()
| {String.t(), map() | keyword()}
) :: Store.result(map())
def update_webhook_destination(%Command.UpdateWebhookDestination{} = command) do
admin_write(:update_webhook_destination, command)
end
def update_webhook_destination(id, attrs)
when is_binary(id) and (is_map(attrs) or is_list(attrs)) do
id
|> Command.UpdateWebhookDestination.new(attrs)
|> update_webhook_destination()
end
@doc """
Fetches a single webhook destination by ID.
"""
@spec fetch_webhook_destination(Command.FetchWebhookDestination.t() | String.t(), keyword()) ::
Store.result(map())
def fetch_webhook_destination(id_or_command, opts \\ [])
def fetch_webhook_destination(%Command.FetchWebhookDestination{} = command, _opts) do
admin_read(:fetch_webhook_destination, command)
end
def fetch_webhook_destination(id, opts) when is_binary(id) do
id
|> Command.FetchWebhookDestination.new(opts)
|> fetch_webhook_destination([])
end
@doc """
Lists webhook destinations.
"""
@spec list_webhook_destinations(Command.ListWebhookDestinations.t() | keyword()) ::
Store.result(Command.Page.t(map()))
def list_webhook_destinations(command_or_opts \\ [])
def list_webhook_destinations(%Command.ListWebhookDestinations{} = command) do
admin_read(:list_webhook_destinations, command)
end
def list_webhook_destinations(opts) when is_list(opts) do
opts
|> Command.ListWebhookDestinations.new()
|> list_webhook_destinations()
end
@doc """
Lists webhook outbound deliveries.
"""
@spec list_webhook_deliveries(Command.ListWebhookDeliveries.t() | keyword()) ::
Store.result(Command.Page.t(map()))
def list_webhook_deliveries(command_or_opts \\ [])
def list_webhook_deliveries(%Command.ListWebhookDeliveries{} = command) do
admin_read(:list_webhook_deliveries, command)
end
def list_webhook_deliveries(opts) when is_list(opts) do
opts
|> Command.ListWebhookDeliveries.new()
|> list_webhook_deliveries()
end
@doc """
Retries a failed webhook delivery.
"""
@spec retry_webhook_delivery(Command.RetryWebhookDelivery.t() | String.t(), keyword()) ::
Store.result(map())
def retry_webhook_delivery(id_or_command, opts \\ [])
def retry_webhook_delivery(%Command.RetryWebhookDelivery{} = command, _opts) do
admin_write(:retry_webhook_delivery, command)
end
def retry_webhook_delivery(delivery_id, opts) when is_binary(delivery_id) do
delivery_id
|> Command.RetryWebhookDelivery.new(opts)
|> retry_webhook_delivery([])
end
@doc """
Normalizes a verified inbound webhook event into the local governance path.
"""
@spec execute_inbound_event(Rulestead.Webhooks.InboundEvent.t(), map()) :: Store.result(map())
def execute_inbound_event(%Rulestead.Webhooks.InboundEvent{} = event, receipt) do
actor = %{
id: "system:webhook:#{event.endpoint_key || event.provider}",
roles: [:operator, :prod_operator],
display: "Webhook Ingress (#{event.provider})"
}
metadata =
(event.metadata || %{})
|> Map.put("webhook_provider", event.provider)
|> Map.put("webhook_delivery_id", event.delivery_id)
|> Map.put("webhook_receipt_id", receipt.id)
|> Map.put("correlation_id", event.correlation_id)
|> Map.put("environment_key", event.payload["environment_key"])
# Convert inbound payload into a governance command
case inbound_to_command(event, actor, metadata) do
{:ok, command} ->
dispatch_governance_command(command)
{:error, error} ->
{:error, error}
end
end
defp inbound_to_command(event, actor, metadata) do
action = event.payload["action"]
resource_type = event.payload["resource_type"] || "flag"
resource_key = event.payload["resource_key"] || event.payload["flag_key"]
environment_key = event.payload["environment_key"]
case action do
"publish" ->
{:ok,
Command.PublishRuleset.new(resource_key, environment_key,
actor: actor,
version: event.payload["version"],
metadata: metadata
)}
"submit_change_request" ->
{:ok,
Command.SubmitChangeRequest.new(
%{
resource_type: stringify_resource(resource_type),
resource_key: resource_key,
environment_key: environment_key,
action: String.to_existing_atom(event.payload["command_operation"]),
command: event.payload["command_attrs"] || %{},
approval_requirement: %{
# Placeholder, will be resolved by Authorizer
action: String.to_existing_atom(event.payload["command_operation"]),
environment_key: environment_key,
required_approvals: 0,
change_request_required?: true,
self_approval_allowed?: false
}
},
actor: actor,
reason: event.payload["reason"],
metadata: metadata
)}
"engage_kill_switch" ->
{:ok,
Command.EngageKillSwitch.new(resource_key, environment_key,
actor: actor,
reason: event.payload["reason"],
metadata: metadata
)}
"release_kill_switch" ->
{:ok,
Command.ReleaseKillSwitch.new(resource_key, environment_key,
actor: actor,
reason: event.payload["reason"],
metadata: metadata
)}
"schedule_governed_action" ->
scheduled_for =
event.payload["scheduled_for"] ||
event.payload["received_at"] ||
event.received_at
execution_mode =
case event.payload["execution_mode"] do
nil -> :policy_bypass
mode when is_atom(mode) -> mode
mode when is_binary(mode) -> String.to_existing_atom(mode)
end
{:ok,
Command.ScheduleGovernedAction.new(
%{
action: String.to_existing_atom(event.payload["command_operation"]),
environment_key: environment_key,
resource_type: stringify_resource(resource_type),
resource_key: resource_key,
command: event.payload["command_attrs"] || %{},
scheduled_for: scheduled_for,
execution_mode: execution_mode
},
actor: actor,
reason: event.payload["reason"],
metadata: metadata
)}
# We can expand this list as needed
_ ->
{:error, StoreError.invalid_command("unsupported inbound webhook action: #{action}")}
end
end
defp dispatch_governance_command(%Command.PublishRuleset{} = command),
do: publish_ruleset(command)
defp dispatch_governance_command(%Command.SubmitChangeRequest{} = command),
do: submit_change_request(command)
defp dispatch_governance_command(%Command.EngageKillSwitch{} = command),
do: engage_kill_switch(command)
defp dispatch_governance_command(%Command.ReleaseKillSwitch{} = command),
do: release_kill_switch(command)
defp dispatch_governance_command(%Command.ScheduleGovernedAction{} = command),
do: schedule_governed_action(command)
@doc """
Resolves whether a governed action must go through a change request.
"""
@spec authorize_governed_action(term(), atom(), term(), String.t() | atom() | nil) ::
{:ok, Rulestead.Governance.ApprovalRequirement.t()}
| {:error, Rulestead.Error.t(), Authorizer.audit_payload()}
def authorize_governed_action(actor, action, resource, environment_key) do
Authorizer.authorize_governed_action(actor, action, resource, environment_key)
end
@doc """
Resolves whether an actor may approve a specific change request.
"""
@spec authorize_change_request_approval(
term(),
term(),
atom(),
term(),
String.t() | atom() | nil
) ::
{:ok, Rulestead.Governance.ApprovalRequirement.t()}
| {:error, Rulestead.Error.t(), Authorizer.audit_payload()}
def authorize_change_request_approval(actor, submitter, action, resource, environment_key) do
Authorizer.authorize_change_request_approval(
actor,
submitter,
action,
resource,
environment_key
)
end
@doc """
Lists flags through the configured store adapter.
Phase 2 keeps this as the shared list/search surface for store adapters.
"""
@spec list_flags() :: Store.result(Command.Page.t(map()))
def list_flags do
list_flags(Command.ListFlags.new())
end
@spec list_flags(keyword()) :: Store.result(Command.Page.t(map()))
def list_flags(opts) when is_list(opts) do
opts
|> Command.ListFlags.new()
|> list_flags()
end
@spec list_flags(Command.ListFlags.t()) :: Store.result(Command.Page.t(map()))
def list_flags(%Command.ListFlags{} = command) do
run_store(:list_flags, [command], command)
end
@doc """
Bang variant of `list_flags/0` and `list_flags/1`.
"""
@spec list_flags!() :: Command.Page.t(map())
@spec list_flags!(Command.ListFlags.t() | keyword()) :: Command.Page.t(map())
def list_flags!(command \\ Command.ListFlags.new()) do
command
|> list_flags()
|> unwrap!()
end
@doc """
Lists environments through the configured store adapter.
"""
@spec list_environments() :: Store.result([map()])
def list_environments do
list_environments(Command.ListEnvironments.new())
end
@spec list_environments(keyword()) :: Store.result([map()])
def list_environments(opts) when is_list(opts) do
opts
|> Command.ListEnvironments.new()
|> list_environments()
end
@spec list_environments(Command.ListEnvironments.t()) :: Store.result([map()])
def list_environments(%Command.ListEnvironments{} = command) do
run_store(:list_environments, [command], command)
end
@doc """
Lists reusable audiences through the configured store adapter.
"""
@spec list_audiences() :: Store.result([map()])
def list_audiences do
list_audiences(Command.ListAudiences.new())
end
@spec list_audiences(keyword()) :: Store.result([map()])
def list_audiences(opts) when is_list(opts) do
opts
|> Command.ListAudiences.new()
|> list_audiences()
end
@spec list_audiences(Command.ListAudiences.t()) :: Store.result([map()])
def list_audiences(%Command.ListAudiences{} = command) do
run_store(:list_audiences, [command], command)
end
@doc """
Lists audience dependency inventory rows with deterministic scoped ordering.
"""
@spec list_audience_dependencies() :: Store.result(map())
def list_audience_dependencies do
list_audience_dependencies(Command.ListAudienceDependencies.new())
end
@spec list_audience_dependencies(keyword()) :: Store.result(map())
def list_audience_dependencies(opts) when is_list(opts) do
opts
|> Command.ListAudienceDependencies.new()
|> list_audience_dependencies()
end
@spec list_audience_dependencies(Command.ListAudienceDependencies.t()) :: Store.result(map())
def list_audience_dependencies(%Command.ListAudienceDependencies{} = command) do
admin_read(:list_audience_dependencies, command)
end
@doc """
Previews the bounded impact of a reusable audience mutation.
"""
@spec preview_audience_impact(String.t() | atom(), String.t() | atom(), keyword()) ::
Store.result(map())
def preview_audience_impact(audience_key, operation, opts \\ []) do
audience_key
|> Command.PreviewAudienceImpact.new(operation, opts)
|> preview_audience_impact()
end
@spec preview_audience_impact(Command.PreviewAudienceImpact.t()) :: Store.result(map())
def preview_audience_impact(%Command.PreviewAudienceImpact{} = command) do
admin_read(:preview_audience_impact, command)
end
@doc """
Assesses audience mutation blast radius from preview payload inputs.
Returns a deterministic threshold verdict suitable for operator display and
change-request embedding. Does not perform I/O — supply preview data from
`preview_audience_impact/2`.
"""
@spec assess_audience_blast_radius(map(), keyword()) :: {:ok, map()} | {:error, Error.t()}
def assess_audience_blast_radius(preview_or_attrs, opts \\ []) do
BlastRadiusThreshold.assess(preview_or_attrs, opts)
end
@doc """
Applies a reusable audience mutation after validating fresh preview evidence.
Protected-environment threshold evaluation runs in the store pipeline via
`Rulestead.Governance.BlastRadiusThreshold.validate_protected_apply/3`.
Custom store adapters must call that helper to inherit the same contract.
"""
@spec apply_audience_mutation(Command.ApplyAudienceMutation.t() | map() | keyword(), keyword()) ::
Store.result(map())
def apply_audience_mutation(command_or_attrs, opts \\ [])
def apply_audience_mutation(%Command.ApplyAudienceMutation{} = command, _opts) do
with :ok <- validate_audience_mutation_confirmation(command) do
admin_write(:apply_audience_mutation, command)
end
end
def apply_audience_mutation(attrs, opts) when is_map(attrs) or is_list(attrs) do
attrs
|> Command.ApplyAudienceMutation.new(opts)
|> apply_audience_mutation()
end
@doc """
Records bounded evaluation freshness for one flag/environment pair.
"""
@spec record_evaluation(Command.RecordEvaluation.t()) :: Store.result(map())
def record_evaluation(%Command.RecordEvaluation{} = command) do
run_store(:record_evaluation, [command], command)
end
@doc """
Records bounded evaluation freshness using root-level arguments.
"""
@spec record_evaluation(String.t() | atom(), String.t() | atom(), DateTime.t()) ::
Store.result(map())
def record_evaluation(flag_key, environment_key, %DateTime{} = last_evaluated_at) do
flag_key
|> Command.RecordEvaluation.new(environment_key, last_evaluated_at)
|> record_evaluation()
end
@doc """
Publishes a bounded rollout stage advancement through the configured store adapter.
"""
@spec advance_rollout(Command.AdvanceRollout.t()) :: Store.result(map())
def advance_rollout(%Command.AdvanceRollout{} = command) do
admin_write(:advance_rollout, command)
end
@doc """
Builds and executes a bounded rollout stage advancement from root-level arguments.
"""
@spec advance_rollout(String.t() | atom(), String.t() | atom(), map() | keyword(), keyword()) ::
Store.result(map())
def advance_rollout(flag_key, environment_key, attrs, opts \\ [])
when is_map(attrs) or is_list(attrs) do
flag_key
|> Command.AdvanceRollout.new(environment_key, attrs, opts)
|> advance_rollout()
end
@doc """
Evaluates guardrail facts for an active rollout stage and records the resulting operational state.
"""
@spec evaluate_guarded_rollout(Command.EvaluateGuardedRollout.t()) :: Store.result(map())
def evaluate_guarded_rollout(%Command.EvaluateGuardedRollout{} = command) do
run_store(:evaluate_guarded_rollout, [command], command)
end
@doc """
Builds and executes a guarded rollout evaluation from root-level arguments.
"""
@spec evaluate_guarded_rollout(
String.t() | atom(),
String.t() | atom(),
map() | keyword(),
keyword()
) :: Store.result(map())
def evaluate_guarded_rollout(flag_key, environment_key, attrs, opts \\ [])
when is_map(attrs) or is_list(attrs) do
flag_key
|> Command.EvaluateGuardedRollout.new(environment_key, attrs, opts)
|> evaluate_guarded_rollout()
end
@doc """
Upserts opt-in auto-advance policy for one staged rollout rule.
Phase 61 persists the authored policy contract only; enabling policy does not
schedule observation ticks or advance rollout stages.
"""
@spec upsert_rollout_auto_advance_policy(Command.UpsertRolloutAutoAdvancePolicy.t()) ::
Store.result(map())
def upsert_rollout_auto_advance_policy(%Command.UpsertRolloutAutoAdvancePolicy{} = command) do
admin_write(:upsert_rollout_auto_advance_policy, command)
end
@doc """
Builds and upserts an auto-advance policy from root-level arguments.
"""
@spec upsert_rollout_auto_advance_policy(
String.t() | atom(),
String.t() | atom(),
map() | keyword(),
keyword()
) :: Store.result(map())
def upsert_rollout_auto_advance_policy(flag_key, environment_key, attrs, opts \\ [])
when is_map(attrs) or is_list(attrs) do
attrs = Map.new(attrs)
rule_key = Map.fetch!(attrs, :rule_key)
flag_key
|> Command.UpsertRolloutAutoAdvancePolicy.new(environment_key, rule_key, attrs, opts)
|> upsert_rollout_auto_advance_policy()
end
@doc """
Fetches the durable auto-advance policy for one staged rollout rule.
"""
@spec fetch_rollout_auto_advance_policy(Command.FetchRolloutAutoAdvancePolicy.t()) ::
Store.result(map())
def fetch_rollout_auto_advance_policy(%Command.FetchRolloutAutoAdvancePolicy{} = command) do
run_store(:fetch_rollout_auto_advance_policy, [command], command)
end
@doc """
Builds and executes an auto-advance policy fetch from root-level arguments.
"""
@spec fetch_rollout_auto_advance_policy(
String.t() | atom(),
String.t() | atom(),
String.t() | atom(),
keyword()
) :: Store.result(map())
def fetch_rollout_auto_advance_policy(flag_key, environment_key, rule_key, _opts \\ []) do
flag_key
|> Command.FetchRolloutAutoAdvancePolicy.new(environment_key, rule_key)
|> fetch_rollout_auto_advance_policy()
end
@doc """
Evaluates auto-advance eligibility for one staged rollout rule.
Evaluation-only in Phase 61: returns eligibility without mutating rollout
stage, guardrail decisions, or scheduling governed `advance_rollout`.
"""
@spec evaluate_rollout_auto_advance(Command.EvaluateRolloutAutoAdvance.t()) ::
Store.result(map())
def evaluate_rollout_auto_advance(%Command.EvaluateRolloutAutoAdvance{} = command) do
run_store(:evaluate_rollout_auto_advance, [command], command)
end
@doc """
Builds and executes an auto-advance eligibility evaluation from root-level arguments.
"""
@spec evaluate_rollout_auto_advance(
String.t() | atom(),
String.t() | atom(),
map() | keyword(),
keyword()
) :: Store.result(map())
def evaluate_rollout_auto_advance(flag_key, environment_key, attrs, opts \\ [])
when is_map(attrs) or is_list(attrs) do
attrs = Map.new(attrs)
rule_key = Map.fetch!(attrs, :rule_key)
flag_key
|> Command.EvaluateRolloutAutoAdvance.new(environment_key, rule_key, attrs, opts)
|> evaluate_rollout_auto_advance()
end
@doc """
Fetches the latest derived guardrail status for one rollout rule or stage.
"""
@spec fetch_guardrail_status(Command.FetchGuardrailStatus.t()) :: Store.result(map())
def fetch_guardrail_status(%Command.FetchGuardrailStatus{} = command) do
admin_read(:fetch_guardrail_status, command)
end
@doc """
Builds a guardrail-status query from root-level arguments.
"""
@spec fetch_guardrail_status(String.t() | atom(), String.t() | atom(), keyword()) ::
Store.result(map())
def fetch_guardrail_status(flag_key, environment_key, opts \\ []) do
flag_key
|> Command.FetchGuardrailStatus.new(environment_key, opts)
|> fetch_guardrail_status()
end
@doc """
Evaluates an authored in-memory flag payload against an explicit context.
"""
@spec evaluate(map(), Context.t() | keyword() | map(), keyword()) ::
{:ok, Result.t()} | {:error, Error.t()}
def evaluate(flag_payload, context, opts \\ []) do
context = normalize_eval_context(context, opts)
Telemetry.span(
[:rulestead, :eval, :decide],
Telemetry.metadata(Telemetry.base_metadata(flag_payload, context)),
fn ->
result =
with {:ok, result} <- Evaluator.evaluate(flag_payload, context) do
emit_warnings(result)
# Record telemetry evaluation for hygiene cleanup
if result.flag_key do
# We spawn to ensure non-blocking, although ETS is fast, write-behind should never block.
# Alternatively, since our ETS is public and fast, we just call it directly.
Rulestead.Telemetry.Cache.record_evaluation(
result.flag_key,
context.environment || "default",
result.variant || "unknown",
DateTime.utc_now()
)
end
{:ok, result}
end
{result, eval_stop_metadata(result, flag_payload, context)}
end
)
rescue
error ->
reraise(error, __STACKTRACE__)
end
@doc """
Bang variant of `evaluate/3`.
"""
@spec evaluate!(map(), Context.t() | keyword() | map(), keyword()) :: Result.t()
def evaluate!(flag_payload, context, opts \\ []) do
flag_payload
|> evaluate(context, opts)
|> unwrap!()
end
@doc """
Returns the boolean enabled projection for an authored flag payload.
"""
@spec enabled?(map(), Context.t() | keyword() | map()) :: {:ok, boolean()} | {:error, Error.t()}
def enabled?(flag_payload, context) do
with {:ok, %Result{} = result} <- evaluate(flag_payload, context) do
{:ok, result.enabled?}
end
end
@doc """
Returns the projected value for an authored flag payload.
"""
@spec get_value(map(), Context.t() | keyword() | map(), term()) ::
{:ok, term()} | {:error, Error.t()}
def get_value(flag_payload, context, default) do
with {:ok, %Result{} = result} <- evaluate(flag_payload, context) do
value =
cond do
result.reason == :default and is_nil(result.value) -> default
is_nil(result.value) -> default
true -> result.value
end
{:ok, value}
end
end
@doc """
Returns the assigned variant key for an authored flag payload.
"""
@spec get_variant(map(), Context.t() | keyword() | map()) ::
{:ok, String.t() | nil} | {:error, Error.t()}
def get_variant(flag_payload, context) do
with {:ok, %Result{} = result} <- evaluate(flag_payload, context) do
{:ok, result.variant}
end
end
@doc """
Returns a human-readable explanation derived from the evaluation trace.
"""
@spec explain(map(), Context.t() | keyword() | map()) :: {:ok, String.t()} | {:error, Error.t()}
def explain(flag_payload, context) do
with {:ok, %Result{} = result} <- evaluate(flag_payload, context) do
{:ok, Explainer.explain(result.debug_trace)}
end
end
@doc """
Admin-safe runtime simulation for one flag and environment.
"""
@spec simulate_flag(
String.t() | atom(),
String.t() | atom(),
Context.t() | keyword() | map(),
keyword()
) ::
{:ok, map()} | {:error, Error.t()}
def simulate_flag(flag_key, environment_key, context, opts \\ []) do
actor = Keyword.get(opts, :actor)
with :ok <-
authorize_admin_read(
actor,
:simulate_flag,
%{resource_type: :flag, resource_key: flag_key},
environment_key
),
{:ok, result} <- Runtime.evaluate(environment_key, flag_key, context) do
redacted =
Redaction.redact_metadata(%{traits: Context.normalize(context).attributes},
allow: Keyword.get(opts, :allow, ["targeting_key"])
)
{:ok, %{result: result, redacted_context: redacted}}
end
end
@doc """
Admin-safe explain seam for one flag and environment.
"""
@spec explain_flag(
String.t() | atom(),
String.t() | atom(),
Context.t() | keyword() | map(),
keyword()
) ::
{:ok, map()} | {:error, Error.t()}
def explain_flag(flag_key, environment_key, context, opts \\ []) do
actor = Keyword.get(opts, :actor)
with :ok <-
authorize_admin_read(
actor,
:explain_flag,
%{resource_type: :flag, resource_key: flag_key},
environment_key
),
{:ok, explanation} <- Runtime.explain(environment_key, flag_key, context) do
redacted =
Redaction.redact_metadata(%{traits: Context.normalize(context).attributes},
allow: Keyword.get(opts, :allow, ["targeting_key"])
)
{:ok, %{explanation: explanation, redacted_context: redacted}}
end
end
@doc """
Returns bounded runtime diagnostics for the local node.
"""
@spec diagnostics() :: map()
def diagnostics, do: Runtime.diagnostics()
@doc """
Returns the bounded infrastructure health snapshot for the local node.
"""
@spec infrastructure_health() :: map()
def infrastructure_health, do: Runtime.Diagnostics.current().infrastructure_health
defp run_store(operation, args, command) do
case configured_store() do
{:ok, adapter} -> invoke_store(adapter, operation, args, command)
{:error, %Error{} = error} -> {:error, error}
end
end
defp configured_store do
case Application.get_env(:rulestead, :store) ||
Application.get_env(:rulestead, :store_adapter) do
nil ->
{:error, ConfigError.store_not_configured(metadata: %{config_key: "store"})}
adapter when is_atom(adapter) ->
ensure_adapter_module(adapter)
adapter_opts when is_list(adapter_opts) ->
adapter_opts
|> Keyword.get(:adapter)
|> normalize_configured_adapter(adapter_opts)
%{adapter: adapter} = config ->
normalize_configured_adapter(adapter, config)
%{module: adapter} = config ->
normalize_configured_adapter(adapter, config)
invalid ->
{:error,
ConfigError.store_adapter_invalid(metadata: %{configured_value: inspect(invalid)})}
end
end
defp normalize_configured_adapter(adapter, config) when is_atom(adapter) do
case ensure_adapter_module(adapter) do
{:ok, adapter} ->
{:ok, adapter}
{:error, error} ->
{:error,
Error.normalize(
Map.put(Map.from_struct(error), :metadata, %{configured_value: inspect(config)})
)}
end
end
defp normalize_configured_adapter(_adapter, config) do
{:error, ConfigError.store_adapter_invalid(metadata: %{configured_value: inspect(config)})}
end
defp ensure_adapter_module(adapter) when is_atom(adapter) do
cond do
not Code.ensure_loaded?(adapter) ->
{:error,
ConfigError.store_adapter_invalid(
metadata: %{adapter: inspect(adapter)},
details: [%{message: "module could not be loaded"}]
)}
true ->
{:ok, adapter}
end
end
defp invoke_store(adapter, operation, args, command) do
arity = length(args)
cond do
not function_exported?(adapter, operation, arity) ->
{:error,
ConfigError.store_adapter_invalid(
metadata: %{adapter: inspect(adapter), operation: Atom.to_string(operation)},
details: [%{message: "callback is not exported"}]
)}
true ->
do_invoke_store(adapter, operation, args, command)
end
end
defp do_invoke_store(adapter, operation, args, command) do
kind = store_event_kind(operation)
command = command || List.first(args)
Telemetry.span(
[:rulestead, :store, kind],
Telemetry.metadata(
Telemetry.command_metadata(command, %{operation: Atom.to_string(operation)})
),
fn ->
result =
adapter
|> apply(operation, args)
|> normalize_store_result(adapter, operation)
{result, store_stop_metadata(result, operation)}
end
)
rescue
error in [Error] ->
{:error, error}
exception ->
{:error,
StoreError.unavailable(
metadata: %{adapter: inspect(adapter), operation: Atom.to_string(operation)},
cause: exception
)}
end
defp normalize_store_result({:ok, value}, _adapter, _operation), do: {:ok, value}
defp normalize_store_result({:error, %Error{} = error}, _adapter, _operation),
do: {:error, error}
defp normalize_store_result(nil, adapter, operation) do
{:error,
StoreError.unavailable(
metadata: %{adapter: inspect(adapter), operation: Atom.to_string(operation)},
details: [%{message: "store adapters must not return nil"}]
)}
end
defp normalize_store_result(other, adapter, operation) do
{:error,
StoreError.unavailable(
metadata: %{adapter: inspect(adapter), operation: Atom.to_string(operation)},
details: [%{message: "store adapter returned an invalid response"}],
cause: other
)}
end
defp unwrap!({:ok, value}), do: value
defp unwrap!({:error, %Error{} = error}), do: raise(error)
defp normalize_eval_context(context, opts) do
context = Context.normalize(context)
if Keyword.has_key?(opts, :strict?) do
Context.normalize(Map.put(Map.from_struct(context), :strict?, Keyword.get(opts, :strict?)))
else
context
end
end
defp emit_warnings(%Result{debug_trace: %{warnings: warnings}} = result)
when is_list(warnings) do
Enum.each(warnings, fn warning ->
Telemetry.execute(
[:rulestead, :eval, :warning],
%{count: 1},
Telemetry.result_metadata(result, %{environment: result.debug_trace[:environment]}, %{
reason: warning[:type]
})
)
end)
end
defp emit_warnings(_result), do: :ok
defp eval_stop_metadata({:ok, %Result{} = result}, flag_payload, context) do
flag_payload
|> Telemetry.base_metadata(context)
|> Map.merge(Telemetry.result_metadata(result, context))
end
defp eval_stop_metadata({:error, %Error{} = error}, flag_payload, context) do
flag_payload
|> Telemetry.base_metadata(context)
|> Map.merge(%{reason: error.type, matched_rule_count: 0})
end
defp admin_stop_metadata({:ok, value}, command) do
command
|> Telemetry.command_metadata()
|> Map.merge(result_like_metadata(value))
|> Map.put(:reason, :ok)
end
defp admin_stop_metadata({:error, %Error{} = error}, command) do
command
|> Telemetry.command_metadata()
|> Map.put(:reason, error.type)
end
defp admin_write(operation, command) do
redacted_command = redact_command(command)
resource = command_resource(redacted_command)
action = command_action(operation, redacted_command)
Telemetry.span(
[:rulestead, :admin, :mutation],
Telemetry.metadata(
redacted_command
|> Telemetry.command_metadata(%{
operation: Atom.to_string(operation),
audit_action: Atom.to_string(action)
})
|> Map.merge(Telemetry.governance_metadata(redacted_command, %{action: action}))
),
fn ->
{result, executed_command} =
case authorize_admin_write(operation, redacted_command, action, resource) do
:ok ->
{run_store(operation, [redacted_command], redacted_command), redacted_command}
{:ok, authorized_command} ->
{run_store(operation, [authorized_command], authorized_command), authorized_command}
{:error, error} ->
{{:error, error}, redacted_command}
{:error, error, denied_audit} ->
maybe_persist_denied_mutation(operation, redacted_command, denied_audit)
{{:error, error}, redacted_command}
end
{result, admin_stop_metadata(result, executed_command)}
end
)
end
defp authorize_admin_write(:approve_change_request, command, _action, resource) do
with {:ok, change_request} <-
fetch_change_request_for_authorization(command.change_request_id) do
command
|> Map.get(:actor)
|> Authorizer.authorize_change_request_approval(
change_request.submitted_by,
change_request.action,
governance_change_request_resource(change_request, resource),
change_request.environment_key
)
|> normalize_governance_authorization()
end
end
defp authorize_admin_write(operation, command, action, resource)
when operation in [:reject_change_request, :cancel_change_request, :execute_change_request] do
with {:ok, change_request} <-
fetch_change_request_for_authorization(command.change_request_id) do
Authorizer.authorize(
Map.get(command, :actor),
action,
governance_change_request_resource(change_request, resource),
change_request.environment_key
)
end
end
defp authorize_admin_write(:submit_change_request, command, action, resource) do
Authorizer.authorize(
Map.get(command, :actor),
action,
resource,
governance_environment(command, Map.get(command, :metadata, %{}))
)
end
defp authorize_admin_write(:publish_ruleset, command, _action, resource) do
command
|> Map.get(:actor)
|> Authorizer.authorize_governed_action(
:publish_ruleset,
resource,
Map.get(command, :environment_key)
)
|> normalize_governance_authorization()
end
defp authorize_admin_write(
:schedule_governed_action,
%Command.ScheduleGovernedAction{} = command,
_action,
resource
) do
actor = Map.get(command, :actor)
environment_key = Map.get(command, :environment_key)
with :ok <- ensure_bounded_scheduled_action(command.action),
requirement <-
Authorizer.approval_requirement(actor, command.action, resource, environment_key),
{:ok, approval_requirement} <-
authorize_direct_scheduled_execution(
actor,
command,
resource,
environment_key,
requirement
) do
{:ok,
%{command | approval_requirement: ApprovalRequirement.serialize(approval_requirement)}}
end
end
defp authorize_admin_write(_operation, command, action, resource) do
Authorizer.authorize(
Map.get(command, :actor),
action,
resource,
Map.get(command, :environment_key)
)
end
defp admin_read(operation, command) do
action = command_action(operation, command)
with :ok <-
authorize_admin_read(
Map.get(command, :actor),
action,
command_resource(command),
Map.get(command, :environment_key)
) do
run_store(operation, [command], command)
end
end
defp authorize_admin_read(actor, action, resource, environment_key) do
case Authorizer.authorize(actor, action, resource, environment_key) do
:ok -> :ok
{:error, error, _denied_audit} -> {:error, error}
end
end
defp redact_command(command) do
allow = [
"request_id",
"source",
"reason",
"emergency_reason",
"targeting_key",
"plan",
"nested.region",
"webhook_provider",
"webhook_delivery_id",
"webhook_receipt_id",
"correlation_id",
"environment_key"
]
redacted_metadata = Redaction.redact_metadata(Map.get(command, :metadata, %{}), allow: allow)
Map.put(command, :metadata, redacted_metadata.audit)
end
defp command_resource(%Command.PreviewAudienceImpact{audience_key: audience_key}) do
%{resource_type: :audience, resource_key: audience_key}
end
defp command_resource(%Command.ApplyAudienceMutation{audience_key: audience_key}) do
%{resource_type: :audience, resource_key: audience_key}
end
defp command_resource(%Command.ListAudienceDependencies{audience_key: audience_key}) do
%{resource_type: :dependency_inventory, resource_key: audience_key || "*"}
end
defp command_resource(%Command.SubmitChangeRequest{
resource_type: resource_type,
resource_key: resource_key
}) do
%{resource_type: stringify_resource(resource_type), resource_key: resource_key}
end
defp command_resource(%Command.ScheduleGovernedAction{
resource_type: resource_type,
resource_key: resource_key
}) do
%{resource_type: stringify_resource(resource_type), resource_key: resource_key}
end
defp command_resource(%Command.RollbackAuditEvent{audit_event_id: audit_event_id}) do
%{resource_type: :audit_event, resource_key: audit_event_id}
end
defp command_resource(command)
when is_map(command) do
metadata = Map.get(command, :metadata, %{})
case {Map.get(metadata, "resource_type"), Map.get(metadata, "resource_key")} do
{nil, _} ->
%{resource_type: :flag, resource_key: Map.get(command, :flag_key)}
{resource_type, resource_key} ->
%{resource_type: stringify_resource(resource_type), resource_key: resource_key}
end
end
defp command_action(:submit_change_request, _command), do: :submit_change_request
defp command_action(:approve_change_request, _command), do: :approve_change_request
defp command_action(:reject_change_request, _command), do: :reject_change_request
defp command_action(:cancel_change_request, _command), do: :cancel_change_request
defp command_action(:execute_change_request, _command), do: :execute_change_request
defp command_action(:schedule_change_request, _command), do: :execute_change_request
defp command_action(:cancel_scheduled_execution, _command), do: :execute_change_request
defp command_action(:requeue_scheduled_execution, _command), do: :execute_change_request
defp command_action(:schedule_governed_action, %Command.ScheduleGovernedAction{action: action}),
do: action
defp command_action(:engage_kill_switch, _command), do: :engage_kill_switch
defp command_action(:release_kill_switch, _command), do: :release_kill_switch
defp command_action(:upsert_rollout_auto_advance_policy, _command), do: :advance_rollout
defp command_action(:fetch_guardrail_status, _command), do: :read_rollouts
defp command_action(:rollback_audit_event, _command), do: :rollback_audit_event
defp command_action(:list_audit_events, _command), do: :list_audit_events
defp command_action(:apply_promotion, _command), do: :promote_environment
defp command_action(:preview_audience_impact, _command), do: :preview_audience_impact
defp command_action(:list_audience_dependencies, _command), do: :list_audience_dependencies
defp command_action(:apply_audience_mutation, _command), do: :apply_audience_mutation
defp command_action(:create_webhook_destination, _command), do: :manage_webhooks
defp command_action(:update_webhook_destination, _command), do: :manage_webhooks
defp command_action(:list_webhook_destinations, _command), do: :manage_webhooks
defp command_action(:fetch_webhook_destination, _command), do: :manage_webhooks
defp command_action(:list_webhook_deliveries, _command), do: :manage_webhooks
defp command_action(:retry_webhook_delivery, _command), do: :manage_webhooks
defp command_action(operation, _command), do: operation
defp maybe_persist_denied_mutation(operation, command, denied_audit)
when operation in [
:save_draft_ruleset,
:publish_ruleset,
:advance_rollout,
:archive_flag,
:apply_audience_mutation,
:engage_kill_switch,
:release_kill_switch,
:rollback_audit_event
] do
denied_command =
command
|> Map.put(:reason, Map.get(command, :reason) || "unauthorized")
|> Map.put(
:metadata,
Map.merge(command.metadata, %{
audit_result: :denied,
denied_action: denied_audit.action,
denied_actor_id: get_in(denied_audit, [:actor, :id])
})
)
_ = run_store(operation, [denied_command], denied_command)
:ok
end
defp maybe_persist_denied_mutation(_operation, _command, _denied_audit), do: :ok
defp validate_audience_mutation_confirmation(%Command.ApplyAudienceMutation{} = command) do
cond do
blank?(command.preview_fingerprint) ->
{:error,
StoreError.invalid_command(
"audience mutation requires preview_fingerprint",
metadata: %{audience_key: command.audience_key, operation: command.operation}
)}
command.preview_schema_version != ImpactPreview.schema_version() ->
{:error,
StoreError.invalid_command(
"audience mutation requires current preview_schema_version",
metadata: %{
audience_key: command.audience_key,
operation: command.operation,
expected_preview_schema_version: ImpactPreview.schema_version(),
preview_schema_version: command.preview_schema_version
}
)}
command.operation in ["update", "archive", "delete_attempt"] and blank?(command.reason) ->
{:error,
StoreError.invalid_command(
"audience mutation requires reason",
metadata: %{audience_key: command.audience_key, operation: command.operation}
)}
true ->
:ok
end
end
defp blank?(nil), do: true
defp blank?(value) when is_binary(value), do: String.trim(value) == ""
defp blank?(_value), do: false
defp store_stop_metadata({:ok, value}, operation) do
value
|> result_like_metadata()
|> Map.put_new(:reason, store_success_reason(operation))
end
defp store_stop_metadata({:error, %Error{} = error}, _operation) do
%{reason: error.type}
end
defp result_like_metadata(value) when is_map(value) do
if Map.get(value, :__struct__) == Command.Page do
%{}
else
%{}
|> Map.put(:flag_key, get_in(value, [:flag, :key]))
|> Map.put(:flag_type, get_in(value, [:flag, :flag_type]))
|> Map.put(
:environment,
get_in(value, [:environment, :key]) || Map.get(value, :environment_key)
)
|> Map.put(
:snapshot_version,
Map.get(value, :version) || get_in(value, [:flag_environment, :active_ruleset_version])
)
end
end
defp result_like_metadata(_value), do: %{}
defp store_event_kind(operation)
when operation in [
:fetch_flag,
:fetch_snapshot,
:list_flags,
:list_environments,
:list_audiences,
:list_audience_dependencies
],
do: :read
defp store_event_kind(_operation), do: :write
defp store_success_reason(:fetch_snapshot), do: :fetched
defp store_success_reason(:fetch_flag), do: :fetched
defp store_success_reason(:list_flags), do: :listed
defp store_success_reason(:list_audience_dependencies), do: :listed
defp store_success_reason(_operation), do: :stored
defp governance_environment(command, metadata) do
Map.get(command, :environment_key) || Map.get(metadata, "environment_key")
end
defp fetch_change_request_for_authorization(change_request_id) do
case run_store(
:fetch_change_request,
[Command.FetchChangeRequest.new(change_request_id)],
nil
) do
{:ok, %{change_request: change_request}} -> {:ok, change_request}
{:error, %Error{} = error} -> {:error, error}
end
end
defp normalize_governance_authorization({:ok, _requirement}), do: :ok
defp normalize_governance_authorization({:error, error, denied_audit}),
do: {:error, error, denied_audit}
defp authorize_direct_scheduled_execution(
actor,
%Command.ScheduleGovernedAction{execution_mode: :policy_bypass, action: action} =
_command,
resource,
environment_key,
_requirement
) do
case Authorizer.authorize_governed_action(actor, action, resource, environment_key) do
{:ok, requirement} -> {:ok, requirement}
{:error, error, denied_audit} -> {:error, error, denied_audit}
end
end
defp authorize_direct_scheduled_execution(
actor,
%Command.ScheduleGovernedAction{
execution_mode: :emergency_bypass,
action: action,
metadata: metadata,
reason: reason
},
resource,
environment_key,
requirement
) do
with :ok <- ensure_emergency_metadata(reason, metadata),
:ok <- Authorizer.authorize(actor, action, resource, environment_key) do
{:ok, requirement}
end
end
defp authorize_direct_scheduled_execution(
_actor,
%Command.ScheduleGovernedAction{},
_resource,
_environment_key,
_requirement
) do
{:error,
StoreError.invalid_command(
"direct scheduled actions must use policy_bypass or emergency_bypass"
)}
end
defp ensure_bounded_scheduled_action(action) do
if action in Rulestead.Admin.Policy.governance_actions() do
:ok
else
{:error,
StoreError.invalid_command(
"scheduled direct actions are limited to publish, rollout, and kill-switch operations"
)}
end
end
defp ensure_emergency_metadata(reason, metadata) do
emergency_reason = Map.get(metadata, "emergency_reason")
cond do
is_nil(reason) or String.trim(reason) == "" ->
{:error,
StoreError.invalid_command("emergency_bypass requires an explicit operator reason")}
is_nil(emergency_reason) or String.trim(emergency_reason) == "" ->
{:error,
StoreError.invalid_command("emergency_bypass requires metadata[\"emergency_reason\"]")}
true ->
:ok
end
end
defp promotion_plan_status(compare) do
cond do
compare.flags == [] -> "no_changes"
Compare.protected_target?(compare.target_environment.key) -> "governance_required"
true -> "changes"
end
end
defp promotion_findings(compare) do
top_level =
Enum.map(compare.findings, fn finding ->
ManifestResult.finding(finding.code, finding.severity, compare.target_environment.key,
message: finding[:message]
)
end)
dependency_findings =
compare
|> Map.get(:dependency_findings, [])
|> Enum.map(fn finding ->
scope =
[
"source=#{finding.source_environment_key}",
"target=#{finding.target_environment_key}",
"tenant=#{finding.tenant_key || "global"}",
"flag=#{finding.flag_key}",
"ruleset=#{finding.ruleset_version}",
"rule=#{finding.rule_key}",
"audience=#{finding.audience_key}"
]
|> Enum.join("|")
ManifestResult.finding(finding.code, finding.severity, scope, message: finding[:message])
end)
per_flag =
Enum.flat_map(compare.flags, fn flag ->
Enum.map(flag.findings, fn finding ->
ManifestResult.finding(finding.code, finding.severity, flag.flag_key,
message: finding[:message]
)
end)
end)
ManifestResult.sort_findings(top_level ++ dependency_findings ++ per_flag)
end
defp validate_promotion_plan_mode(plan) do
if plan["mode"] == "promote" do
:ok
else
{:error, Manifest.invalid("apply plan is not a promote plan")}
end
end
defp require_promotion_reason(opts) do
case Manifest.normalize_string(Keyword.get(opts, :reason)) do
nil -> {:error, Manifest.invalid("promote apply requires an explicit reason")}
value -> {:ok, value}
end
end
defp promotion_apply_command(plan, reason, opts) do
Command.ApplyPromotion.new(
%{
source_environment_key: plan["source_environment_key"],
target_environment_key: plan["target_environment_key"],
tenant_key: plan["tenant_key"],
flag_keys: plan["flag_keys"],
compare_token: plan["compare_token"],
compare_schema_version: Compare.schema_version(),
source_fingerprint: plan["source_fingerprint"],
target_fingerprint: plan["target_fingerprint"],
dependency_closure_keys: plan["dependency_closure_keys"],
proposed_target_bundle: plan["proposed_target_bundle"]
},
actor: Keyword.get(opts, :actor, default_cli_actor()),
reason: reason,
metadata: Keyword.get(opts, :metadata, default_cli_metadata(plan))
)
end
defp dispatch_promotion_plan(plan, command, _reason, _opts) do
if Compare.protected_target?(plan["target_environment_key"]) or
plan["status"] == "governance_required" do
with :ok <- Apply.validate_governed(command),
approval_requirement <-
Authorizer.approval_requirement(
command.actor,
:promote_environment,
%{resource_type: "environment", resource_key: plan["target_environment_key"]},
plan["target_environment_key"]
),
{:ok, %{change_request: change_request}} <-
submit_change_request(
Command.SubmitChangeRequest.new(
%{
action: :promote_environment,
environment_key: plan["target_environment_key"],
resource_type: "environment",
resource_key: plan["target_environment_key"],
command: promotion_governance_command_payload(plan),
approval_requirement: approval_requirement
},
actor: command.actor,
reason: command.reason,
metadata: command.metadata
)
) do
{:ok,
ManifestResult.new(%{
status: "queued",
command: "rulestead.promote.apply",
summary: %{
"source_environment_key" => plan["source_environment_key"],
"target_environment_key" => plan["target_environment_key"],
"flag_count" => length(plan["flag_keys"]),
"plan_token" => plan["plan_token"],
"change_request_id" => change_request.id
},
details: %{
"plan" => plan,
"change_request" => Manifest.normalize_map(change_request)
}
})}
end
else
with {:ok, result} <- apply_promotion(command) do
{:ok,
ManifestResult.new(%{
status: "applied",
command: "rulestead.promote.apply",
summary: %{
"source_environment_key" => plan["source_environment_key"],
"target_environment_key" => plan["target_environment_key"],
"flag_count" => length(plan["flag_keys"]),
"plan_token" => plan["plan_token"]
},
details: %{
"plan" => plan,
"apply" => Manifest.normalize_map(result)
}
})}
end
end
end
defp promotion_governance_command_payload(plan) do
%{
"source_environment_key" => plan["source_environment_key"],
"target_environment_key" => plan["target_environment_key"],
"tenant_key" => plan["tenant_key"],
"flag_keys" => plan["flag_keys"],
"compare_token" => plan["compare_token"],
"compare_schema_version" => Compare.schema_version(),
"source_fingerprint" => plan["source_fingerprint"],
"target_fingerprint" => plan["target_fingerprint"],
"dependency_closure_keys" => plan["dependency_closure_keys"],
"proposed_target_bundle" => plan["proposed_target_bundle"]
}
end
defp stale_promotion_result(
content,
message \\ "saved promote plan no longer matches live compare state"
) do
with {:ok, plan} <- Plan.load(content) do
{:ok,
ManifestResult.new(%{
status: "stale",
command: "rulestead.promote.apply",
summary: %{
"target_environment_key" => plan["target_environment_key"],
"plan_token" => plan["plan_token"]
},
findings: [
ManifestResult.finding("stale_plan", "blocker", plan["target_environment_key"],
message: message
)
],
details: %{"plan" => plan}
})}
end
end
defp validate_target_tenant(plan, opts) do
live_tenant = Rulestead.Manifest.normalize_string(Keyword.get(opts, :tenant_key))
plan_tenant = plan["tenant_key"]
if live_tenant == plan_tenant do
:ok
else
{:error, StoreError.invalid_command("promotion target tenant drifted")}
end
end
defp map_promotion_apply_error(content, %Error{} = error) do
status =
cond do
String.contains?(error.message, "drifted") or String.contains?(error.message, "stale") ->
"stale"
String.contains?(error.message, "dependency") or
String.contains?(error.message, "blocker") ->
"blocked"
true ->
"invalid"
end
with {:ok, plan} <- Plan.load(content) do
{:ok,
ManifestResult.new(%{
status: status,
command: "rulestead.promote.apply",
summary: %{
"target_environment_key" => plan["target_environment_key"],
"plan_token" => plan["plan_token"]
},
findings: [
ManifestResult.finding(
promotion_status_code(status),
"blocker",
plan["target_environment_key"],
message: error.message
)
],
details: %{
"plan" => plan,
"error" => Manifest.normalize_map(%{message: error.message, type: error.type})
}
})}
end
end
defp promotion_status_code("stale"), do: "stale_plan"
defp promotion_status_code("blocked"), do: "blocked_promotion"
defp promotion_status_code(_status), do: "invalid_promotion"
defp default_cli_actor do
%{id: "rulestead-cli", type: "system", display: "Rulestead CLI", roles: [:admin]}
end
defp default_cli_metadata(plan) do
%{request_id: plan["plan_token"], source: :mix_task}
end
defp governance_change_request_resource(change_request, fallback_resource) do
case {Map.get(change_request, :resource_type), Map.get(change_request, :resource_key)} do
{nil, nil} ->
fallback_resource
{resource_type, resource_key} ->
%{resource_type: stringify_resource(resource_type), resource_key: resource_key}
end
end
defp stringify_resource(resource_type) when is_binary(resource_type) do
resource_type
|> String.trim()
|> case do
"" -> :flag
"flag" -> :flag
"audit_event" -> :audit_event
"ruleset" -> :ruleset
_ -> :flag
end
end
defp stringify_resource(resource_type) when is_atom(resource_type), do: resource_type
defp stringify_resource(_resource_type), do: :flag
end